Commit 3da7d7f3 authored by Antoine Millet's avatar Antoine Millet
Browse files

Integrated new jobs manager into server

parent 32ecb1b5
Loading
Loading
Loading
Loading
+81 −30
Original line number Diff line number Diff line
@@ -9,10 +9,12 @@ from cloudcontrol.server.election import Elector

from cloudcontrol.server.handlers import listed, Reporter
from cloudcontrol.server.clients import Client, RegisteredCCHandler
from cloudcontrol.server.jobs import (ColdMigrationJob, HotMigrationJob,
                                      CloneJob)
from cloudcontrol.common.tql.db.tag import StaticTag

MIGRATION_TYPES = {'cold': 'cold_migrate',
                   'hot': 'hot_migrate',}
MIGRATION_TYPES = {'cold': ColdMigrationJob,
                   'hot': HotMigrationJob,}


class CliHandler(RegisteredCCHandler):
@@ -47,7 +49,8 @@ class CliHandler(RegisteredCCHandler):
       CliHandler.shutdown
       CliHandler.jobs
       CliHandler.cancel
       CliHandler.jobspurge
       CliHandler.purge
       CliHandler.attachment
       CliHandler.console
       CliHandler.rshell
       CliHandler.rshell_resize
@@ -569,38 +572,83 @@ class CliHandler(RegisteredCCHandler):
        return errs.get_dict()

    @listed
    def jobs(self, query, show_done=True, show_running=True):
        """ Return all jobs.
    def cancel(self, query):
        """ Cancel a job.

        :param show_done: show done jobs
        :param show_running: show running jobs
        :param query: the tql query used to select jobs to cancel
        """

        if query:
            raise NotImplementedError('Tql in jobs is not yet supported.')
        props = ('id', 'author', 'created', 'ended',
                 'duration', 'done', 'title', 'status')
        jobs = []
        for job in self.server.jobs.iterjobs(show_done, show_running):

            jobs.append(job.export(props))
        self.check('cancel', query)
        objects = self.server.list(query, show=('r', 'p'))
        errs = Reporter()
        for obj in objects:
            if obj['r'] != 'job':
                errs.error(obj['id'], 'not a job')
            elif 'p' in obj:
                errs.error(obj['id'], 'cancel on remote jobs not implemented')
            else:
                try:
                    self.server.jobs.get(obj['id']).cancel()
                except KeyError:
                    errs.error(obj['id'], 'unknown job')
                else:
                    errs.success(obj['id'], 'job cancelled')

        return {'objects': jobs, 'order': props}
        return errs.get_dict()

    @listed
    def cancel(self, jobid):
        """ Cancel a job.
    def purge(self, query):
        """ Purge matching jobs from server.

        :param jobid: the id of the job to cancel.
        :param query: the tql query used to select jobs to purge

        .. note::
           Purge only work for job with state done.
        """
        self.check('purge', query)
        objects = self.server.list(query, show=('r', 'p', 'state'))
        errs = Reporter()
        for obj in objects:
            if obj['r'] != 'job':
                errs.error(obj['id'], 'not a job')
            elif obj['state'] != 'done':
                errs.error(obj['id'], 'job must be done')
            elif 'p' in obj:
                errs.error(obj['id'], 'purge on remote jobs not implemented')
            else:
                try:
                    self.server.jobs.purge(obj['id'])
                except KeyError:
                    raise
                    errs.warn(obj['id'], 'job already purged')
                else:
                    errs.success(obj['id'], 'job purged')

        self.server.jobs.cancel(jobid)
        return errs.get_dict()

    @listed
    def jobspurge(self):
        """ Purge all done jobs from the job list.
    def attachment(self, query, name):
        """ Get the specified attachment for jobs matching query.

        :param query: the tql query used to select jobs
        """
        self.server.jobs.purge()
        self.check('attachment', query)
        objects = self.server.list(query, show=('r', 'p'))
        errs = Reporter()
        for obj in objects:
            if obj['r'] != 'job':
                errs.error(obj['id'], 'not a job')
            elif 'p' in obj:
                errs.error(obj['id'], 'purge on remote jobs not implemented')
            else:
                try:
                    output = self.server.jobs.get(obj['id']).read_attachment(name)
                except KeyError:
                    errs.error(obj['id'], 'unknown attachment')
                else:
                    errs.success(obj['id'], 'job purged', output=output)

        return errs.get_dict()

    @listed
    def electiontypes(self):
@@ -641,14 +689,14 @@ class CliHandler(RegisteredCCHandler):

            # Construct the migration properties:
            migration_properties = {
                'author': self.client.login,
                'server': self.server,
                'vm_name': vm['h'],
                'hv_source': vm['p'],
                'hv_dest': migration['did']
            }

            # Create the job:
            self.server.jobs.create(mtype, **migration_properties)
            self.client.spawn_job(mtype, settings=migration_properties)
            errs.success(migration['sid'], 'migration launched')

        return errs.get_dict()
@@ -680,11 +728,11 @@ class CliHandler(RegisteredCCHandler):
        else:
            dest = dest[0]

        self.server.jobs.create('clone', **{'vm_name': vm['h'],
        self.client.spawn_job(CloneJob, settings={'server': self.server,
                                                  'vm_name': vm['h'],
                                                  'new_vm_name': name,
                                                  'hv_source': vm['p'],
                                           'hv_dest': dest['id'],
                                           'author': self.client.login})
                                                  'hv_dest': dest['id']})

    @listed
    def console(self, tql):
@@ -815,6 +863,9 @@ class CliClient(Client):
        super(CliClient, self).__init__(*args, **kwargs)
        self._tunnels = {}  # Running tunnels for this client (as client)

    def spawn_job(self, job_class, **kwargs):
        self._server.jobs.spawn(job_class, self.login, **kwargs)

    def register_tunnel(self, ttype, client, tun, label=None):
        """ Create and register a tunnel for this client.

+6 −93
Original line number Diff line number Diff line
from threading import Lock
""" Server jobs.
"""

from cloudcontrol.server.jobs.hotmigration import HotMigrationJob
from cloudcontrol.server.jobs.coldmigration import ColdMigrationJob
from cloudcontrol.server.jobs.hotmigration import HotMigrationJob
from cloudcontrol.server.jobs.clone import CloneJob
from cloudcontrol.server.jobs.killclient import KillClientJob
from cloudcontrol.server.jobs.killoldcli import KillOldCliJob
from cloudcontrol.server.jobs.killclient import KillClientJob

from cloudcontrol.server.exceptions import (BadJobTypeError, UnknownJobError)


class JobsManager(object):

    """ Manage the current job list.

    :param server: The :class:`CCServer` instance.
    """

    JOBS_TYPES = {
        'kill': KillClientJob,
        'kill_oldcli': KillOldCliJob,
        'cold_migrate': ColdMigrationJob,
        'hot_migrate': HotMigrationJob,
        'clone': CloneJob,
    }

    def __init__(self, logger, server):
        self.logger = logger
        # The main job dict, the keys are id of jobs.
        self._jobs = {}

        # The server:
        self.server = server

        # The id of the next job and it's lock:
        self._current_id = 1
        self._current_id_lock = Lock()

    def create(self, jtype, **kwargs):
        """ Create a new job.

        :param jtype: the type of the new job
        :param \*\*kwargs: arguments to pass to the job
        :raise BadJobTypeError: when invalid jtype is passed
        """

        jobid = self.get_id()
        jobtype = JobsManager.JOBS_TYPES.get(jtype)

        if jobtype is None:
            raise BadJobTypeError('Invalid job type %r' % jtype)

        job = jobtype(self.logger.getChild(str(jobid)), self, id=jobid, **kwargs)
        self._jobs[jobid] = job
        job.daemon = True
        job.start()
        return job

    def get_id(self):
        """ Get the current id and increment the counter.
        """

        with self._current_id_lock:
            jobid = self._current_id
            self._current_id += 1

        return jobid

    def cancel(self, jobid):
        """ Cancel the provided job.
        """

        job = self._jobs.get(jobid)

        if job is None:
            raise UnknownJobError('Invalid job id: %r' % jobid)
        elif job.get('_hidden', False):
            raise UnknownJobError('Invalid job id: %r (hidden)' % jobid)
        else:
            job.cancel()

    def purge(self):
        """ Purge all done jobs.
        """

        for job in self._jobs.values():
            if job['done']:
                del self._jobs[job['id']]

    def iterjobs(self, show_done=True, show_running=True):
        """ Iter over jobs.

        :param done: If set, iterate over done or not done jobs.
        """

        for job in self._jobs.itervalues():
            if (show_done and job['done'] or show_running and not job['done']
                and not job.get('_hidden')):
                yield job
__all__ = ('ColdMigrationJob', 'HotMigrationJob', 'CloneJob', 'KillOldCliJob',
           'KillClientJob')
 No newline at end of file

cloudcontrol/server/jobs/base.py

deleted100644 → 0
+0 −167
Original line number Diff line number Diff line
import time
from threading import Thread
from datetime import datetime

from cloudcontrol.server.exceptions import JobError


class JobCancelError(Exception):
    """ Exception used by jobs to stop it when a cancel signal is sent.
    """
    pass


class BaseJob(dict, Thread, object):

    """ A base class to define a job.

    The standards job items are:

     * id: id of the job
     * status: message explaining the current job status
     * done: True if the job is done
     * cancelled: True if job has been cancelled by user
     * created: job date of creation
     * ended: job date of end (or None if done = False)
     * duration: duration in seconds of the job (processed on export)
     * author: author login of the job

    :param manager: the :class:`JobsManager` instance.
    """

    def __init__(self, logger, manager, *args, **kwargs):
        # Initialize the inherited classes:
        dict.__init__(self, *args, **kwargs)
        Thread.__init__(self)

        self.logger = logger
        # The manager of this job:
        self.manager = manager

        # Define default job properties:
        self['status'] = 'pending'
        self['done'] = False
        self['cancelled'] = False
        self['created'] = datetime.now()
        self['ended'] = None
        self['duration'] = 0

        #~ assert self.get('author') is not None, 'author is not defined'

        # List of actions to do by the rollback method:
        self._wayback = []

        # Set the thread name:
        self.name = 'job-%s' % self['id']

    def __hash__(self):
        return self['id'].__hash__()

    def __setitem__(self, key, value):
        if key == 'id':
            raise KeyError('Key %r in read-only.' % key)
        else:
            super(BaseJob, self).__setitem__(key, value)

    def __delitem__(self, key):
        if key == 'id':
            raise KeyError('Key %r in read-only.' % key)
        else:
            super(BaseJob, self).__delitem__(key)

    def report(self, status, done=None):
        """ Report the status of the job.

        :param status: the status to set to the job
        :param done: is the job done, None to keep current value
        """

        self['status'] = status
        if done is not None:
            self['ended'] = datetime.now()
            self['done'] = done

    def run(self):
        """ Run the job itself.
        """

        try:
            self.job()
        except JobCancelError as err:
            self._rollback('%s' % err)
        except Exception as err:
            self.logger.error('Error while executing job: %s, %r', err, err)
            self._rollback('%s' % err)
        else:
            self.report('success', done=True)

    def job(self):
        """ Method to override to define the job's behavior.
        """

        pass

    def checkpoint(self, func=None):
        """ Check if job is not cancelled, else raise the CancellJobError. Also
            add the provided function (optionnal) to the wayback list.

        :param func: callable to add to the wayback list.
        """

        if self['cancelled']:
            raise JobCancelError('Job has been cancelled by user')

        if func is not None:
            self._wayback.append(func)

    def _rollback(self, error):
        """ Rollback the job.
        """

        self.report('rollbacking')
        try:
            for func in reversed(self._wayback):
                func()
        except Exception as err:
            self.report('rollback failed: %s' % err, done=True)
        else:
            self.report('cancelled: %s' % error, done=True)

    def cancel(self):
        """ Cancel the job.

        .. note::
            You can override this method to trigger an action when user cancel
            the job.
        """

        if self['done']:
            raise JobError('Job is done')
        if self['cancelled']:
            raise JobError('Job is already cancelled')

        self['cancelled'] = True
        self.report('cancelling')

    def export(self, props=None):
        """ Export the job in a simple dict format.
        """

        exported = {}
        for key, val in self.iteritems():
            if key.startswith('_') or (props is not None and key not in props):
                continue
            if isinstance(val, datetime):
                val = int(time.mktime(val.timetuple()))

            if key == 'duration':
                if self['done']:
                    dt = self['ended'] - self['created']
                    val = dt.seconds + dt.days * 86400
                else:
                    now = datetime.now()
                    dt = now - self['created']
                    val = dt.seconds + dt.days * 86400

            exported[key] = str(val)
        return exported
+34 −34
Original line number Diff line number Diff line
@@ -2,11 +2,12 @@ import re

from sjrpc.core import AsyncWatcher

from cloudcontrol.server.jobs.base import BaseJob, JobCancelError
from cloudcontrol.common.jobs import Job, JobCancelError

from cloudcontrol.server.utils import AcquiresAllOrNone


class CloneJob(BaseJob):
class CloneJob(Job):

    """ A clone job.

@@ -18,47 +19,49 @@ class CloneJob(BaseJob):
     * author: login of the author cli
    """

    def job(self):
        vm_id = '%s.%s' % (self['hv_source'], self['vm_name'])
    def job(self, server, hv_source, vm_name, hv_dest, new_vm_name):
        self._func_cancel_xfer = None  # Callback to a function used to cancel
                                       # a disk transfert
        vm_id = '%s.%s' % (hv_source, vm_name)

        self['title'] = 'Clone %s --> %s' % (vm_id, self['hv_dest'])
        self.logger.info('Job-%s: Started clone for %s', self['id'], vm_id)
        self.title = 'Clone %s --> %s' % (vm_id, hv_dest)
        self.logger.info('Started clone for %s', vm_id)

        # Cancel the job if the user has not the right to clone the vm or to
        # select an hypervisor as destination:
        right_check = self.manager.server.check
        right_check = server.check

        tql = 'id=%s' % vm_id
        if not right_check(self['author'], 'clone', tql):
        if not right_check(self.state.owner, 'clone', tql):
            raise JobCancelError('author have no rights to migrate this VM')

        tql = 'id=%s' % self['hv_dest']
        if not right_check(self['author'], 'clone_dest', tql):
        tql = 'id=%s' % hv_dest
        if not right_check(self.state.owner, 'clone_dest', tql):
            raise JobCancelError('author have no right to migrate to this hv')

        # Update the VM object:
        vm = self.manager.server.db.get_by_id(vm_id)
        vm = server.db.get_by_id(vm_id)
        if vm is None:
            raise JobCancelError('Source VM not found')

        # Get the source and destination hv clients:
        try:
            source = self.manager.server.get_client(self['hv_source'])
            source = server.get_client(hv_source)
        except KeyError:
            raise JobCancelError('source hypervisor is not connected')

        try:
            dest = self.manager.server.get_client(self['hv_dest'])
            dest = server.get_client(hv_dest)
        except KeyError:
            raise JobCancelError('destination hypervisor is not connected')

        self.checkpoint()

        self.report('waiting lock for source and dest hypervisors')
        self.logger.info('Job-%s: Trying to acquire locks', self['id'])
        self.logger.info('Trying to acquire locks')

        with AcquiresAllOrNone(source.hvlock, dest.hvlock):
            self.logger.info('Job-%s: Locks acquired', self['id'])
            self.logger.info('Locks acquired')
            self.checkpoint()

            before_clone_autostart = vm['autostart'].lower() == 'yes'
@@ -78,11 +81,11 @@ class CloneJob(BaseJob):

                # Change the name of the disk:
                old_name = name
                if name.startswith(self['vm_name']):
                    suffix = name[len(self['vm_name']):]
                    name = self['new_vm_name'] + suffix
                if name.startswith(vm_name):
                    suffix = name[len(vm_name):]
                    name = new_vm_name + suffix
                else:
                    name = '%s_%s' % (self['new_vm_name'], name)
                    name = '%s_%s' % (new_vm_name, name)

                names[disk] = name

@@ -92,9 +95,8 @@ class CloneJob(BaseJob):

                # Create the volume on destination:
                dest.proxy.vol_create(pool, name, int(size))
                self.logger.info('Job-%s: Created volume %s/%s on destination '
                             'hypervisor (was %s)', self['id'], pool, name,
                             old_name)
                self.logger.info('Created volume %s/%s on destination '
                                 'hypervisor (was %s)', pool, name, old_name)

                # Rollback stuff for this action:
                def rb_volcreate():
@@ -103,12 +105,12 @@ class CloneJob(BaseJob):

            # Define VM:
            self.report('define vm')
            self.logger.info('Job-%s: XML configuration transfert', self['id'])
            vm_config = source.proxy.vm_export(self['vm_name'])
            self.logger.info('XML configuration transfert')
            vm_config = source.proxy.vm_export(vm_name)

            # Change vm configuration XML to update it with new name:
            new_vm_config = self._update_xml(vm_config, self['vm_name'],
                                             self['new_vm_name'],
            new_vm_config = self._update_xml(vm_config, vm_name,
                                             new_vm_name,
                                             old_new_disk_mapping)

            dest.proxy.vm_define(new_vm_config)
@@ -123,10 +125,10 @@ class CloneJob(BaseJob):
                self._copy_disk(source, dest, vm, disk, name)

            # Setup autostart as it was before the migration
            dest.proxy.vm_set_autostart(self['new_vm_name'],
            dest.proxy.vm_set_autostart(new_vm_name,
                                        before_clone_autostart)

            self.logger.info('Job-%s: Clonage completed with success', self['id'])
            self.logger.info('Cloning completed with success')


    def _update_xml(self, vm_config, old_name, name, old_new_name_mapping):
@@ -151,8 +153,6 @@ class CloneJob(BaseJob):
        # Get informations about the disk:
        pool = vm.get('disk%s_pool' % disk)
        name = vm.get('disk%s_vol' % disk)
        self.logger.info('Job-%s: Started copy for %s/%s to %s/%s',
                         self['id'], pool, name, pool, new_disk)
        self.report('copy %s/%s' % (pool, name))

        # Make the copy and wait for it end:
@@ -161,7 +161,7 @@ class CloneJob(BaseJob):
        # Register the cancel function:
        def cancel_xfer():
            dest.proxy.vol_import_cancel(xferprop['id'])
        self['func_cancel_xfer'] = cancel_xfer
        self._func_cancel_xfer = cancel_xfer

        # Wait for the end of transfert:
        watcher = AsyncWatcher()
@@ -169,7 +169,7 @@ class CloneJob(BaseJob):
        watcher.register(dest.conn, 'vol_import_wait', xferprop['id'])

        msgs = watcher.wait()
        del self['func_cancel_xfer']
        self._func_cancel_xfer = None

        # Compare checksum of two answers:
        checksums = []
@@ -187,6 +187,6 @@ class CloneJob(BaseJob):
            raise JobCancelError('checksum mismatches')

    def cancel(self):
        if self.get('func_cancel_xfer') is not None:
            self.get('func_cancel_xfer')()
        if self._func_cancel_xfer is not None:
            self._func_cancel_xfer()
        super(CloneJob, self).cancel()
+30 −28

File changed.

Preview size limit exceeded, changes collapsed.

Loading