Newer
Older
"""Interface implementation for JobsManager from cc-common."""
from cloudcontrol.common.jobs import JobsManagerInterface, Job, JobCancelError
from cloudcontrol.common.tql.db.helpers import taggify
from cloudcontrol.common.client.tags import TagDB, Tag
class NodeJobsManagerInterface(JobsManagerInterface):
TAG_ATTRIBUTES = ('title', 'status', 'state', 'owner', 'created', 'ended',
self.handler = host_handler
self.tag_db = TagDB(self.handler.tag_db)
def on_job_created(self, job):
def get_tag_value(tag):
return lambda job: taggify(getattr(job, tag))
self.tag_db.add_sub_object(
job.id,
(Tag(tag, get_tag_value(tag), parent=job)
for tag in self.TAG_ATTRIBUTES),
'job',
)
def tag_duration(job):
if job.ended is None:
ended = datetime.fromtimestamp(self.handler.main.evloop.now())
else:
ended = job.ended
return taggify(ended - job.created)
self.tag_db.add_sub_tag(job.id, Tag('duration', tag_duration, 2, 1, job))
def on_job_updated(self, job):
for tag in self.TAG_ATTRIBUTES:
self.tag_db[job.id][tag].update_value()
def on_job_purged(self, job_id):
self.tag_db.remove_sub_object(job_id)
class ScriptJob(Job):
"""Job that runs a script."""
def job(self, script, filename, args):
self.title = 'Script: %s %s' % (script, ' '.join(args))
self.process = None
to_exec = (filename,) + args
output = self.attachment('output')
self.logger.debug('To exec: %s', to_exec)
try:
self.process = subprocess.Popen(to_exec, stdout=output,
stderr=output, close_fds=True)
self.logger.info('Script running with pid: %d', self.process.pid)
ex = self.process.wait()
if ex != 0:
raise JobCancelError('Script returned exit code %d' % ex)
except subprocess.CalledProcessError:
self.logger.error('Error while executing script: %s', script)
raise
def cancel(self):
super(ScriptJob, self).cancel()
if self.process is not None:
self.logger.info('Terminating job %s', self.title)
self.process.terminate()