Skip to content
jobs.py 2.43 KiB
Newer Older
Anael Beutot's avatar
Anael Beutot committed
"""Interface implementation for JobsManager from cc-common."""

Anael Beutot's avatar
Anael Beutot committed
import time
from datetime import datetime
import subprocess
Anael Beutot's avatar
Anael Beutot committed

Anael Beutot's avatar
Anael Beutot committed
from cloudcontrol.common.jobs import JobsManagerInterface, Job, JobCancelError
Anael Beutot's avatar
Anael Beutot committed
from cloudcontrol.common.tql.db.helpers import taggify
from cloudcontrol.common.client.tags import TagDB, Tag

class NodeJobsManagerInterface(JobsManagerInterface):
    """Mirror jobs into the host tag database.

    Every created job is registered in ``self.tag_db`` under its id, with one
    tag per name listed in ``TAG_ATTRIBUTES`` plus a computed ``duration`` tag.
    """

    # Job attributes exposed one-to-one as tags (read with getattr on the job).
    TAG_ATTRIBUTES = ('title', 'status', 'state', 'owner', 'created', 'ended',
                      'attachments')

    def __init__(self, host_handler):
        """Keep a reference to the handler and wrap its tag database.

        :param host_handler: object providing ``tag_db`` and
            ``main.evloop.now()``
        """
        self.handler = host_handler
        self.tag_db = TagDB(self.handler.tag_db)

    def on_job_created(self, job):
        """Register ``job`` and its tags in the tag database.

        :param job: newly created job; must provide ``id`` and every
            attribute named in ``TAG_ATTRIBUTES``
        """
        def get_tag_value(tag):
            # Factory so that each lambda binds its own attribute name instead
            # of sharing the loop variable of the generator below.
            return lambda job: taggify(getattr(job, tag))

        self.tag_db.add_sub_object(
            job.id,
            (Tag(tag, get_tag_value(tag), parent=job)
             for tag in self.TAG_ATTRIBUTES),
            'job',
        )

        def tag_duration(job):
            # A job that is still running has no end date yet: measure up to
            # the current time of the event loop instead.
            if job.ended is None:
                ended = datetime.fromtimestamp(self.handler.main.evloop.now())
            else:
                ended = job.ended

            # NOTE(review): assumes job.created and job.ended are datetime
            # objects so that the subtraction is valid -- confirm.
            return taggify(ended - job.created)

        # NOTE(review): the positional 2 and 1 are presumably the ttl/refresh
        # settings of the tag -- verify against the Tag signature.
        self.tag_db.add_sub_tag(job.id, Tag('duration', tag_duration, 2, 1, job))

    def on_job_updated(self, job):
        """Recompute the value of every ``TAG_ATTRIBUTES`` tag of ``job``."""
        for tag in self.TAG_ATTRIBUTES:
            self.tag_db[job.id][tag].update_value()

    def on_job_purged(self, job_id):
        """Remove the sub-object registered for ``job_id``."""
        self.tag_db.remove_sub_object(job_id)
Anael Beutot's avatar
Anael Beutot committed


class ScriptJob(Job):
    """Job that runs a script."""

    # Handle on the spawned script. Defined at class level so that cancel()
    # can be called safely on a job whose job() method has not run yet.
    process = None

    def job(self, script, filename, args):
        """Spawn ``filename`` with ``args`` and wait for it to exit.

        Both stdout and stderr of the script are sent to the ``output``
        attachment of the job.

        :param script: script name, used for the job title and log messages
        :param filename: path of the executable to spawn
        :param args: iterable of string arguments passed to the script
        :raise JobCancelError: if the script exits with a non-zero code
        :raise OSError: if the script cannot be spawned
        """
        # Accept any iterable (list, tuple, ...): the concatenation below
        # only works between tuples.
        args = tuple(args)
        self.title = 'Script: %s %s' % (script, ' '.join(args))
        self.process = None
        to_exec = (filename,) + args
        output = self.attachment('output')
        self.logger.debug('To exec: %s', to_exec)
        try:
            self.process = subprocess.Popen(to_exec, stdout=output,
                                            stderr=output, close_fds=True)
            self.logger.info('Script running with pid: %d', self.process.pid)
            ex = self.process.wait()
            if ex != 0:
                raise JobCancelError('Script returned exit code %d' % ex)
        except (OSError, subprocess.CalledProcessError):
            # Popen() reports spawn failures (missing file, permissions, ...)
            # as OSError; CalledProcessError alone never matched here.
            self.logger.error('Error while executing script: %s', script)
            raise

    def cancel(self):
        """Cancel the job and terminate the script if it has been spawned."""
        super(ScriptJob, self).cancel()
        if self.process is not None:
            self.logger.info('Terminating job %s', self.title)
            self.process.terminate()