#!/usr/bin/env python
#coding=utf8

'''
Jobs management on the server.

A job is a thread (see :class:`BaseJob`) that also behaves like a dict holding
its own properties. Jobs are created, listed, cancelled and purged through the
:class:`JobsManager`.
'''

from __future__ import absolute_import

import re
import logging
import time
from datetime import datetime
from threading import Thread, Lock

from ccserver.exceptions import (BadJobTypeError, UnknownJobError, JobError,
                                 UnknownObjectError)
from ccserver.utils import AcquiresAllOrNone


class JobCancelError(Exception):
    '''
    Exception used by jobs to stop it when a cancel signal is sent.
    '''
    pass


class BaseJob(dict, Thread, object):

    '''
    A base class to define a job.

    The standards job items are:

     * id: id of the job
     * status: message explaining the current job status
     * done: True if the job is done
     * cancelled: True if job has been cancelled by user
     * created: job date of creation
     * ended: job date of end (or None if done = False)
     * duration: duration in seconds of the job (processed on export)
     * author: author login of the job

    :param manager: the :class:`JobsManager` instance.
    '''

    def __init__(self, manager, *args, **kwargs):
        # Initialize the inherited classes:
        dict.__init__(self, *args, **kwargs)
        Thread.__init__(self)

        # The manager of this job:
        self.manager = manager

        # Define default job properties:
        self['status'] = 'pending'
        self['done'] = False
        self['cancelled'] = False
        self['created'] = datetime.now()
        self['ended'] = None
        self['duration'] = 0

        #~ assert self.get('author') is not None, 'author is not defined'

        # List of actions to do by the rollback method:
        self._wayback = []

        # Set the thread name (the 'id' item must have been provided through
        # the constructor arguments, it can't be set afterwards):
        self.name = 'job-%s' % self['id']

    def __hash__(self):
        # dict is unhashable by default; jobs are identified by their id.
        return self['id'].__hash__()

    def __setitem__(self, key, value):
        if key == 'id':
            raise KeyError('Key %r in read-only.' % key)
        else:
            super(BaseJob, self).__setitem__(key, value)

    def __delitem__(self, key):
        if key == 'id':
            raise KeyError('Key %r in read-only.' % key)
        else:
            super(BaseJob, self).__delitem__(key)

    def report(self, status, done=None):
        '''
        Report the status of the job.

        :param status: the status to set to the job
        :param done: is the job done, None to keep current value
        '''
        self['status'] = status
        if done is not None:
            # The end date is stamped before the 'done' flag is raised so
            # that a reader seeing done=True always finds an 'ended' value.
            self['ended'] = datetime.now()
            self['done'] = done

    def run(self):
        '''
        Run the job itself.

        Calls :meth:`job`; any exception triggers :meth:`_rollback`, otherwise
        the job is reported as a success.
        '''
        try:
            self.job()
        except JobCancelError as err:
            self._rollback('%s' % err)
        except Exception as err:
            logging.error('Error while executing job: %s, %r', err, err)
            self._rollback('%s' % err)
        else:
            self.report('success', done=True)

    def job(self):
        '''
        Method to override to define the job's behavior.
        '''
        pass

    def checkpoint(self, func=None):
        '''
        Check if job is not cancelled, else raise the JobCancelError. Also
        add the provided function (optional) to the wayback list.

        :param func: callable to add to the wayback list; it must undo an
            action which has already been performed.
        :raise JobCancelError: when the job has been cancelled
        '''
        # Register the undo action *before* checking for cancellation:
        # callers invoke checkpoint(func) right after performing the action,
        # so if the job is cancelled at this very point the action still has
        # to be rolled back.
        if func is not None:
            self._wayback.append(func)
        if self['cancelled']:
            raise JobCancelError('Job has been cancelled by user')

    def _rollback(self, error):
        '''
        Rollback the job.

        Runs the registered wayback callables in reverse order of
        registration, then marks the job as done.

        :param error: message describing why the job is rolled back
        '''
        self.report('rollbacking')
        try:
            for func in reversed(self._wayback):
                func()
        except Exception as err:
            self.report('rollback failed: %s' % err, done=True)
        else:
            self.report('cancelled: %s' % error, done=True)

    def cancel(self):
        '''
        Cancel the job.

        .. note::
           You can override this method to trigger an action when user cancel
           the job.

        :raise JobError: when the job is done or already cancelled
        '''
        if self['done']:
            raise JobError('Job is done')
        if self['cancelled']:
            raise JobError('Job is already cancelled')
        self['cancelled'] = True
        self.report('cancelling')

    def export(self, props=None):
        '''
        Export the job in a simple dict format.

        Items whose key starts with an underscore are never exported, dates
        are converted to timestamps and every value is converted to a string.

        :param props: if not None, only export the items whose key is in it
        :return: a new dict of exported items
        '''
        exported = {}
        # Iterate over a snapshot: the job thread adds and removes items
        # (title, func_cancel_xfer...) while the job is running.
        for key, val in list(self.items()):
            if key.startswith('_') or (props is not None and key not in props):
                continue
            if isinstance(val, datetime):
                val = int(time.mktime(val.timetuple()))
            if key == 'duration':
                # Duration runs up to the end date for a finished job, up to
                # now for a running one.
                end = self['ended'] if self['done'] else datetime.now()
                dt = end - self['created']
                val = dt.seconds + dt.days * 86400
            exported[key] = str(val)
        return exported


class KillClientJob(BaseJob):

    '''
    A job used to kill connected accounts.

    Mandatory items:
     * account: the account login to kill

    Optional items:
     * gracetime: time before to kill the user
    '''

    def job(self):
        gracetime = self.get('gracetime')
        account = self.get('account')
        assert account is not None, 'Account not specified'

        if gracetime is not None:
            time.sleep(int(gracetime))

        # The job may have been cancelled during the grace time:
        self.checkpoint()

        self.manager.server.kill(account)


class KillOldCliJob(BaseJob):

    '''
    Typically an hidden job used to kill clients who are connected/idle since
    too much time.

    Mandatory items:
     * maxcon: maximum connection time in minutes
     * maxidle: maximum idle time in minutes

    Optional items:
     * delay: delay in secondes between two checks (default 1m)
    '''

    DEFAULT_DELAY = 60

    def job(self):
        maxcon = self.get('maxcon')
        assert maxcon is not None, 'maxcon is None'
        maxidle = self.get('maxidle')
        assert maxidle is not None, 'maxidle is None'
        delay = self.get('delay', self.DEFAULT_DELAY)

        # Loop until the job is cancelled (checkpoint raises in this case):
        while True:
            self.checkpoint()
            for client in self.manager.server.iter_connected_role('cli'):
                if client.get_uptime() > (maxcon * 60):
                    self.manager.server.kill(client.login)
                #TODO: handle idleing.
            time.sleep(delay)


class BaseMigrationJob(BaseJob):

    '''
    Base class for migration jobs.
    '''

    def _check_status(self, vm_id, status):
        '''
        Check the status of the VM.

        :param vm_id: id of the vm to check
        :param status: the expected status
        :return: True if the server lists the vm with this status
        '''
        answer = self.manager.server.list('id=%s&status=%s' % (vm_id, status))
        return bool(answer)


class ColdMigrationJob(BaseMigrationJob):

    '''
    A cold vm migration job.

    Mandatory items:
     * vm_name: name of the vm to migrate
     * hv_source: name of the hv which execute the VM
     * hv_dest: the destination hypervisor
     * author: login of the author cli
    '''

    def job(self):
        vm_id = '%s.%s' % (self['hv_source'], self['vm_name'])
        self['title'] = 'Cold migration %s --> %s' % (vm_id, self['hv_dest'])
        logging.info('Job-%s: Started migration for %s', self['id'], vm_id)

        # Cancel the job if the user has not the right to migrate the vm or to
        # select an hypervisor as destination:
        right_check = self.manager.server.check

        tql = 'id=%s' % vm_id
        if not right_check(self['author'], 'coldmigrate', tql):
            raise JobCancelError('author have no rights to migrate this VM')

        tql = 'id=%s' % self['hv_dest']
        if not right_check(self['author'], 'coldmigrate_dest', tql):
            raise JobCancelError('author have no right to migrate to this hv')

        # Update the VM object:
        try:
            self.manager.server.objects.update(ids=(vm_id,))
            vm = self.manager.server.objects.get_by_id(vm_id)
        except UnknownObjectError:
            raise JobCancelError('Source VM not found')

        # Get the source and destination hv clients:
        try:
            source = self.manager.server.get_connection(self['hv_source'])
        except KeyError:
            raise JobCancelError('source hypervisor is not connected')

        try:
            dest = self.manager.server.get_connection(self['hv_dest'])
        except KeyError:
            raise JobCancelError('destination hypervisor is not connected')

        self.checkpoint()

        self.report('waiting lock for source and dest hypervisors')
        logging.info('Job-%s: Trying to acquire locks', self['id'])

        with AcquiresAllOrNone(source.lock, dest.lock):
            logging.info('Job-%s: Locks acquired', self['id'])
            self.checkpoint()

            if not self._check_status(vm_id, 'stopped'):
                raise JobCancelError('vm is not stopped')

            disks = vm.get('disk', '').split()

            # Create storages on destination:
            self.report('create volumes')
            for disk in disks:
                # Getting informations about the disk:
                pool = vm.get('disk%s_pool' % disk)
                name = vm.get('disk%s_vol' % disk)
                size = vm.get('disk%s_size' % disk)
                assert pool is not None, 'pool tag doesn\'t exists'
                assert name is not None, 'name tag doesn\'t exists'
                assert size is not None, 'size tag doesn\'t exists'

                # Create the volume on destination:
                dest.proxy.vol_create(pool, name, int(size))
                logging.info('Job-%s: Created volume %s/%s on destination '
                             'hypervisor', self['id'], pool, name)

                # Rollback stuff for this action. The current pool/name are
                # bound as default arguments: a plain closure would see the
                # values of the *last* iteration when the rollback runs.
                def rb_volcreate(pool=pool, name=name):
                    dest.proxy.vol_delete(pool, name)
                self.checkpoint(rb_volcreate)

            # Define VM:
            self.report('define vm')
            logging.info('Job-%s: XML configuration transfert', self['id'])
            vm_config = source.proxy.vm_export(self['vm_name'])
            dest.proxy.vm_define(vm_config)

            # Rollback stuff for vm definition:
            def rb_define():
                dest.proxy.vm_undefine(self['vm_name'])
            self.checkpoint(rb_define)

            # Copy all source disk on destination disk:
            for disk in disks:
                self._copy_disk(source, dest, vm, disk)

            # At this point, if operation is a success, all we need is just to
            # cleanup source hypervisor from disk and vm. This operation *CAN'T*
            # be cancelled or rollbacked if anything fails (unlikely). The
            # migration must be considered as a success, and the only way to
            # undo this is to start a new migration in the other way.

            # Delete the rollback list.
            # This is mandatory to avoid data loss if the cleanup
            # code below fail.
            self._wayback = []

            # Cleanup the disks:
            for disk in disks:
                pool = vm.get('disk%s_pool' % disk)
                name = vm.get('disk%s_vol' % disk)
                source.proxy.vol_delete(pool, name)

            # Cleanup the VM:
            source.proxy.vm_undefine(self['vm_name'])

            logging.info('Job-%s: Migration completed with success',
                         self['id'])

    def _copy_disk(self, source, dest, vm, disk):
        '''
        Copy the specified disk name of the vm from source to dest.

        :param source: connection to the source hypervisor
        :param dest: connection to the destination hypervisor
        :param vm: the vm object holding the disk tags
        :param disk: identifier of the disk to copy
        :raise JobCancelError: on transfer error or checksum mismatch
        '''

        # Get informations about the disk:
        pool = vm.get('disk%s_pool' % disk)
        name = vm.get('disk%s_vol' % disk)
        logging.info('Job-%s: Started copy for %s/%s', self['id'], pool, name)
        self.report('copy %s/%s' % (pool, name))

        # Make the copy and wait for it end:
        xferprop = dest.proxy.vol_import(pool, name)

        # Register the cancel function:
        def cancel_xfer():
            dest.proxy.vol_import_cancel(xferprop['id'])
        self['func_cancel_xfer'] = cancel_xfer

        try:
            # Wait for the end of transfert:
            cids = set()
            cids.add(source.connection.async_call('vol_export', pool, name,
                                                  dest.get_ip(),
                                                  xferprop['port']))
            cids.add(dest.connection.async_call('vol_import_wait',
                                                xferprop['id']))
            msgs = self.manager.server.manager.wait(frozenset(cids))
        finally:
            # Always unregister the cancel function, even if the wait failed,
            # so that a later cancel() doesn't act on a finished transfer.
            del self['func_cancel_xfer']

        # Compare checksum of two answers:
        checksums = []
        assert len(msgs) == 2
        for msg in msgs:
            if msg.get('error') is not None:
                msg = 'error while copy: %s' % msg['error']['message']
                raise JobCancelError(msg)
            else:
                checksums.append(msg.get('checksum'))
        self.checkpoint()

        if checksums[0] != checksums[1]:
            raise JobCancelError('checksum mismatches')

    def cancel(self):
        # Read the item once: the job thread may remove it at any time.
        cancel_xfer = self.get('func_cancel_xfer')
        if cancel_xfer is not None:
            cancel_xfer()
        super(ColdMigrationJob, self).cancel()


class HotMigrationJob(BaseMigrationJob):

    '''
    A hot vm migration job.

    Mandatory items:
     * vm_name: name of the vm to migrate
     * hv_source: name of the hv which execute the VM
     * hv_dest: the destination hypervisor
     * author: login of the author cli
    '''

    def job(self):
        vm_id = '%s.%s' % (self['hv_source'], self['vm_name'])
        self['title'] = 'Hot migration %s --> %s' % (vm_id, self['hv_dest'])
        logging.info('Job-%s: Started hot migration for %s', self['id'], vm_id)

        # Cancel the job if the user has not the right to migrate the vm or to
        # select an hypervisor as destination:
        right_check = self.manager.server.check

        tql = 'id=%s' % vm_id
        if not right_check(self['author'], 'hotmigrate', tql):
            raise JobCancelError('author have no rights to migrate this VM')

        tql = 'id=%s' % self['hv_dest']
        if not right_check(self['author'], 'hotmigrate_dest', tql):
            raise JobCancelError('author have no right to migrate to this hv')

        # Update the VM object:
        try:
            self.manager.server.objects.update(ids=(vm_id,))
            vm = self.manager.server.objects.get_by_id(vm_id)
        except UnknownObjectError:
            raise JobCancelError('Source VM not found')

        # Get the source and destination hv clients:
        try:
            source = self.manager.server.get_connection(self['hv_source'])
        except KeyError:
            raise JobCancelError('source hypervisor is not connected')

        try:
            dest = self.manager.server.get_connection(self['hv_dest'])
        except KeyError:
            raise JobCancelError('destination hypervisor is not connected')

        self.checkpoint()

        self.report('waiting lock for source and dest hypervisors')
        logging.info('Job-%s: Trying to acquire locks', self['id'])

        with AcquiresAllOrNone(source.lock, dest.lock):
            logging.info('Job-%s: Locks acquired', self['id'])
            self.checkpoint()

            if not self._check_status(vm_id, 'running'):
                raise JobCancelError('vm is not started')

            # Actions to run after a successful migration. The list is
            # created once, before the loop, so that it collects the cleanup
            # of *every* disk (and exists even when the vm has no disk).
            to_cleanup = []

            # Create storages on destination and start synchronization:
            disks = vm.get('disk', '').split()
            for disk in disks:
                # Getting informations about the disk:
                pool = vm.get('disk%s_pool' % disk)
                name = vm.get('disk%s_vol' % disk)
                size = vm.get('disk%s_size' % disk)
                assert pool is not None, 'pool tag doesn\'t exists'
                assert name is not None, 'name tag doesn\'t exists'
                assert size is not None, 'size tag doesn\'t exists'

                self.report('sync volume %s/%s (creation)' % (pool, name))

                # Create the volume on destination:
                dest.proxy.vol_create(pool, name, int(size))
                logging.info('Job-%s: Created volume %s/%s on destination '
                             'hypervisor', self['id'], pool, name)

                # Rollback stuff for this action. As for every callback
                # defined in this loop, the per-disk values are bound as
                # default arguments: a plain closure would see the values of
                # the *last* iteration when it is finally called.
                def rb_volcreate(pool=pool, name=name):
                    dest.proxy.vol_delete(pool, name)
                self.checkpoint(rb_volcreate)

                # Setup the drbd synchronization with each hypervisors:
                self.report('sync volume %s/%s (setup)' % (pool, name))

                res_src = source.proxy.drbd_setup(pool, name)
                def rb_setupsrc(res_src=res_src):
                    source.proxy.drbd_shutdown(res_src)
                self.checkpoint(rb_setupsrc)
                to_cleanup.append(rb_setupsrc)

                res_dst = dest.proxy.drbd_setup(pool, name)
                def rb_setupdst(res_dst=res_dst):
                    dest.proxy.drbd_shutdown(res_dst)
                self.checkpoint(rb_setupdst)
                to_cleanup.append(rb_setupdst)

                # Start connection of drbd:
                self.report('sync volume %s/%s (connect)' % (pool, name))

                source.proxy.drbd_connect(res_src, res_dst, dest.get_ip())
                dest.proxy.drbd_connect(res_dst, res_src, source.get_ip())

                # Setup topology as Primary/Secondary:
                source.proxy.drbd_role(res_src, True)
                dest.proxy.drbd_role(res_dst, False)

                # Wait for the end of the disk synchronization:
                self.report('sync volume %s/%s (sync)' % (pool, name))
                cids = set()
                cids.add(source.connection.async_call('drbd_sync', res_src))
                cids.add(dest.connection.async_call('drbd_sync', res_dst))
                self.manager.server.manager.wait(frozenset(cids))
                #TODO: check error

                source.proxy.drbd_takeover(res_src, True)
                def rb_takeover_src(res_src=res_src):
                    source.proxy.drbd_takeover(res_src, False)
                self.checkpoint(rb_takeover_src)
                to_cleanup.append(rb_takeover_src)

                dest.proxy.drbd_takeover(res_dst, True)
                def rb_takeover_dst(res_dst=res_dst):
                    dest.proxy.drbd_takeover(res_dst, False)
                self.checkpoint(rb_takeover_dst)
                to_cleanup.append(rb_takeover_dst)

            #...
            tunres_src = source.proxy.tun_setup()
            def rb_tun_src():
                source.proxy.tun_destroy(tunres_src)
            self.checkpoint(rb_tun_src)

            tunres_dst = dest.proxy.tun_setup(local=False)
            def rb_tun_dst():
                dest.proxy.tun_destroy(tunres_dst)
            self.checkpoint(rb_tun_dst)

            source.proxy.tun_connect(tunres_src, tunres_dst, dest.get_ip())
            dest.proxy.tun_connect_hv(tunres_dst, migration=True)

            # Initiate the live migration:
            self.report('migration in progress')
            source.proxy.vm_migrate_tunneled(self['vm_name'], tunres_src)

            # At this point, if operation is a success, all we need is just to
            # cleanup source hypervisor from disk and vm. This operation *CAN'T*
            # be cancelled or rollbacked if anything fails (unlikely). The
            # migration must be considered as a success, and the only way to
            # undo this is to start a new migration in the other way.

            # Delete the rollback list.
            # This is mandatory to avoid data loss if the cleanup
            # code below fail.
            self.report('cleanup')
            self._wayback = []

            source.proxy.tun_destroy(tunres_src)
            dest.proxy.tun_destroy(tunres_dst)

            for cb_cleanup in reversed(to_cleanup):
                cb_cleanup()

            # Cleanup the disks:
            for disk in disks:
                pool = vm.get('disk%s_pool' % disk)
                name = vm.get('disk%s_vol' % disk)
                source.proxy.vol_delete(pool, name)

            logging.info('Job-%s: Migration completed with success',
                         self['id'])


class CloneJob(BaseMigrationJob):

    '''
    A clone job.

    Mandatory items:
     * vm_name: name of the vm to migrate
     * new_vm_name: the name of the cloned vm
     * hv_source: name of the hv which execute the VM
     * hv_dest: the destination hypervisor
     * author: login of the author cli
    '''

    def job(self):
        vm_id = '%s.%s' % (self['hv_source'], self['vm_name'])
        self['title'] = 'Clone %s --> %s' % (vm_id, self['hv_dest'])
        logging.info('Job-%s: Started clone for %s', self['id'], vm_id)

        # Cancel the job if the user has not the right to clone the vm or to
        # select an hypervisor as destination:
        right_check = self.manager.server.check

        tql = 'id=%s' % vm_id
        if not right_check(self['author'], 'clone', tql):
            raise JobCancelError('author have no rights to migrate this VM')

        tql = 'id=%s' % self['hv_dest']
        if not right_check(self['author'], 'clone_dest', tql):
            raise JobCancelError('author have no right to migrate to this hv')

        # Update the VM object:
        try:
            self.manager.server.objects.update(ids=(vm_id,))
            vm = self.manager.server.objects.get_by_id(vm_id)
        except UnknownObjectError:
            raise JobCancelError('Cloned VM not found')

        # Get the source and destination hv clients:
        try:
            source = self.manager.server.get_connection(self['hv_source'])
        except KeyError:
            raise JobCancelError('source hypervisor is not connected')

        try:
            dest = self.manager.server.get_connection(self['hv_dest'])
        except KeyError:
            raise JobCancelError('destination hypervisor is not connected')

        self.checkpoint()

        self.report('waiting lock for source and dest hypervisors')
        logging.info('Job-%s: Trying to acquire locks', self['id'])

        with AcquiresAllOrNone(source.lock, dest.lock):
            logging.info('Job-%s: Locks acquired', self['id'])
            self.checkpoint()

            disks = vm.get('disk', '').split()

            # Create storages on destination:
            old_new_disk_mapping = {}  # Mapping between old and new disk names
            new_vol_names = {}  # New volume name of each disk identifier
            self.report('create volumes')
            for disk in disks:
                # Getting informations about the disk:
                pool = vm.get('disk%s_pool' % disk)
                name = vm.get('disk%s_vol' % disk)
                size = vm.get('disk%s_size' % disk)
                assert pool is not None, 'pool tag doesn\'t exists'
                assert name is not None, 'name tag doesn\'t exists'
                assert size is not None, 'size tag doesn\'t exists'

                # Change the name of the disk:
                old_name = name
                if name.startswith(self['vm_name']):
                    suffix = name[len(self['vm_name']):]
                    name = self['new_vm_name'] + suffix
                else:
                    name = '%s_%s' % (self['new_vm_name'], name)

                fulloldname = '/dev/%s/%s' % (pool, old_name)
                fullnewname = '/dev/%s/%s' % (pool, name)
                old_new_disk_mapping[fulloldname] = fullnewname
                new_vol_names[disk] = name

                # Create the volume on destination:
                dest.proxy.vol_create(pool, name, int(size))
                logging.info('Job-%s: Created volume %s/%s on destination '
                             'hypervisor (was %s)', self['id'], pool, name,
                             old_name)

                # Rollback stuff for this action. The current pool/name are
                # bound as default arguments: a plain closure would see the
                # values of the *last* iteration when the rollback runs.
                def rb_volcreate(pool=pool, name=name):
                    dest.proxy.vol_delete(pool, name)
                self.checkpoint(rb_volcreate)

            # Define VM:
            self.report('define vm')
            logging.info('Job-%s: XML configuration transfert', self['id'])
            vm_config = source.proxy.vm_export(self['vm_name'])

            # Change vm configuration XML to update it with new name:
            new_vm_config = self._update_xml(vm_config, self['vm_name'],
                                             self['new_vm_name'],
                                             old_new_disk_mapping)

            dest.proxy.vm_define(new_vm_config)

            # Rollback stuff for vm definition (the vm has been defined on
            # the destination under its *new* name):
            def rb_define():
                dest.proxy.vm_undefine(self['new_vm_name'])
            self.checkpoint(rb_define)

            # Copy all source disk on destination disk, each one to its own
            # new volume:
            for disk in disks:
                self._copy_disk(source, dest, vm, disk, new_vol_names[disk])

            logging.info('Job-%s: Clonage completed with success', self['id'])

    def _update_xml(self, vm_config, old_name, name, old_new_name_mapping):
        '''
        Update the XML definition of the VM with the new vm name, the new vm
        disk, and remove the uuid tag.

        :param vm_config: the XML definition of the source vm
        :param old_name: name of the source vm
        :param name: name of the cloned vm
        :param old_new_name_mapping: mapping between the full path of the
            source disks and the full path of the cloned ones
        :return: the updated XML definition
        '''
        # Only rewrite the <name> element: a blind substring replacement
        # would also alter any unrelated text containing the old name.
        vm_config = vm_config.replace('<name>%s</name>' % old_name,
                                      '<name>%s</name>' % name)
        # Drop the uuid element, and only it.
        vm_config = re.sub(r'<uuid>.*?</uuid>\n?', '', vm_config)
        for old, new in old_new_name_mapping.items():
            vm_config = vm_config.replace("='%s'" % old, "='%s'" % new)
        return vm_config

    def _copy_disk(self, source, dest, vm, disk, new_disk):
        '''
        Copy the specified disk name of the vm from source to dest.

        :param source: connection to the source hypervisor
        :param dest: connection to the destination hypervisor
        :param vm: the vm object holding the disk tags
        :param disk: identifier of the disk to copy
        :param new_disk: name of the volume to import to on the destination
        :raise JobCancelError: on transfer error or checksum mismatch
        '''

        # Get informations about the disk:
        pool = vm.get('disk%s_pool' % disk)
        name = vm.get('disk%s_vol' % disk)
        logging.info('Job-%s: Started copy for %s/%s to %s/%s',
                     self['id'], pool, name, pool, new_disk)
        self.report('copy %s/%s' % (pool, name))

        # Make the copy and wait for it end:
        xferprop = dest.proxy.vol_import(pool, new_disk)

        # Register the cancel function:
        def cancel_xfer():
            dest.proxy.vol_import_cancel(xferprop['id'])
        self['func_cancel_xfer'] = cancel_xfer

        try:
            # Wait for the end of transfert:
            cids = set()
            cids.add(source.connection.async_call('vol_export', pool, name,
                                                  dest.get_ip(),
                                                  xferprop['port']))
            cids.add(dest.connection.async_call('vol_import_wait',
                                                xferprop['id']))
            msgs = self.manager.server.manager.wait(frozenset(cids))
        finally:
            # Always unregister the cancel function, even if the wait failed,
            # so that a later cancel() doesn't act on a finished transfer.
            del self['func_cancel_xfer']

        # Compare checksum of two answers:
        checksums = []
        assert len(msgs) == 2
        for msg in msgs:
            if msg.get('error') is not None:
                msg = 'error while copy: %s' % msg['error']['message']
                raise JobCancelError(msg)
            else:
                checksums.append(msg.get('checksum'))
        self.checkpoint()

        if checksums[0] != checksums[1]:
            raise JobCancelError('checksum mismatches')

    def cancel(self):
        # Read the item once: the job thread may remove it at any time.
        cancel_xfer = self.get('func_cancel_xfer')
        if cancel_xfer is not None:
            cancel_xfer()
        super(CloneJob, self).cancel()


class JobsManager(object):

    '''
    Manage the current job list.

    :param server: The :class:`CCServer` instance.
    '''

    JOBS_TYPES = {
        'kill': KillClientJob,
        'kill_oldcli': KillOldCliJob,
        'cold_migrate': ColdMigrationJob,
        'hot_migrate': HotMigrationJob,
        'clone': CloneJob,
    }

    def __init__(self, server):
        # The main job dict, the keys are id of jobs.
        self._jobs = {}

        # The server:
        self.server = server

        # The id of the next job and it's lock:
        self._current_id = 1
        self._current_id_lock = Lock()

    def create(self, jtype, **kwargs):
        r'''
        Create a new job and start it as a daemon thread.

        :param jtype: the type of the new job
        :param \*\*kwargs: arguments to pass to the job
        :return: the started job
        :raise BadJobTypeError: when invalid jtype is passed
        '''

        jobid = self.get_id()
        jobtype = JobsManager.JOBS_TYPES.get(jtype)

        if jobtype is None:
            raise BadJobTypeError('Invalid job type %r' % jtype)

        job = jobtype(self, id=jobid, **kwargs)
        self._jobs[jobid] = job
        job.daemon = True
        job.start()
        return job

    def get_id(self):
        '''
        Get the current id and increment the counter.
        '''
        with self._current_id_lock:
            jobid = self._current_id
            self._current_id += 1

        return jobid

    def cancel(self, jobid):
        '''
        Cancel the provided job.

        :param jobid: id of the job to cancel
        :raise UnknownJobError: when the job doesn't exist or is hidden
        '''

        job = self._jobs.get(jobid)

        if job is None:
            raise UnknownJobError('Invalid job id: %r' % jobid)
        elif job.get('_hidden', False):
            raise UnknownJobError('Invalid job id: %r (hidden)' % jobid)
        else:
            job.cancel()

    def purge(self):
        '''
        Purge all done jobs.
        '''
        # Iterate over a snapshot since entries are removed while looping.
        for job in list(self._jobs.values()):
            if job['done']:
                del self._jobs[job['id']]

    def iterjobs(self, show_done=True, show_running=True):
        '''
        Iter over jobs.

        :param show_done: If set, yield the jobs which are done.
        :param show_running: If set, yield the jobs which are not done and
            not hidden.
        '''
        # Iterate over a snapshot: jobs can be created or purged while the
        # caller consumes this generator.
        for job in list(self._jobs.values()):
            is_done = job['done']
            if ((show_done and is_done)
                    or (show_running and not is_done
                        and not job.get('_hidden'))):
                yield job