Commit 3b2b16b8 authored by Antoine Millet's avatar Antoine Millet
Browse files

Added jobs system

parent 9918e7d6
Loading
Loading
Loading
Loading
+14 −0
Original line number Diff line number Diff line
""" Jobs manager of CloudControl.

The jobs manager is a component running on both the server and the nodes. It
is in charge of reporting the status of jobs.
"""


from cloudcontrol.common.jobs.interface import JobsManagerInterface
from cloudcontrol.common.jobs.store import JobsStore
from cloudcontrol.common.jobs.job import Job, JobCancelError
from cloudcontrol.common.jobs.manager import JobsManager


# Public names exported by the jobs package.
__all__ = [
    'JobsManager',
    'JobsManagerInterface',
    'Job',
    'JobsStore',
    'JobCancelError',
]
+23 −0
Original line number Diff line number Diff line
""" Bunch of helpers for jobs system.
"""

from threading import Lock


class AutoIncrementCounter(object):

    """ Counter whose last value is persisted in a store.

    The store must provide ``get_id_counter()`` (read when the counter is
    created) and ``store_id_counter(value)`` (called on every increment).
    Increments are serialized with a lock.
    """

    def __init__(self, store):
        self._lock = Lock()
        self._store = store
        # Resume from the last value saved in the store:
        self._current = store.get_id_counter()

    def get(self):
        """ Increment the counter, save it to the store and return it.
        """
        self._lock.acquire()
        try:
            next_value = self._current + 1
            self._current = next_value
            self._store.store_id_counter(next_value)
            return next_value
        finally:
            self._lock.release()
+24 −0
Original line number Diff line number Diff line
""" Interface used to get events from the jobs manager.
"""

from abc import ABCMeta


class JobsManagerInterface(object):

    """ Set of callbacks invoked by the jobs manager.

    Every callback is a no-op here; subclasses override the ones they need.
    """

    __metaclass__ = ABCMeta

    def __init__(self):
        """ Nothing to initialize in the base interface.
        """

    def on_job_created(self, job):
        """ Callback invoked by the jobs manager when a job is created.
        """
        pass

    def on_job_updated(self, job):
        """ Callback invoked when an attribute of a job has changed.
        """
        pass

    def on_job_purged(self, job):
        """ Callback invoked by the jobs manager when a job is purged.
        """
        pass
+291 −0
Original line number Diff line number Diff line
""" Job and job state.

This module contains the following objects:

- JobState: A class representing the job state
- Job: A class (subclass of Thread) representing the job itself
- JobCancelError, FreezedJobError, AlreadyGotAttachmentError: exceptions used by
  job classes.
"""

import logging
from datetime import datetime
from threading import Thread


class JobCancelError(Exception):

    """ Exception raised in order to cancel a job.
    """


class FreezedJobError(Exception):

    """ Exception raised when the requested action is incompatible with the
        freeze state of the job.
    """


class AlreadyGotAttachmentError(Exception):

    """ Exception raised when the same attachment is requested more than once.
    """


class JobState(object):

    """ State of a job.

    A system job is a special job which can be used to start background tasks.
    System jobs are not persistent and are not notified through the interface.

    This class represents the full state of a job: identifier, title, status
    text, lifecycle state, owner, creation/termination dates and attachment
    names. Changes are reported to the bound manager, if any.

    Pickling an instance (see ``__getstate__``) yields a frozen snapshot: it
    is read-only and detached from its logger, job object and manager. A
    manager can be bound again to a frozen instance through the ``manager``
    property.
    """

    STATES = ('init', 'running', 'done', 'cancelling', 'rollbacking')

    def __init__(self, logger, job_class, job_id, manager, owner, settings, system=False):
        """ Create the state and instantiate the job bound to it.

        :param logger: logger used by the state and handed to the job
        :param job_class: callable invoked as
            ``job_class(logger, state, **settings)`` to build the job
        :param job_id: identifier of the job
        :param manager: manager notified of changes and providing attachments
        :param owner: id of the owner object
        :param settings: dict of keyword arguments passed to ``job_class``
        :param system: True if this is a system job
        """
        self.logger = logger
        self._frozen = False  # Is the jobstate frozen (loaded from store and
                               # no more modifiable)
        self._manager = manager

        # Jobs attributes:
        self._job_id = job_id

        # Default title is the name of the job class itself
        # (job_class.__class__ would only designate its metaclass):
        self._title = getattr(job_class, '__name__', str(job_class))
        self._status = None                         # Textual description about status
        self._state = 'init'                        # State of the job
        self._owner = owner                         # Id of the owner object
        self._created = datetime.now()              # Creation date of the job
        self._ended = None                          # Termination date of the job
        self._system = system                       # Is a system job?
        self._attachments = set()                   # Files attached to the job

        # Bound to the job itself:
        self._job = job_class(self.logger, self, **settings)

    def _check_freeze(self):
        """ Raise FreezedJobError if the object is frozen.
        """
        if self._frozen:
            raise FreezedJobError('This job is freezed')

    def __getstate__(self):
        """ Return the picklable (frozen) snapshot of this state.

        Logger, job and manager references are dropped. A job which is not
        done yet is recorded as done, with an "Interrupted while" status and
        the current date as termination date.
        """
        jobstate = self.__dict__.copy()
        jobstate['logger'] = None
        jobstate['_job'] = None
        jobstate['_manager'] = None
        jobstate['_frozen'] = True
        if jobstate['_state'] != 'done':
            jobstate['_status'] = 'Interrupted while: %s' % jobstate['_status']
            jobstate['_ended'] = datetime.now()
            jobstate['_state'] = 'done'
        return jobstate

    def _notify_attrs_changes(self):
        """ Tell the manager (if any) that an attribute of this job changed.
        """
        if self._manager is not None:
            self._manager.notify_attrs_changes(self)

    @property
    def manager(self):
        """ Manager bound to this job (None on a freshly unpickled state).
        """
        return self._manager

    @manager.setter
    def manager(self, value):
        """ Bind a manager to a frozen job.

        Raise FreezedJobError if the job is not frozen: a live job keeps the
        manager it was created with.
        """
        if not self._frozen:
            raise FreezedJobError('manager can be set only when job is frozen')
        # Only the manager reference is restored, the job stays frozen:
        self._manager = value

    #
    # Attribute properties.
    #

    @property
    def id(self):
        """ Identifier of the job.
        """
        return self._job_id

    @property
    def system(self):
        """ True if this is a system job.
        """
        return self._system

    @property
    def title(self):
        """ Title of the job.
        """
        return self._title

    @title.setter
    def title(self, value):
        self._check_freeze()
        self._title = value
        self._notify_attrs_changes()

    @property
    def status(self):
        """ Textual description of the job status.
        """
        return self._status

    @status.setter
    def status(self, value):
        self._check_freeze()
        self.logger.info('Status: %s', value)
        self._status = value
        self._notify_attrs_changes()

    @property
    def state(self):
        """ Lifecycle state of the job (one of STATES).
        """
        return self._state

    @state.setter
    def state(self, value):
        self._check_freeze()
        if value not in self.STATES:
            raise ValueError('Bad state')
        self.logger.info('State: %s', value)
        self._state = value
        if value == 'done':
            # The job object is no longer needed once the job is done:
            self._job = None
        self._notify_attrs_changes()

    @property
    def created(self):
        """ Creation date of the job.
        """
        return self._created

    @property
    def ended(self):
        """ Termination date of the job, or None.
        """
        return self._ended

    @property
    def owner(self):
        """ Id of the owner object.
        """
        return self._owner

    @property
    def attachments(self):
        """ Set of the attachment names of the job.
        """
        return self._attachments

    def start(self):
        """ Start the job.

        For non-system jobs, the job logger is first connected to a 'logs'
        attachment.
        """
        # Configure logger of the job:
        if not self.system:
            handler = logging.StreamHandler(self.attachment('logs'))
            fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
            handler.setFormatter(fmt)
            self.logger.addHandler(handler)

        self._job.start()

    def report(self, status, state=None):
        """ Report the job status, and optionally a new state.

        When the new state is 'done', the termination date is recorded.
        Raise FreezedJobError on a frozen job and ValueError on a bad state.
        """
        self._check_freeze()
        self.status = status
        if state is not None:
            self.state = state
            if state == 'done':
                self._ended = datetime.now()
            self._notify_attrs_changes()

    def attachment(self, name):
        """ Register a new attachment and return what the manager provides
            as its file object.

        Raise AlreadyGotAttachmentError if the name has already been used.
        """
        if name in self._attachments:
            raise AlreadyGotAttachmentError('This attachment already exists')
        fattach = self._manager.get_attachment_file(self, name)
        self._attachments.add(name)
        self._notify_attrs_changes()
        return fattach

    def cancel(self):
        """ Cancel the job (no effect on a job which is already done).
        """
        self._check_freeze()
        if self.state != 'done':
            self.state = 'cancelling'

    def read_attachment(self, name):
        """ Return the content of an attachment, as provided by the manager.

        Raise KeyError if the attachment is unknown.
        """
        if name not in self._attachments:
            raise KeyError('Unknown attachment')
        return self.manager.read_attachment(self, name)


class Job(Thread, object):

    """ Base class for a job.

    A job is a thread bound to a state object. Subclasses override the
    ``job`` method and call ``checkpoint`` at the points where the job may
    be cancelled.
    """

    daemon = True  # Set job as a daemon thread

    def __init__(self, logger, state, **settings):
        """ Create the job thread.

        :param logger: logger used by the job
        :param state: state object the job reports to
        :param settings: keyword arguments passed to ``job`` by ``run``
        """
        Thread.__init__(self)

        self.logger = logger
        self.state = state
        self._settings = settings

        # Actions to do by the rollback method:
        self._wayback = []

        # Set the thread name:
        self.name = self.state.id

    def _rollback(self, error):
        """ Rollback the job using the specified error message.

        The registered wayback callbacks are run in reverse order. The first
        failing callback interrupts the rollback; the job is reported as done
        in both cases.
        """
        self.report('Rollback in progress', state='rollbacking')
        try:
            for func in reversed(self._wayback):
                func()
        except Exception as err:
            # Both values must be given to the format string as a tuple:
            self.report('Rollback failed: %s (error was: %s)' % (err, error),
                        state='done')
        else:
            self.report('Cancelled: %s' % error, state='done')

    #
    # Shortcuts to useful state methods:
    #

    def report(self, *args, **kwargs):
        """ Shortcut to the ``report`` method of the state.
        """
        return self.state.report(*args, **kwargs)

    def attachment(self, *args, **kwargs):
        """ Shortcut to the ``attachment`` method of the state.
        """
        return self.state.attachment(*args, **kwargs)

    @property
    def title(self):
        """ Title of the job, as stored in the state.
        """
        return self.state.title

    @title.setter
    def title(self, value):
        self.state.title = value

    #
    # Public methods:
    #

    def run(self, *args, **kwargs):
        """ Run the job.

        The state is set to 'running', then ``job`` is called with the
        settings. On success the job is reported as done; on JobCancelError
        or any other exception the job is rolled back.
        """
        self.state.state = 'running'
        try:
            self.job(**self._settings)
        except JobCancelError as err:
            self._rollback(str(err))
            self.logger.info('Job cancelled')
        except Exception as err:
            self.logger.error('Error while executing job: %r(%s)', err, err)
            self._rollback(str(err))
        else:
            self.report('Success', state='done')

    def checkpoint(self, wayback_callback=None):
        """ Check if job is not cancelled, else raise JobCancelError. Also
            optionally add the provided callback to the wayback list.
        """
        if self.state.state == 'cancelling':
            raise JobCancelError('Job has been cancelled by user')
        if wayback_callback is not None:
            self._wayback.append(wayback_callback)

    def job(self, *args, **kwargs):
        """ Method to override in order to define the job behavior.
        """
        pass
+84 −0
Original line number Diff line number Diff line
""" Jobs manager class.
"""

from cloudcontrol.common.jobs.helpers import AutoIncrementCounter
from cloudcontrol.common.jobs.job import JobState


class JobsManager(object):

    """ Store created jobs.

    The manager keeps every job in memory (indexed by id), persists
    non-system jobs through the store and reports their events to the
    interface.
    """

    def __init__(self, logger, interface, store):
        """ Create the manager and reload the jobs saved in the store.

        :param logger: parent logger of the loggers given to the jobs
        :param interface: object receiving the on_job_* callbacks
        :param store: object used to persist jobs, attachments and the id
            counter
        """
        self.logger = logger
        self._jobs = {}
        self._interface = interface
        self._store = store
        self._counter = AutoIncrementCounter(store)

        # Load jobs from store:
        for job in self._store.iter_jobs():
            job.manager = self
            self._jobs[job.id] = job
            self._interface.on_job_created(job)
            self._interface.on_job_updated(job)

    #
    # Jobs methods
    #

    def notify_attrs_changes(self, job):
        """ Notify a change in attributes of a job to the manager.

        Ignored for system jobs.
        """
        if not job.system:
            self._interface.on_job_updated(job)
            self._store.update_job(job)

    def get_attachment_file(self, job, name):
        """ Get an attachment file object open with write mode for the specified
            job.

        Return None for system jobs.
        """
        if not job.system:
            return self._store.get_attachment_file(job.id, name)

    def read_attachment(self, job, name):
        """ Get content of an attachment in a string.

        Return None for system jobs.
        """
        if not job.system:
            return self._store.read_attachment(job.id, name)

    #
    # Public methods
    #

    def spawn(self, job_class, owner, system=False, settings=None):
        """ Spawn a new job.

        :param job_class: class of the job to run
        :param owner: id of the owner object
        :param system: True to spawn a system job (neither stored nor
            notified through the interface)
        :param settings: dict of keyword arguments given to the job
            (defaults to no settings)
        :return: the state object of the started job
        """
        if settings is None:
            # Fresh dict on each call instead of a shared mutable default:
            settings = {}
        job_id = 'job-%s' % self._counter.get()
        job_logger = self.logger.getChild(str(job_id))
        job = JobState(job_logger, job_class, job_id, self, owner,
                       settings=settings, system=system)
        if not system:
            self._store.create_job(job)
            self._interface.on_job_created(job)
        self._jobs[job_id] = job

        job.start()
        self.logger.info('Started new job #%s of class %s', job_id, job_class)
        return job

    def get(self, job_id):
        """ Get a job by its id.

        Raise KeyError if the job is unknown.
        """
        return self._jobs[job_id]

    def purge(self, job_id):
        """ Purge a done job.

        Raise KeyError if the job is unknown and Exception if it is not done.
        """
        job = self.get(job_id)
        if job.state != 'done':
            raise Exception('Job is not done')
        if not job.system:
            # System jobs were never stored nor announced to the interface,
            # so there is nothing to delete or to notify for them.
            self._store.delete_job(job_id)
        del self._jobs[job.id]
        if not job.system:
            # NOTE(review): the interface names this parameter `job` but the
            # id is passed here -- confirm what implementations expect.
            self._interface.on_job_purged(job.id)
Loading