Loading ccserver/handlers.py +38 −1 Original line number Diff line number Diff line Loading @@ -11,7 +11,8 @@ from ccserver.orderedset import OrderedSet from ccserver.conf import CCConf from ccserver.exceptions import (AlreadyRegistered, AuthenticationError, RightError, ReservedTagError, BadObjectError, BadRoleError, NotConnectedAccountError) BadRoleError, NotConnectedAccountError, CloneError) from ccserver.election import Elector from ccserver import __version__ Loading Loading @@ -768,6 +769,42 @@ class CliHandler(OnlineCCHandler): return errs.get_dict() @listed def clone(self, conn, tql_vm, tql_dest, name): ''' Create and launch a clone job. :param tql_vm: a tql matching one vm object (the cloned vm) :param tql_dest: a tql matching one hypervisor object (the destination hypervisor) :param name: the new name of the VM ''' client = self._server.search_client_by_connection(conn) vm = self._server.list(tql_vm, show=('r', 'h', 'p')) if len(vm) != 1: raise CloneError('VM Tql must select ONE vm') elif vm[0]['r'] != 'vm': raise CloneError('Destination Tql must select a vm') else: vm = vm[0] dest = self._server.list(tql_dest, show=('r',)) if len(dest) != 1: raise CloneError('Destination Tql must select ONE hypervisor') elif dest[0]['r'] != 'hv': raise CloneError('Destination Tql must select an hypervisor') else: dest = dest[0] self._server.jobs.create('clone', **{'vm_name': vm['h'], 'new_vm_name': name, 'hv_source': vm['p'], 'hv_dest': dest['id'], 'author': client.login}) @listed def dbstats(self, conn): ''' Loading ccserver/jobs.py +186 −0 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ Jobs management on the server. from __future__ import absolute_import import re import logging import time from datetime import datetime Loading Loading @@ -424,6 +425,190 @@ class ColdMigrationJob(BaseMigrationJob): self.get('func_cancel_xfer')() super(ColdMigrationJob, self).cancel() class CloneJob(BaseMigrationJob): ''' A clone job. Mandatory items: * vm_name: name of the vm to migrate * new_vm_name: the name of the cloned vm * hv_source: name of the hv which execute the VM * hv_dest: the destination hypervisor * author: login of the author cli ''' def job(self): vm_id = '%s.%s' % (self['hv_source'], self['vm_name']) self['title'] = 'Clone %s --> %s' % (vm_id, self['hv_dest']) logging.info('Job-%s: Started clone for %s', self['id'], vm_id) # Cancel the job if the user has not the right to clone the vm or to # select an hypervisor as destination: right_check = self.manager.server.check tql = 'id=%s' % vm_id if not right_check(self['author'], 'clone', tql): raise JobCancelError('author have no rights to migrate this VM') tql = 'id=%s' % self['hv_dest'] if not right_check(self['author'], 'clone_dest', tql): raise JobCancelError('author have no right to migrate to this hv') # Update the VM object: try: self.manager.server.objects.update(ids=(vm_id,)) vm = self.manager.server.objects.get_by_id(vm_id) except UnknownObjectError: raise JobCancelError('Cloned VM not found') # Get the source and destination hv clients: try: source = self.manager.server.get_connection(self['hv_source']) except KeyError: raise JobCancelError('source hypervisor is not connected') try: dest = self.manager.server.get_connection(self['hv_dest']) except KeyError: raise JobCancelError('destination hypervisor is not connected') self.checkpoint() self.report('waiting lock for source and dest hypervisors') logging.info('Job-%s: Trying to acquire locks', self['id']) with AcquiresAllOrNone(source.lock, dest.lock): logging.info('Job-%s: Locks acquired', self['id']) self.checkpoint() # Check if VM is stopped: if not self._check_status(vm_id, 'stopped'): raise JobCancelError('vm is not stopped') # Create storages on destination: old_new_disk_mapping = {} # Mapping between old and new disk names self.report('create volumes') for disk in vm.get('disk', '').split(): # Getting informations about the disk: pool = vm.get('disk%s_pool' % disk) name = vm.get('disk%s_vol' % disk) size = vm.get('disk%s_size' % disk) assert pool is not None, 'pool tag doesn\'t exists' assert name is not None, 'name tag doesn\'t exists' assert size is not None, 'size tag doesn\'t exists' # Change the name of the disk: old_name = name if name.startswith(self['vm_name']): suffix = name[len(self['vm_name']):] name = self['new_vm_name'] + suffix else: name = '%s_%s' % (self['new_vm_name'], name) fulloldname = '/dev/%s/%s' % (pool, old_name) fullnewname = '/dev/%s/%s' % (pool, name) old_new_disk_mapping[fulloldname] = fullnewname # Create the volume on destination: dest.proxy.vol_create(pool, name, int(size)) logging.info('Job-%s: Created volume %s/%s on destination ' 'hypervisor (was %s)', self['id'], pool, name, old_name) # Rollback stuff for this action: def rb_volcreate(): dest.proxy.vol_delete(pool, name) self.checkpoint(rb_volcreate) # Define VM: self.report('define vm') logging.info('Job-%s: XML configuration transfert', self['id']) vm_config = source.proxy.vm_export(self['vm_name']) # Change vm configuration XML to update it with new name: new_vm_config = self._update_xml(vm_config, self['vm_name'], self['new_vm_name'], old_new_disk_mapping) dest.proxy.vm_define(new_vm_config) # Rollback stuff for vm definition: def rb_define(): dest.proxy.vm_undefine(name) self.checkpoint(rb_define) # Copy all source disk on destination disk: for disk in vm.get('disk', '').split(): self._copy_disk(source, dest, vm, disk, name) logging.info('Job-%s: Clonage completed with success', self['id']) def _update_xml(self, vm_config, old_name, name, old_new_name_mapping): ''' Update the XML definition of the VM with the new vm name, the new vm disk, and remove the uuid tag. ''' vm_config = vm_config.replace('<name>%s</name>' % old_name, '<name>%s</name>' % name) vm_config = re.sub('(<uuid>.*?</uuid>\n)', '', vm_config) for old, new in old_new_name_mapping.iteritems(): vm_config = vm_config.replace("='%s'" % old, "='%s'" % new) return vm_config def _copy_disk(self, source, dest, vm, disk, new_disk): ''' Copy the specified disk name of the vm from source to dest. ''' # Get informations about the disk: pool = vm.get('disk%s_pool' % disk) name = vm.get('disk%s_vol' % disk) logging.info('Job-%s: Started copy for %s/%s to %s/%s', self['id'], pool, name, pool, new_disk) self.report('copy %s/%s' % (pool, name)) # Make the copy and wait for it end: xferprop = dest.proxy.vol_import(pool, new_disk) # Register the cancel function: def cancel_xfer(): dest.proxy.vol_import_cancel(xferprop['id']) self['func_cancel_xfer'] = cancel_xfer # Wait for the end of transfert: cids = set() cids.add(source.connection.async_call('vol_export', pool, name, dest.get_ip(), xferprop['port'])) cids.add(dest.connection.async_call('vol_import_wait', xferprop['id'])) msgs = self.manager.server.manager.wait(frozenset(cids)) del self['func_cancel_xfer'] # Compare checksum of two answers: checksums = [] assert len(msgs) == 2 logging.info('%r' % (msgs, )) for msg in msgs: if msg.get('error') is not None: msg = 'error while copy: %s' % msg['error']['message'] raise JobCancelError(msg) else: checksums.append(msg.get('checksum')) self.checkpoint() if checksums[0] != checksums[1]: raise JobCancelError('checksum mismatches') def cancel(self): if self.get('func_cancel_xfer') is not None: self.get('func_cancel_xfer')() super(CloneJob, self).cancel() class JobsManager(object): ''' Loading @@ -436,6 +621,7 @@ class JobsManager(object): 'kill': KillClientJob, 'kill_oldcli': KillOldCliJob, 'cold_migrate': ColdMigrationJob, 'clone': CloneJob, } def __init__(self, server): Loading Loading
ccserver/handlers.py +38 −1 Original line number Diff line number Diff line Loading @@ -11,7 +11,8 @@ from ccserver.orderedset import OrderedSet from ccserver.conf import CCConf from ccserver.exceptions import (AlreadyRegistered, AuthenticationError, RightError, ReservedTagError, BadObjectError, BadRoleError, NotConnectedAccountError) BadRoleError, NotConnectedAccountError, CloneError) from ccserver.election import Elector from ccserver import __version__ Loading Loading @@ -768,6 +769,42 @@ class CliHandler(OnlineCCHandler): return errs.get_dict() @listed def clone(self, conn, tql_vm, tql_dest, name): ''' Create and launch a clone job. :param tql_vm: a tql matching one vm object (the cloned vm) :param tql_dest: a tql matching one hypervisor object (the destination hypervisor) :param name: the new name of the VM ''' client = self._server.search_client_by_connection(conn) vm = self._server.list(tql_vm, show=('r', 'h', 'p')) if len(vm) != 1: raise CloneError('VM Tql must select ONE vm') elif vm[0]['r'] != 'vm': raise CloneError('Destination Tql must select a vm') else: vm = vm[0] dest = self._server.list(tql_dest, show=('r',)) if len(dest) != 1: raise CloneError('Destination Tql must select ONE hypervisor') elif dest[0]['r'] != 'hv': raise CloneError('Destination Tql must select an hypervisor') else: dest = dest[0] self._server.jobs.create('clone', **{'vm_name': vm['h'], 'new_vm_name': name, 'hv_source': vm['p'], 'hv_dest': dest['id'], 'author': client.login}) @listed def dbstats(self, conn): ''' Loading
ccserver/jobs.py +186 −0 Original line number Diff line number Diff line Loading @@ -7,6 +7,7 @@ Jobs management on the server. from __future__ import absolute_import import re import logging import time from datetime import datetime Loading Loading @@ -424,6 +425,190 @@ class ColdMigrationJob(BaseMigrationJob): self.get('func_cancel_xfer')() super(ColdMigrationJob, self).cancel() class CloneJob(BaseMigrationJob): ''' A clone job. Mandatory items: * vm_name: name of the vm to migrate * new_vm_name: the name of the cloned vm * hv_source: name of the hv which execute the VM * hv_dest: the destination hypervisor * author: login of the author cli ''' def job(self): vm_id = '%s.%s' % (self['hv_source'], self['vm_name']) self['title'] = 'Clone %s --> %s' % (vm_id, self['hv_dest']) logging.info('Job-%s: Started clone for %s', self['id'], vm_id) # Cancel the job if the user has not the right to clone the vm or to # select an hypervisor as destination: right_check = self.manager.server.check tql = 'id=%s' % vm_id if not right_check(self['author'], 'clone', tql): raise JobCancelError('author have no rights to migrate this VM') tql = 'id=%s' % self['hv_dest'] if not right_check(self['author'], 'clone_dest', tql): raise JobCancelError('author have no right to migrate to this hv') # Update the VM object: try: self.manager.server.objects.update(ids=(vm_id,)) vm = self.manager.server.objects.get_by_id(vm_id) except UnknownObjectError: raise JobCancelError('Cloned VM not found') # Get the source and destination hv clients: try: source = self.manager.server.get_connection(self['hv_source']) except KeyError: raise JobCancelError('source hypervisor is not connected') try: dest = self.manager.server.get_connection(self['hv_dest']) except KeyError: raise JobCancelError('destination hypervisor is not connected') self.checkpoint() self.report('waiting lock for source and dest hypervisors') logging.info('Job-%s: Trying to acquire locks', self['id']) with AcquiresAllOrNone(source.lock, dest.lock): logging.info('Job-%s: Locks acquired', self['id']) self.checkpoint() # Check if VM is stopped: if not self._check_status(vm_id, 'stopped'): raise JobCancelError('vm is not stopped') # Create storages on destination: old_new_disk_mapping = {} # Mapping between old and new disk names self.report('create volumes') for disk in vm.get('disk', '').split(): # Getting informations about the disk: pool = vm.get('disk%s_pool' % disk) name = vm.get('disk%s_vol' % disk) size = vm.get('disk%s_size' % disk) assert pool is not None, 'pool tag doesn\'t exists' assert name is not None, 'name tag doesn\'t exists' assert size is not None, 'size tag doesn\'t exists' # Change the name of the disk: old_name = name if name.startswith(self['vm_name']): suffix = name[len(self['vm_name']):] name = self['new_vm_name'] + suffix else: name = '%s_%s' % (self['new_vm_name'], name) fulloldname = '/dev/%s/%s' % (pool, old_name) fullnewname = '/dev/%s/%s' % (pool, name) old_new_disk_mapping[fulloldname] = fullnewname # Create the volume on destination: dest.proxy.vol_create(pool, name, int(size)) logging.info('Job-%s: Created volume %s/%s on destination ' 'hypervisor (was %s)', self['id'], pool, name, old_name) # Rollback stuff for this action: def rb_volcreate(): dest.proxy.vol_delete(pool, name) self.checkpoint(rb_volcreate) # Define VM: self.report('define vm') logging.info('Job-%s: XML configuration transfert', self['id']) vm_config = source.proxy.vm_export(self['vm_name']) # Change vm configuration XML to update it with new name: new_vm_config = self._update_xml(vm_config, self['vm_name'], self['new_vm_name'], old_new_disk_mapping) dest.proxy.vm_define(new_vm_config) # Rollback stuff for vm definition: def rb_define(): dest.proxy.vm_undefine(name) self.checkpoint(rb_define) # Copy all source disk on destination disk: for disk in vm.get('disk', '').split(): self._copy_disk(source, dest, vm, disk, name) logging.info('Job-%s: Clonage completed with success', self['id']) def _update_xml(self, vm_config, old_name, name, old_new_name_mapping): ''' Update the XML definition of the VM with the new vm name, the new vm disk, and remove the uuid tag. ''' vm_config = vm_config.replace('<name>%s</name>' % old_name, '<name>%s</name>' % name) vm_config = re.sub('(<uuid>.*?</uuid>\n)', '', vm_config) for old, new in old_new_name_mapping.iteritems(): vm_config = vm_config.replace("='%s'" % old, "='%s'" % new) return vm_config def _copy_disk(self, source, dest, vm, disk, new_disk): ''' Copy the specified disk name of the vm from source to dest. ''' # Get informations about the disk: pool = vm.get('disk%s_pool' % disk) name = vm.get('disk%s_vol' % disk) logging.info('Job-%s: Started copy for %s/%s to %s/%s', self['id'], pool, name, pool, new_disk) self.report('copy %s/%s' % (pool, name)) # Make the copy and wait for it end: xferprop = dest.proxy.vol_import(pool, new_disk) # Register the cancel function: def cancel_xfer(): dest.proxy.vol_import_cancel(xferprop['id']) self['func_cancel_xfer'] = cancel_xfer # Wait for the end of transfert: cids = set() cids.add(source.connection.async_call('vol_export', pool, name, dest.get_ip(), xferprop['port'])) cids.add(dest.connection.async_call('vol_import_wait', xferprop['id'])) msgs = self.manager.server.manager.wait(frozenset(cids)) del self['func_cancel_xfer'] # Compare checksum of two answers: checksums = [] assert len(msgs) == 2 logging.info('%r' % (msgs, )) for msg in msgs: if msg.get('error') is not None: msg = 'error while copy: %s' % msg['error']['message'] raise JobCancelError(msg) else: checksums.append(msg.get('checksum')) self.checkpoint() if checksums[0] != checksums[1]: raise JobCancelError('checksum mismatches') def cancel(self): if self.get('func_cancel_xfer') is not None: self.get('func_cancel_xfer')() super(CloneJob, self).cancel() class JobsManager(object): ''' Loading @@ -436,6 +621,7 @@ class JobsManager(object): 'kill': KillClientJob, 'kill_oldcli': KillOldCliJob, 'cold_migrate': ColdMigrationJob, 'clone': CloneJob, } def __init__(self, server): Loading