Loading ccserver/handlers.py +2 −1 Original line number Diff line number Diff line Loading @@ -17,7 +17,8 @@ from ccserver.election import Elector from ccserver import __version__ MIGRATION_TYPES = {'cold': 'cold_migrate'} MIGRATION_TYPES = {'cold': 'cold_migrate', 'hot': 'hot_migrate',} def listed(func): Loading ccserver/jobs.py +178 −0 Original line number Diff line number Diff line Loading @@ -425,6 +425,183 @@ class ColdMigrationJob(BaseMigrationJob): self.get('func_cancel_xfer')() super(ColdMigrationJob, self).cancel() class HotMigrationJob(BaseMigrationJob): ''' A hot vm migration job. Mandatory items: * vm_name: name of the vm to migrate * hv_source: name of the hv which execute the VM * hv_dest: the destination hypervisor * author: login of the author cli ''' def job(self): vm_id = '%s.%s' % (self['hv_source'], self['vm_name']) self['title'] = 'Hot migration %s --> %s' % (vm_id, self['hv_dest']) logging.info('Job-%s: Started hot migration for %s', self['id'], vm_id) # Cancel the job if the user has not the right to migrate the vm or to # select an hypervisor as destination: right_check = self.manager.server.check tql = 'id=%s' % vm_id if not right_check(self['author'], 'hotmigrate', tql): raise JobCancelError('author have no rights to migrate this VM') tql = 'id=%s' % self['hv_dest'] if not right_check(self['author'], 'hotmigrate_dest', tql): raise JobCancelError('author have no right to migrate to this hv') # Update the VM object: try: self.manager.server.objects.update(ids=(vm_id,)) vm = self.manager.server.objects.get_by_id(vm_id) except UnknownObjectError: raise JobCancelError('Source VM not found') # Get the source and destination hv clients: try: source = self.manager.server.get_connection(self['hv_source']) except KeyError: raise JobCancelError('source hypervisor is not connected') try: dest = self.manager.server.get_connection(self['hv_dest']) except KeyError: raise JobCancelError('destination hypervisor is not connected') self.checkpoint() self.report('waiting lock for source and dest hypervisors') logging.info('Job-%s: Trying to acquire locks', self['id']) with AcquiresAllOrNone(source.lock, dest.lock): logging.info('Job-%s: Locks acquired', self['id']) self.checkpoint() if not self._check_status(vm_id, 'running'): raise JobCancelError('vm is not started') # Create storages on destination and start synchronization: disks = vm.get('disk', '').split() for disk in disks: # Getting informations about the disk: pool = vm.get('disk%s_pool' % disk) name = vm.get('disk%s_vol' % disk) size = vm.get('disk%s_size' % disk) assert pool is not None, 'pool tag doesn\'t exists' assert name is not None, 'name tag doesn\'t exists' assert size is not None, 'size tag doesn\'t exists' self.report('sync volume %s/%s (creation)' % (pool, name)) # Create the volume on destination: dest.proxy.vol_create(pool, name, int(size)) logging.info('Job-%s: Created volume %s/%s on destination ' 'hypervisor', self['id'], pool, name) # Rollback stuff for this action: def rb_volcreate(): dest.proxy.vol_delete(pool, name) self.checkpoint(rb_volcreate) # Setup the drbd synchronization with each hypervisors: self.report('sync volume %s/%s (setup)' % (pool, name)) to_cleanup = [] res_src = source.proxy.drbd_setup(pool, name) def rb_setupsrc(): source.proxy.drbd_shutdown(res_src) self.checkpoint(rb_setupsrc) to_cleanup.append(rb_setupsrc) res_dst = dest.proxy.drbd_setup(pool, name) def rb_setupdst(): dest.proxy.drbd_shutdown(res_dst) self.checkpoint(rb_setupdst) to_cleanup.append(rb_setupdst) # Start connection of drbd: self.report('sync volume %s/%s (connect)' % (pool, name)) source.proxy.drbd_connect(res_src, res_dst, dest.get_ip()) dest.proxy.drbd_connect(res_dst, res_src, source.get_ip()) # Setup topology as Primary/Secondary: source.proxy.drbd_role(res_src, True) dest.proxy.drbd_role(res_dst, False) # Wait for the end of the disk synchronization: self.report('sync volume %s/%s (sync)' % (pool, name)) cids = set() cids.add(source.connection.async_call('drbd_sync', res_src)) cids.add(dest.connection.async_call('drbd_sync', res_dst)) msgs = self.manager.server.manager.wait(frozenset(cids)) #TODO: check error source.proxy.drbd_takeover(res_src, True) def rb_takeover_src(): source.proxy.drbd_takeover(res_src, False) self.checkpoint(rb_takeover_src) to_cleanup.append(rb_takeover_src) dest.proxy.drbd_takeover(res_dst, True) def rb_takeover_dst(): dest.proxy.drbd_takeover(res_dst, False) self.checkpoint(rb_takeover_dst) to_cleanup.append(rb_takeover_dst) #... tunres_src = source.proxy.tun_setup() def rb_tun_src(): source.proxy.tun_destroy(tunres_src) self.checkpoint(rb_tun_src) tunres_dst = dest.proxy.tun_setup(local=False) def rb_tun_dst(): dest.proxy.tun_destroy(tunres_dst) self.checkpoint(rb_tun_dst) source.proxy.tun_connect(tunres_src, tunres_dst, dest.get_ip()) dest.proxy.tun_connect_hv(tunres_dst, migration=True) # Initiate the live migration: self.report('migration in progress') source.proxy.vm_migrate_tunneled(self['vm_name'], tunres_src) # At this point, if operation is a success, all we need is just to # cleanup source hypervisor from disk and vm. This operation *CAN'T* # be cancelled or rollbacked if anything fails (unlikely). The # migration must be considered as a success, and the only way to # undo this is to start a new migration in the other way. # Delete the rollback list. # This is mandatory to avoid data loss if the cleanup # code below fail. self.report('cleanup') self._wayback = [] source.proxy.tun_destroy(tunres_src) dest.proxy.tun_destroy(tunres_dst) for cb_cleanup in reversed(to_cleanup): cb_cleanup() # Cleanup the disks: for disk in disks: pool = vm.get('disk%s_pool' % disk) name = vm.get('disk%s_vol' % disk) source.proxy.vol_delete(pool, name) logging.info('Job-%s: Migration completed with success', self['id']) class CloneJob(BaseMigrationJob): ''' Loading Loading @@ -617,6 +794,7 @@ class JobsManager(object): 'kill': KillClientJob, 'kill_oldcli': KillOldCliJob, 'cold_migrate': ColdMigrationJob, 'hot_migrate': HotMigrationJob, 'clone': CloneJob, } Loading Loading
ccserver/handlers.py +2 −1 Original line number Diff line number Diff line Loading @@ -17,7 +17,8 @@ from ccserver.election import Elector from ccserver import __version__ MIGRATION_TYPES = {'cold': 'cold_migrate'} MIGRATION_TYPES = {'cold': 'cold_migrate', 'hot': 'hot_migrate',} def listed(func): Loading
ccserver/jobs.py +178 −0 Original line number Diff line number Diff line Loading @@ -425,6 +425,183 @@ class ColdMigrationJob(BaseMigrationJob): self.get('func_cancel_xfer')() super(ColdMigrationJob, self).cancel() class HotMigrationJob(BaseMigrationJob): ''' A hot vm migration job. Mandatory items: * vm_name: name of the vm to migrate * hv_source: name of the hv which execute the VM * hv_dest: the destination hypervisor * author: login of the author cli ''' def job(self): vm_id = '%s.%s' % (self['hv_source'], self['vm_name']) self['title'] = 'Hot migration %s --> %s' % (vm_id, self['hv_dest']) logging.info('Job-%s: Started hot migration for %s', self['id'], vm_id) # Cancel the job if the user has not the right to migrate the vm or to # select an hypervisor as destination: right_check = self.manager.server.check tql = 'id=%s' % vm_id if not right_check(self['author'], 'hotmigrate', tql): raise JobCancelError('author have no rights to migrate this VM') tql = 'id=%s' % self['hv_dest'] if not right_check(self['author'], 'hotmigrate_dest', tql): raise JobCancelError('author have no right to migrate to this hv') # Update the VM object: try: self.manager.server.objects.update(ids=(vm_id,)) vm = self.manager.server.objects.get_by_id(vm_id) except UnknownObjectError: raise JobCancelError('Source VM not found') # Get the source and destination hv clients: try: source = self.manager.server.get_connection(self['hv_source']) except KeyError: raise JobCancelError('source hypervisor is not connected') try: dest = self.manager.server.get_connection(self['hv_dest']) except KeyError: raise JobCancelError('destination hypervisor is not connected') self.checkpoint() self.report('waiting lock for source and dest hypervisors') logging.info('Job-%s: Trying to acquire locks', self['id']) with AcquiresAllOrNone(source.lock, dest.lock): logging.info('Job-%s: Locks acquired', self['id']) self.checkpoint() if not self._check_status(vm_id, 'running'): raise JobCancelError('vm is not started') # Create storages on destination and start synchronization: disks = vm.get('disk', '').split() for disk in disks: # Getting informations about the disk: pool = vm.get('disk%s_pool' % disk) name = vm.get('disk%s_vol' % disk) size = vm.get('disk%s_size' % disk) assert pool is not None, 'pool tag doesn\'t exists' assert name is not None, 'name tag doesn\'t exists' assert size is not None, 'size tag doesn\'t exists' self.report('sync volume %s/%s (creation)' % (pool, name)) # Create the volume on destination: dest.proxy.vol_create(pool, name, int(size)) logging.info('Job-%s: Created volume %s/%s on destination ' 'hypervisor', self['id'], pool, name) # Rollback stuff for this action: def rb_volcreate(): dest.proxy.vol_delete(pool, name) self.checkpoint(rb_volcreate) # Setup the drbd synchronization with each hypervisors: self.report('sync volume %s/%s (setup)' % (pool, name)) to_cleanup = [] res_src = source.proxy.drbd_setup(pool, name) def rb_setupsrc(): source.proxy.drbd_shutdown(res_src) self.checkpoint(rb_setupsrc) to_cleanup.append(rb_setupsrc) res_dst = dest.proxy.drbd_setup(pool, name) def rb_setupdst(): dest.proxy.drbd_shutdown(res_dst) self.checkpoint(rb_setupdst) to_cleanup.append(rb_setupdst) # Start connection of drbd: self.report('sync volume %s/%s (connect)' % (pool, name)) source.proxy.drbd_connect(res_src, res_dst, dest.get_ip()) dest.proxy.drbd_connect(res_dst, res_src, source.get_ip()) # Setup topology as Primary/Secondary: source.proxy.drbd_role(res_src, True) dest.proxy.drbd_role(res_dst, False) # Wait for the end of the disk synchronization: self.report('sync volume %s/%s (sync)' % (pool, name)) cids = set() cids.add(source.connection.async_call('drbd_sync', res_src)) cids.add(dest.connection.async_call('drbd_sync', res_dst)) msgs = self.manager.server.manager.wait(frozenset(cids)) #TODO: check error source.proxy.drbd_takeover(res_src, True) def rb_takeover_src(): source.proxy.drbd_takeover(res_src, False) self.checkpoint(rb_takeover_src) to_cleanup.append(rb_takeover_src) dest.proxy.drbd_takeover(res_dst, True) def rb_takeover_dst(): dest.proxy.drbd_takeover(res_dst, False) self.checkpoint(rb_takeover_dst) to_cleanup.append(rb_takeover_dst) #... tunres_src = source.proxy.tun_setup() def rb_tun_src(): source.proxy.tun_destroy(tunres_src) self.checkpoint(rb_tun_src) tunres_dst = dest.proxy.tun_setup(local=False) def rb_tun_dst(): dest.proxy.tun_destroy(tunres_dst) self.checkpoint(rb_tun_dst) source.proxy.tun_connect(tunres_src, tunres_dst, dest.get_ip()) dest.proxy.tun_connect_hv(tunres_dst, migration=True) # Initiate the live migration: self.report('migration in progress') source.proxy.vm_migrate_tunneled(self['vm_name'], tunres_src) # At this point, if operation is a success, all we need is just to # cleanup source hypervisor from disk and vm. This operation *CAN'T* # be cancelled or rollbacked if anything fails (unlikely). The # migration must be considered as a success, and the only way to # undo this is to start a new migration in the other way. # Delete the rollback list. # This is mandatory to avoid data loss if the cleanup # code below fail. self.report('cleanup') self._wayback = [] source.proxy.tun_destroy(tunres_src) dest.proxy.tun_destroy(tunres_dst) for cb_cleanup in reversed(to_cleanup): cb_cleanup() # Cleanup the disks: for disk in disks: pool = vm.get('disk%s_pool' % disk) name = vm.get('disk%s_vol' % disk) source.proxy.vol_delete(pool, name) logging.info('Job-%s: Migration completed with success', self['id']) class CloneJob(BaseMigrationJob): ''' Loading Loading @@ -617,6 +794,7 @@ class JobsManager(object): 'kill': KillClientJob, 'kill_oldcli': KillOldCliJob, 'cold_migrate': ColdMigrationJob, 'hot_migrate': HotMigrationJob, 'clone': CloneJob, } Loading