Loading cloudcontrol/node/host/jobs.py +42 −4 Original line number Diff line number Diff line """Interface implementation for JobsManager from cc-common.""" import time from datetime import datetime import subprocess from cloudcontrol.common.jobs import JobsManagerInterface from cloudcontrol.common.jobs import JobsManagerInterface, Job, JobCancelError from cloudcontrol.common.tql.db.helpers import taggify from cloudcontrol.common.client.tags import TagDB, Tag class NodeJobsManagerInterface(JobsManagerInterface): TAG_ATTRIBUTES = ('title', 'status', 'state', 'owner', 'created', 'ended', 'attachments') def __init__(self, host_handler): self.tag_db = TagDB(host_handler.tag_db) self.handler = host_handler self.tag_db = TagDB(self.handler.tag_db) def on_job_created(self, job): def get_tag_value(tag): Loading @@ -24,7 +27,15 @@ class NodeJobsManagerInterface(JobsManagerInterface): for tag in self.TAG_ATTRIBUTES), 'job', ) # TODO add duration tag def tag_duration(job): if job.ended is None: ended = datetime.fromtimestamp(self.handler.main.evloop.now()) else: ended = job.ended return taggify(ended - job.created) self.tag_db.add_sub_tag(job.id, Tag('duration', tag_duration, 2, 1, job)) def on_job_updated(self, job): for tag in self.TAG_ATTRIBUTES: Loading @@ -32,3 +43,30 @@ class NodeJobsManagerInterface(JobsManagerInterface): def on_job_purged(self, job_id): self.tag_db.remove_sub_object(job_id) class ScriptJob(Job): """Job that runs a script.""" def job(self, script, filename, args): self.title = 'Script: %s %s' % (script, ' '.join(args)) self.process = None to_exec = (filename,) + args output = self.attachment('output') self.logger.debug('To exec: %s', to_exec) try: self.process = subprocess.Popen(to_exec, stdout=output, stderr=output, close_fds=True) self.logger.info('Script running with pid: %d', self.process.pid) ex = self.process.wait() if ex != 0: raise JobCancelError('Script returned exit code %d' % ex) except subprocess.CalledProcessError: self.logger.error('Error while executing script: %s', script) raise def cancel(self): super(ScriptJob, self).cancel() if self.process is not None: self.logger.info('Terminating job %s', self.title) self.process.terminate() Loading
cloudcontrol/node/host/jobs.py +42 −4 Original line number Diff line number Diff line """Interface implementation for JobsManager from cc-common.""" import time from datetime import datetime import subprocess from cloudcontrol.common.jobs import JobsManagerInterface from cloudcontrol.common.jobs import JobsManagerInterface, Job, JobCancelError from cloudcontrol.common.tql.db.helpers import taggify from cloudcontrol.common.client.tags import TagDB, Tag class NodeJobsManagerInterface(JobsManagerInterface): TAG_ATTRIBUTES = ('title', 'status', 'state', 'owner', 'created', 'ended', 'attachments') def __init__(self, host_handler): self.tag_db = TagDB(host_handler.tag_db) self.handler = host_handler self.tag_db = TagDB(self.handler.tag_db) def on_job_created(self, job): def get_tag_value(tag): Loading @@ -24,7 +27,15 @@ class NodeJobsManagerInterface(JobsManagerInterface): for tag in self.TAG_ATTRIBUTES), 'job', ) # TODO add duration tag def tag_duration(job): if job.ended is None: ended = datetime.fromtimestamp(self.handler.main.evloop.now()) else: ended = job.ended return taggify(ended - job.created) self.tag_db.add_sub_tag(job.id, Tag('duration', tag_duration, 2, 1, job)) def on_job_updated(self, job): for tag in self.TAG_ATTRIBUTES: Loading @@ -32,3 +43,30 @@ class NodeJobsManagerInterface(JobsManagerInterface): def on_job_purged(self, job_id): self.tag_db.remove_sub_object(job_id) class ScriptJob(Job): """Job that runs a script.""" def job(self, script, filename, args): self.title = 'Script: %s %s' % (script, ' '.join(args)) self.process = None to_exec = (filename,) + args output = self.attachment('output') self.logger.debug('To exec: %s', to_exec) try: self.process = subprocess.Popen(to_exec, stdout=output, stderr=output, close_fds=True) self.logger.info('Script running with pid: %d', self.process.pid) ex = self.process.wait() if ex != 0: raise JobCancelError('Script returned exit code %d' % ex) except subprocess.CalledProcessError: self.logger.error('Error while executing script: %s', script) raise def cancel(self): super(ScriptJob, self).cancel() if self.process is not None: self.logger.info('Terminating job %s', self.title) self.process.terminate()