Newer
Older
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
"""Interface implementation for JobsManager from cc-common."""
import subprocess
from datetime import datetime

from cloudcontrol.common.jobs import JobsManagerInterface, Job, JobCancelError
from cloudcontrol.common.tql.db.helpers import taggify
from cloudcontrol.common.client.tags import TagDB, Tag
class NodeJobsManagerInterface(JobsManagerInterface):
TAG_ATTRIBUTES = ('title', 'status', 'state', 'owner', 'created', 'ended',
self.handler = host_handler
self.tag_db = TagDB(self.handler.tag_db)
def on_job_created(self, job):
    """Register a newly created job and its tags in the tag database.

    One tag is created for every name in ``TAG_ATTRIBUTES``; each tag's value
    is computed on demand by reading the attribute of the same name on the
    job and passing it through ``taggify``. An additional ``duration`` tag
    is attached afterwards.

    :param job: the job that has just been created
    """
    def make_attribute_getter(attr_name):
        # A factory is needed so that every getter keeps its own attribute
        # name instead of sharing the loop variable of the generator below.
        def attribute_getter(job):
            return taggify(getattr(job, attr_name))
        return attribute_getter

    attribute_tags = (
        Tag(attr_name, make_attribute_getter(attr_name), parent=job)
        for attr_name in self.TAG_ATTRIBUTES
    )
    self.tag_db.add_sub_object(job.id, attribute_tags, 'job')

    def tag_duration(job):
        # A job that has not ended yet is measured against the current time
        # of the event loop.
        end_time = job.ended
        if end_time is None:
            end_time = datetime.fromtimestamp(self.handler.main.evloop.now())
        return taggify(end_time - job.created)

    # NOTE(review): the positional 2 and 1 are presumably the tag's ttl and
    # refresh interval -- confirm against the Tag constructor.
    duration_tag = Tag('duration', tag_duration, 2, 1, job)
    self.tag_db.add_sub_tag(job.id, duration_tag)
def on_job_updated(self, job):
    """Refresh the value of every attribute tag registered for ``job``.

    :param job: the job whose attributes have changed
    """
    for attr_name in self.TAG_ATTRIBUTES:
        attr_tag = self.tag_db[job.id][attr_name]
        attr_tag.update_value()
def on_job_purged(self, job_id):
    """Remove the sub-object registered under ``job_id`` from the tag database.

    :param job_id: identifier of the job that has been purged
    """
    tag_db = self.tag_db
    tag_db.remove_sub_object(job_id)
class ScriptJob(Job):
    """Job that runs a script.

    The script is started as a child process whose stdout and stderr are both
    redirected to the object returned by ``self.attachment('output')``.
    """

    # Class-level default so that cancel() is safe even when it is called
    # before job() has had a chance to run.
    process = None

    def job(self, script, filename, args):
        """Run the script and block until it exits.

        :param script: name of the script, used in the job title and logs
        :param filename: executable to run (first element of the command)
        :param args: sequence of string arguments appended to the command
        :raises JobCancelError: if the script exits with a non-zero code
        :raises OSError: if the process could not be started
        """
        # Accept any sequence (list, tuple, ...) of arguments.
        args = tuple(args)
        self.title = 'Script: %s %s' % (script, ' '.join(args))
        self.process = None
        to_exec = (filename,) + args
        output = self.attachment('output')
        self.logger.debug('To exec: %s', to_exec)
        try:
            self.process = subprocess.Popen(to_exec, stdout=output,
                                            stderr=output, close_fds=True)
        except OSError:
            # Popen reports launch failures (missing file, permissions, ...)
            # as OSError; CalledProcessError is only raised by the
            # check_call()/check_output() helpers.
            self.logger.error('Error while executing script: %s', script)
            raise
        self.logger.info('Script running with pid: %d', self.process.pid)
        exit_code = self.process.wait()
        if exit_code != 0:
            raise JobCancelError('Script returned exit code %d' % exit_code)

    def cancel(self):
        """Cancel the job and terminate the script if it is still running."""
        super(ScriptJob, self).cancel()
        process = self.process
        # returncode stays None until the child has been waited for; do not
        # signal a process that was already reaped, its pid may be gone or
        # reused by an unrelated process.
        if process is not None and process.returncode is None:
            self.logger.info('Terminating job %s', self.title)
            process.terminate()