Skip to content
jobs.py 3.11 KiB
Newer Older
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl.  If not, see <http://www.gnu.org/licenses/>.


Anael Beutot's avatar
Anael Beutot committed
"""Interface implementation for JobsManager from cc-common."""

Anael Beutot's avatar
Anael Beutot committed
from datetime import datetime
import subprocess
Anael Beutot's avatar
Anael Beutot committed
from cloudcontrol.common.jobs import JobsManagerInterface, Job, JobCancelError
Anael Beutot's avatar
Anael Beutot committed
from cloudcontrol.common.tql.db.helpers import taggify
from cloudcontrol.common.client.tags import TagDB, Tag

Antoine Millet's avatar
Antoine Millet committed

Anael Beutot's avatar
Anael Beutot committed
class NodeJobsManagerInterface(JobsManagerInterface):

    TAG_ATTRIBUTES = ('title', 'status', 'state', 'owner', 'created', 'ended',
Antoine Millet's avatar
Antoine Millet committed
                      'attachments', 'batch')
Anael Beutot's avatar
Anael Beutot committed

    def __init__(self, host_handler):
Anael Beutot's avatar
Anael Beutot committed
        self.handler = host_handler
        self.tag_db = TagDB(self.handler.tag_db)
Anael Beutot's avatar
Anael Beutot committed

    def on_job_created(self, job):
        def get_tag_value(tag):
            return lambda job: taggify(getattr(job, tag))

        self.tag_db.add_sub_object(
            job.id,
            (Tag(tag, get_tag_value(tag), parent=job)
             for tag in self.TAG_ATTRIBUTES),
            'job',
        )
Antoine Millet's avatar
Antoine Millet committed

Anael Beutot's avatar
Anael Beutot committed
        def tag_duration(job):
            if job.ended is None:
                ended = datetime.fromtimestamp(self.handler.main.evloop.now())
            else:
                ended = job.ended

            return taggify(ended - job.created)

        self.tag_db.add_sub_tag(job.id, Tag('duration', tag_duration, 2, 1, job))
Anael Beutot's avatar
Anael Beutot committed

    def on_job_updated(self, job):
        for tag in self.TAG_ATTRIBUTES:
            self.tag_db[job.id][tag].update_value()

    def on_job_purged(self, job_id):
        self.tag_db.remove_sub_object(job_id)
Anael Beutot's avatar
Anael Beutot committed


class ScriptJob(Job):
    """Job that runs a script."""

    def job(self, script, filename, args):
        self.title = 'Script: %s %s' % (script, ' '.join(args))
        self.process = None
        to_exec = (filename,) + args
        output = self.attachment('output')
        self.logger.debug('To exec: %s', to_exec)
        try:
            self.process = subprocess.Popen(to_exec, stdout=output,
                                            stderr=output, close_fds=True)
            self.logger.info('Script running with pid: %d', self.process.pid)
            ex = self.process.wait()
            if ex != 0:
                raise JobCancelError('Script returned exit code %d' % ex)
        except subprocess.CalledProcessError:
            self.logger.error('Error while executing script: %s', script)
            raise

    def cancel(self):
        super(ScriptJob, self).cancel()
        if self.process is not None:
            self.logger.info('Terminating job %s', self.title)
            self.process.terminate()