diff --git a/ccserver/handlers.py b/ccserver/handlers.py
index c7019d8558564cd4fbe8ae3ed9b05d15a93b8b54..594d70d2ff603df69fa3a58702758f9217baa1c3 100644
--- a/ccserver/handlers.py
+++ b/ccserver/handlers.py
@@ -11,7 +11,8 @@ from ccserver.orderedset import OrderedSet
 from ccserver.conf import CCConf
 from ccserver.exceptions import (AlreadyRegistered, AuthenticationError,
                                  RightError, ReservedTagError, BadObjectError,
-                                 BadRoleError, NotConnectedAccountError)
+                                 BadRoleError, NotConnectedAccountError,
+                                 CloneError)
 from ccserver.election import Elector
 from ccserver import __version__
 
@@ -768,6 +769,42 @@ class CliHandler(OnlineCCHandler):
 
         return errs.get_dict()
 
+    @listed
+    def clone(self, conn, tql_vm, tql_dest, name):
+        '''
+        Create and launch a clone job.
+
+        :param tql_vm: a tql matching one vm object (the vm to clone)
+        :param tql_dest: a tql matching one hypervisor object (the
+            destination hypervisor)
+        :param name: the new name of the VM
+        '''
+
+        client = self._server.search_client_by_connection(conn)
+
+        vm = self._server.list(tql_vm, show=('r', 'h', 'p'))
+
+        if len(vm) != 1:
+            raise CloneError('VM Tql must select ONE vm')
+        elif vm[0]['r'] != 'vm':
+            raise CloneError('VM Tql must select a vm')
+        else:
+            vm = vm[0]
+
+        dest = self._server.list(tql_dest, show=('r',))
+        if len(dest) != 1:
+            raise CloneError('Destination Tql must select ONE hypervisor')
+        elif dest[0]['r'] != 'hv':
+            raise CloneError('Destination Tql must select a hypervisor')
+        else:
+            dest = dest[0]
+
+        self._server.jobs.create('clone', **{'vm_name': vm['h'],
+                                             'new_vm_name': name,
+                                             'hv_source': vm['p'],
+                                             'hv_dest': dest['id'],
+                                             'author': client.login})
+
     @listed
     def dbstats(self, conn):
         '''
diff --git a/ccserver/jobs.py b/ccserver/jobs.py
index f960a7a89b710734a3b96ee88d8c4b07858ac262..15c8884acdfe91c5a2250bd17aa0c8b9ea3736ee 100644
--- a/ccserver/jobs.py
+++ b/ccserver/jobs.py
@@ -7,6 +7,7 @@ Jobs management on the server.
 
 from __future__ import absolute_import
 
+import re
 import logging
 import time
 from datetime import datetime
@@ -424,6 +425,190 @@ class ColdMigrationJob(BaseMigrationJob):
             self.get('func_cancel_xfer')()
         super(ColdMigrationJob, self).cancel()
 
+
+class CloneJob(BaseMigrationJob):
+
+    '''
+    A clone job.
+
+    Mandatory items:
+      * vm_name: name of the vm to clone
+      * new_vm_name: the name of the cloned vm
+      * hv_source: name of the hv which runs the VM
+      * hv_dest: the destination hypervisor
+      * author: login of the author cli
+    '''
+
+    def job(self):
+        vm_id = '%s.%s' % (self['hv_source'], self['vm_name'])
+
+        self['title'] = 'Clone %s --> %s' % (vm_id, self['hv_dest'])
+        logging.info('Job-%s: Started clone for %s', self['id'], vm_id)
+
+        # Cancel the job if the user has no right to clone the vm or to
+        # select this hypervisor as destination:
+        right_check = self.manager.server.check
+
+        tql = 'id=%s' % vm_id
+        if not right_check(self['author'], 'clone', tql):
+            raise JobCancelError('author has no right to clone this VM')
+
+        tql = 'id=%s' % self['hv_dest']
+        if not right_check(self['author'], 'clone_dest', tql):
+            raise JobCancelError('author has no right to clone to this hv')
+
+        # Update the VM object:
+        try:
+            self.manager.server.objects.update(ids=(vm_id,))
+            vm = self.manager.server.objects.get_by_id(vm_id)
+        except UnknownObjectError:
+            raise JobCancelError('cloned VM not found')
+
+        # Get the source and destination hv clients:
+        try:
+            source = self.manager.server.get_connection(self['hv_source'])
+        except KeyError:
+            raise JobCancelError('source hypervisor is not connected')
+
+        try:
+            dest = self.manager.server.get_connection(self['hv_dest'])
+        except KeyError:
+            raise JobCancelError('destination hypervisor is not connected')
+
+        self.checkpoint()
+
+        self.report('waiting for locks on source and dest hypervisors')
+        logging.info('Job-%s: Trying to acquire locks', self['id'])
+
+        with AcquiresAllOrNone(source.lock, dest.lock):
+            logging.info('Job-%s: Locks acquired', self['id'])
+            self.checkpoint()
+
+            # Check that the VM is stopped:
+            if not self._check_status(vm_id, 'stopped'):
+                raise JobCancelError('vm is not stopped')
+
+            # Create storages on destination:
+            old_new_disk_mapping = {}  # Mapping between old and new disk names
+            new_disk_names = {}  # New volume name of each disk
+            self.report('create volumes')
+            for disk in vm.get('disk', '').split():
+                # Get information about the disk:
+                pool = vm.get('disk%s_pool' % disk)
+                name = vm.get('disk%s_vol' % disk)
+                size = vm.get('disk%s_size' % disk)
+                assert pool is not None, 'pool tag doesn\'t exist'
+                assert name is not None, 'name tag doesn\'t exist'
+                assert size is not None, 'size tag doesn\'t exist'
+
+                # Change the name of the disk:
+                old_name = name
+                if name.startswith(self['vm_name']):
+                    suffix = name[len(self['vm_name']):]
+                    name = self['new_vm_name'] + suffix
+                else:
+                    name = '%s_%s' % (self['new_vm_name'], name)
+                new_disk_names[disk] = name
+
+                fulloldname = '/dev/%s/%s' % (pool, old_name)
+                fullnewname = '/dev/%s/%s' % (pool, name)
+                old_new_disk_mapping[fulloldname] = fullnewname
+
+                # Create the volume on destination:
+                dest.proxy.vol_create(pool, name, int(size))
+                logging.info('Job-%s: Created volume %s/%s on destination '
+                             'hypervisor (was %s)', self['id'], pool, name,
+                             old_name)
+
+                # Rollback stuff for this action (bind loop variables as
+                # defaults so each rollback deletes its own volume):
+                def rb_volcreate(pool=pool, name=name):
+                    dest.proxy.vol_delete(pool, name)
+                self.checkpoint(rb_volcreate)
+
+            # Define VM:
+            self.report('define vm')
+            logging.info('Job-%s: XML configuration transfer', self['id'])
+            vm_config = source.proxy.vm_export(self['vm_name'])
+
+            # Update the vm XML configuration with the new names:
+            new_vm_config = self._update_xml(vm_config, self['vm_name'],
+                                             self['new_vm_name'],
+                                             old_new_disk_mapping)
+
+            dest.proxy.vm_define(new_vm_config)
+
+            # Rollback stuff for vm definition:
+            def rb_define():
+                dest.proxy.vm_undefine(self['new_vm_name'])
+            self.checkpoint(rb_define)
+
+            # Copy each source disk to its destination volume:
+            for disk in vm.get('disk', '').split():
+                self._copy_disk(source, dest, vm, disk, new_disk_names[disk])
+
+        logging.info('Job-%s: Clone completed successfully', self['id'])
+
+    def _update_xml(self, vm_config, old_name, name, old_new_name_mapping):
+        '''
+        Update the XML definition of the VM with the new vm name and the new
+        vm disks, and remove the uuid tag.
+        '''
+
+        vm_config = vm_config.replace('<name>%s</name>' % old_name,
+                                      '<name>%s</name>' % name)
+        vm_config = re.sub('(<uuid>.*?</uuid>\n)', '', vm_config)
+        for old, new in old_new_name_mapping.iteritems():
+            vm_config = vm_config.replace("='%s'" % old,
+                                          "='%s'" % new)
+        return vm_config
+
+    def _copy_disk(self, source, dest, vm, disk, new_disk):
+        '''
+        Copy the specified disk of the vm from source to dest.
+        '''
+
+        # Get information about the disk:
+        pool = vm.get('disk%s_pool' % disk)
+        name = vm.get('disk%s_vol' % disk)
+        logging.info('Job-%s: Started copy for %s/%s to %s/%s',
+                     self['id'], pool, name, pool, new_disk)
+        self.report('copy %s/%s' % (pool, name))
+
+        # Start the copy:
+        xferprop = dest.proxy.vol_import(pool, new_disk)
+
+        # Register the cancel function:
+        def cancel_xfer():
+            dest.proxy.vol_import_cancel(xferprop['id'])
+        self['func_cancel_xfer'] = cancel_xfer
+
+        # Wait for the end of the transfer:
+        cids = set()
+        cids.add(source.connection.async_call('vol_export', pool, name,
+                                              dest.get_ip(), xferprop['port']))
+        cids.add(dest.connection.async_call('vol_import_wait', xferprop['id']))
+        msgs = self.manager.server.manager.wait(frozenset(cids))
+        del self['func_cancel_xfer']
+
+        # Compare the checksums of the two answers:
+        checksums = []
+        assert len(msgs) == 2
+        logging.info('%r', msgs)
+        for msg in msgs:
+            if msg.get('error') is not None:
+                msg = 'error during copy: %s' % msg['error']['message']
+                raise JobCancelError(msg)
+            else:
+                checksums.append(msg.get('checksum'))
+        self.checkpoint()
+
+        if checksums[0] != checksums[1]:
+            raise JobCancelError('checksum mismatch')
+
+    def cancel(self):
+        if self.get('func_cancel_xfer') is not None:
+            self.get('func_cancel_xfer')()
+        super(CloneJob, self).cancel()
+
 
 class JobsManager(object):
 
     '''
@@ -436,6 +621,7 @@ class JobsManager(object):
         'kill': KillClientJob,
         'kill_oldcli': KillOldCliJob,
         'cold_migrate': ColdMigrationJob,
+        'clone': CloneJob,
     }
 
     def __init__(self, server):
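For review context, here is a minimal sketch (not part of the patch) of the rewrite that `CloneJob._update_xml` performs on a libvirt-style domain definition before `vm_define` is called on the destination. The XML below and its values are hypothetical examples; the sketch only assumes the definition carries `<name>`, `<uuid>` and `source dev='...'` attributes, as the patch's replacements imply.

```python
# Sketch of the three rewrites in CloneJob._update_xml, under the
# assumptions stated above.
import re

old_name, new_name = 'vm1', 'vm1-clone'
# Mapping built by job() while creating the destination volumes:
disk_mapping = {'/dev/pool0/vm1_root': '/dev/pool0/vm1-clone_root'}

vm_config = """<domain type='kvm'>
  <name>vm1</name>
  <uuid>5f0e21bb-93d4-4a43-9d61-0c7f4f5a11bd</uuid>
  <devices>
    <disk type='block' device='disk'>
      <source dev='/dev/pool0/vm1_root'/>
    </disk>
  </devices>
</domain>
"""

# 1. Rename the domain:
vm_config = vm_config.replace('<name>%s</name>' % old_name,
                              '<name>%s</name>' % new_name)
# 2. Drop the uuid so the clone gets a freshly generated one:
vm_config = re.sub('(<uuid>.*?</uuid>\n)', '', vm_config)
# 3. Point every disk at its newly created volume (the patch uses
#    iteritems(), Python 2; items() behaves the same here):
for old, new in disk_mapping.items():
    vm_config = vm_config.replace("='%s'" % old, "='%s'" % new)

print(vm_config)
```

Running the sketch prints the definition with `<name>vm1-clone</name>`, no `<uuid>` line, and the disk source pointing at `/dev/pool0/vm1-clone_root`, which is the shape of configuration the destination hypervisor receives.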