# This file is part of CloudControl. # # CloudControl is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # CloudControl is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with CloudControl. If not, see . """Interface implementation for JobsManager from cc-common.""" from datetime import datetime import subprocess from cloudcontrol.common.jobs import JobsManagerInterface, Job, JobCancelError from cloudcontrol.common.tql.db.helpers import taggify from cloudcontrol.common.client.tags import TagDB, Tag class NodeJobsManagerInterface(JobsManagerInterface): TAG_ATTRIBUTES = ('title', 'status', 'state', 'owner', 'created', 'ended', 'attachments', 'batch') def __init__(self, host_handler): self.handler = host_handler self.tag_db = TagDB(self.handler.tag_db) def on_job_created(self, job): def get_tag_value(tag): return lambda job: taggify(getattr(job, tag)) self.tag_db.add_sub_object( job.id, (Tag(tag, get_tag_value(tag), parent=job) for tag in self.TAG_ATTRIBUTES), 'job', ) def tag_duration(job): if job.ended is None: ended = datetime.fromtimestamp(self.handler.main.evloop.now()) else: ended = job.ended return taggify(ended - job.created) self.tag_db.add_sub_tag(job.id, Tag('duration', tag_duration, 2, 1, job)) def on_job_updated(self, job): for tag in self.TAG_ATTRIBUTES: self.tag_db[job.id][tag].update_value() def on_job_purged(self, job_id): self.tag_db.remove_sub_object(job_id) class ScriptJob(Job): """Job that runs a script.""" def job(self, script, filename, args): self.title = 'Script: %s %s' % (script, ' '.join(args)) self.process = None to_exec = (filename,) + args output = self.attachment('output') self.logger.debug('To exec: %s', to_exec) try: self.process = subprocess.Popen(to_exec, stdout=output, stderr=output, close_fds=True) self.logger.info('Script running with pid: %d', self.process.pid) ex = self.process.wait() if ex != 0: raise JobCancelError('Script returned exit code %d' % ex) except subprocess.CalledProcessError: self.logger.error('Error while executing script: %s', script) raise def cancel(self): super(ScriptJob, self).cancel() if self.process is not None: self.logger.info('Terminating job %s', self.title) self.process.terminate()