diff --git a/ccserver/handlers.py b/ccserver/handlers.py
index cb1a67868e35f00eb342dab35bfda9131d35e2f0..38d46231fa56080ea16ec60bab418610619d7aae 100644
--- a/ccserver/handlers.py
+++ b/ccserver/handlers.py
@@ -17,7 +17,8 @@ from ccserver.election import Elector
 from ccserver import __version__
 
 
-MIGRATION_TYPES = {'cold': 'cold_migrate'}
+MIGRATION_TYPES = {'cold': 'cold_migrate',
+                   'hot': 'hot_migrate'}
 
 
 def listed(func):
diff --git a/ccserver/jobs.py b/ccserver/jobs.py
index a6d5d0e587b7b3083ea761d05296f9a19b4408fa..0fe222d3b904756b605287ce220e9ef3c56512d6 100644
--- a/ccserver/jobs.py
+++ b/ccserver/jobs.py
@@ -425,6 +425,183 @@ class ColdMigrationJob(BaseMigrationJob):
             self.get('func_cancel_xfer')()
         super(ColdMigrationJob, self).cancel()
 
+
+class HotMigrationJob(BaseMigrationJob):
+
+    '''
+    A hot vm migration job.
+
+    Mandatory items:
+     * vm_name: name of the vm to migrate
+     * hv_source: name of the hv which executes the VM
+     * hv_dest: the destination hypervisor
+     * author: login of the cli user who requested the migration
+    '''
+
+    def job(self):
+        vm_id = '%s.%s' % (self['hv_source'], self['vm_name'])
+
+        self['title'] = 'Hot migration %s --> %s' % (vm_id, self['hv_dest'])
+        logging.info('Job-%s: Started hot migration for %s', self['id'], vm_id)
+
+        # Cancel the job if the user does not have the right to migrate the
+        # vm or to select this hypervisor as destination:
+        right_check = self.manager.server.check
+
+        tql = 'id=%s' % vm_id
+        if not right_check(self['author'], 'hotmigrate', tql):
+            raise JobCancelError('author has no right to migrate this VM')
+
+        tql = 'id=%s' % self['hv_dest']
+        if not right_check(self['author'], 'hotmigrate_dest', tql):
+            raise JobCancelError('author has no right to migrate to this hv')
+
+        # Update the VM object:
+        try:
+            self.manager.server.objects.update(ids=(vm_id,))
+            vm = self.manager.server.objects.get_by_id(vm_id)
+        except UnknownObjectError:
+            raise JobCancelError('Source VM not found')
+
+        # Get the source and destination hv clients:
+        try:
+            source = self.manager.server.get_connection(self['hv_source'])
+        except KeyError:
+            raise JobCancelError('source hypervisor is not connected')
+
+        try:
+            dest = self.manager.server.get_connection(self['hv_dest'])
+        except KeyError:
+            raise JobCancelError('destination hypervisor is not connected')
+
+        self.checkpoint()
+
+        self.report('waiting for locks on source and dest hypervisors')
+        logging.info('Job-%s: Trying to acquire locks', self['id'])
+
+        with AcquiresAllOrNone(source.lock, dest.lock):
+            logging.info('Job-%s: Locks acquired', self['id'])
+            self.checkpoint()
+
+            if not self._check_status(vm_id, 'running'):
+                raise JobCancelError('vm is not started')
+
+            # Create the volumes on the destination and start their
+            # synchronization:
+            disks = vm.get('disk', '').split()
+            for disk in disks:
+                # Get information about the disk:
+                pool = vm.get('disk%s_pool' % disk)
+                name = vm.get('disk%s_vol' % disk)
+                size = vm.get('disk%s_size' % disk)
+                assert pool is not None, 'pool tag doesn\'t exist'
+                assert name is not None, 'name tag doesn\'t exist'
+                assert size is not None, 'size tag doesn\'t exist'
+
+                self.report('sync volume %s/%s (creation)' % (pool, name))
+
+                # Create the volume on the destination:
+                dest.proxy.vol_create(pool, name, int(size))
+                logging.info('Job-%s: Created volume %s/%s on destination '
+                             'hypervisor', self['id'], pool, name)
+
+                # Rollback stuff for this action:
+                def rb_volcreate():
+                    dest.proxy.vol_delete(pool, name)
+                self.checkpoint(rb_volcreate)
+
+                # Set up the drbd synchronization on both hypervisors:
+                self.report('sync volume %s/%s (setup)' % (pool, name))
+
+                to_cleanup = []
+
+                res_src = source.proxy.drbd_setup(pool, name)
+                def rb_setupsrc():
+                    source.proxy.drbd_shutdown(res_src)
+                self.checkpoint(rb_setupsrc)
+                to_cleanup.append(rb_setupsrc)
+
+                res_dst = dest.proxy.drbd_setup(pool, name)
+                def rb_setupdst():
+                    dest.proxy.drbd_shutdown(res_dst)
+                self.checkpoint(rb_setupdst)
+                to_cleanup.append(rb_setupdst)
+
+                # Connect the two drbd endpoints:
+                self.report('sync volume %s/%s (connect)' % (pool, name))
+
+                source.proxy.drbd_connect(res_src, res_dst, dest.get_ip())
+                dest.proxy.drbd_connect(res_dst, res_src, source.get_ip())
+
+                # Set up the topology as Primary/Secondary:
+                source.proxy.drbd_role(res_src, True)
+                dest.proxy.drbd_role(res_dst, False)
+
+                # Wait for the end of the disk synchronization:
+                self.report('sync volume %s/%s (sync)' % (pool, name))
+                cids = set()
+                cids.add(source.connection.async_call('drbd_sync', res_src))
+                cids.add(dest.connection.async_call('drbd_sync', res_dst))
+                msgs = self.manager.server.manager.wait(frozenset(cids))
+                # TODO: check the replies for errors
+
+                source.proxy.drbd_takeover(res_src, True)
+                def rb_takeover_src():
+                    source.proxy.drbd_takeover(res_src, False)
+                self.checkpoint(rb_takeover_src)
+                to_cleanup.append(rb_takeover_src)
+
+                dest.proxy.drbd_takeover(res_dst, True)
+                def rb_takeover_dst():
+                    dest.proxy.drbd_takeover(res_dst, False)
+                self.checkpoint(rb_takeover_dst)
+                to_cleanup.append(rb_takeover_dst)
+
+            # Set up the tunnel used for the live migration:
+            tunres_src = source.proxy.tun_setup()
+            def rb_tun_src():
+                source.proxy.tun_destroy(tunres_src)
+            self.checkpoint(rb_tun_src)
+
+            tunres_dst = dest.proxy.tun_setup(local=False)
+            def rb_tun_dst():
+                dest.proxy.tun_destroy(tunres_dst)
+            self.checkpoint(rb_tun_dst)
+
+            source.proxy.tun_connect(tunres_src, tunres_dst, dest.get_ip())
+            dest.proxy.tun_connect_hv(tunres_dst, migration=True)
+
+            # Initiate the live migration:
+            self.report('migration in progress')
+            source.proxy.vm_migrate_tunneled(self['vm_name'], tunres_src)
+
+            # At this point, if the operation is a success, all we need to do
+            # is to clean up the disks and the vm on the source hypervisor.
+            # This operation *CAN'T* be cancelled or rolled back if anything
+            # fails (unlikely). The migration must be considered a success,
+            # and the only way to undo it is to start a new migration in the
+            # other direction.
+
+            # Delete the rollback list. This is mandatory to avoid data loss
+            # if the cleanup code below fails.
+            self.report('cleanup')
+            self._wayback = []
+
+            source.proxy.tun_destroy(tunres_src)
+            dest.proxy.tun_destroy(tunres_dst)
+
+            for cb_cleanup in reversed(to_cleanup):
+                cb_cleanup()
+
+            # Clean up the disks:
+            for disk in disks:
+                pool = vm.get('disk%s_pool' % disk)
+                name = vm.get('disk%s_vol' % disk)
+
+                source.proxy.vol_delete(pool, name)
+
+        logging.info('Job-%s: Migration completed successfully', self['id'])
+
+
 class CloneJob(BaseMigrationJob):
 
     '''
@@ -617,6 +794,7 @@ class JobsManager(object):
         'kill': KillClientJob,
         'kill_oldcli': KillOldCliJob,
         'cold_migrate': ColdMigrationJob,
+        'hot_migrate': HotMigrationJob,
        'clone': CloneJob,
     }