Loading ccserver/election.py 0 → 100644 +285 −0 Original line number Diff line number Diff line #!/usr/bin/env python #coding=utf8 ''' This module contains the hypervisor destination election stuff. ''' from __future__ import absolute_import from copy import copy from ccserver.exceptions import UnknownElectionAlgo, UnknownElectionType def tags(*args): ''' Decorator used to declare tags used by a filter. ''' def decorator(func): func.__tags__ = set(args) return func return decorator class Elector(object): # Filtering function for destination hypervisors: FILTERS = { 'cold': (('is_hv', 'filter r=hv'), ('not_source_hv', 'filter source hv'), ('is_connected', 'filter connected hv'), ('vm_htype_eq_hv', 'filter bad hv types'), ('has_rights', 'filter rights'), ('has_alloc', 'filter allocatable hv'), ('duplicate_name', 'filter vm duplicate names'), ('enough_disk', 'filter hv with not enough disk'),), } # Available algos for each types: ALGO_BY_TYPES = {'cold': ('fair', )} def __init__(self, server, query_vm, query_dest, login): # The server instance for this election: self._server = server # The TQL query to select vm: self._query_vm = query_vm # The TQL query to select destination hypervisor: self._query_dest = query_dest # The login of the initiator of the election: self._login = login def election(self, mtype, algo): ''' Generate a new migration plan for this election. You must specify the migration type and the distribution algoritm. :param mtype: the migration type :param algo: the distribution algoritm ''' # Check the choosen election method: if mtype not in self.ALGO_BY_TYPES: raise UnknownElectionType('%r is unknown migration type' % mtype) else: if algo not in self.ALGO_BY_TYPES[mtype]: raise UnknownElectionAlgo('%r is unknown migration algo' % algo) else: func = '_algo_%s' % algo if not hasattr(self, func): raise UnknownElectionAlgo('%r not found' % func) else: distribute = getattr(self, func) # Get the destination hypervisor candidates: candidates = self._get_candidates(self.FILTERS[mtype]) # Distributes VMs to each candidate: migration_plan = distribute(candidates) # Return the migration plan: return migration_plan def _get_candidates(self, filters): # Get all the tags needed for hypervisors and construct the final # filter list: filterfuncs = [] hv_tags = set() for name, desc in filters: filterfunc = getattr(self, '_filter_%s' % name) filterfuncs.append((filterfunc, desc)) hv_tags |= getattr(filterfunc, '__tags__', set()) # Get the selected vms and hvs: vms = self._server.list(self._query_vm, show=('*',)) hvs = self._server.list(self._query_dest, show=hv_tags) candidates = [] # Filters the candidates: for vm in vms: if vm['r'] != 'vm': continue tql = 'id=%s' % vm['id'] if not self._server.check(self._login, 'coldmigrate', tql): continue vm_dest = copy(hvs) for func, desc in filterfuncs: vm_dest = func(vm, vm_dest) candidates.append((vm, vm_dest)) return candidates ##### ##### Distribution algorithm methods: ##### def _algo_fair(self, candidates): migration_plan = [] # Sort vm by number of destination hv: candidates = sorted(candidates, key=lambda x: len(x[1])) hv_alloc = {} for vm, hvs in candidates: if not hvs: migration_plan.append({'sid': vm['id'], 'did': '', 'error': 'no destination hv found', 'type': 'error'}) else: # Try to take an hypervisor that is not already in the plan: for hv in hvs: if hv['id'] not in hv_alloc: break else: # If all candidates for this VM are already in the # migration plan, we take the less allocated: hv = min(hvs, key=lambda x: hv_alloc[x['id']]) migration_plan.append({'sid': vm['id'], 'did': hv['id'], 'type': 'cold'}) hv_alloc[hv['id']] = hv_alloc.get(hv['id'], 0) + 1 return migration_plan ##### ##### Filtering methods: ##### @tags('r') def _filter_is_hv(self, vm, hvs): returned = [] for hv in hvs: if hv.get('r') == 'hv': returned.append(hv) return returned @tags('con') def _filter_is_connected(self, vm, hvs): returned = [] for hv in hvs: if hv.get('con'): returned.append(hv) return returned @tags('htype') def _filter_vm_htype_eq_hv(self, vm, hvs): returned = [] vm_htype = vm.get('htype') for hv in hvs: if hv.get('htype') == vm_htype: returned.append(hv) return returned @tags('id') def _filter_not_source_hv(self, vm, hvs): returned = [] hv_id, _, _ = vm['id'].partition('.') for hv in hvs: if hv['id'] != hv_id: returned.append(hv) return returned def _filter_has_rights(self, vm, hvs): returned = [] for hv in hvs: tql = 'id=%s' % hv['id'] if self._server.check(self._login, 'coldmigrate_dest', tql): returned.append(hv) return returned @tags('alloc') def _filter_has_alloc(self, vm, hvs): returned = [] for hv in hvs: if hv.get('alloc', False): returned.append(hv) return returned def _filter_duplicate_name(self, vm, hvs): returned = [] hv_id, _, vm_name = vm['id'].partition('.') for hv in hvs: vms = self._server.list('id:%s.*$h' % hv['id']) duplicate = False for vm in vms: if vm.get('h') == vm_name: duplicate = True break if not duplicate: returned.append(hv) return returned @tags('memfree') def _filter_enough_ram(self, vm, hvs): returned = [] for hv in hvs: if int(hv.get('memfree', 0)) >= int(self.vm.get('mem', 0)): returned.append(hv) return returned @tags('cpu') def _filter_enough_core(self, vm, hvs): returned = [] for hv in hvs: # Calculate the total number of vcpu used by VMs: vms = self.manager.server.list('id:%s.*$cpu' % hv['id']) count = 0 for vm in vms: count += int(vm.get('cpu', 0)) if int(hv.get('cpu', 0)) >= count + int(self.vm.get('cpu', 0)): returned.append(hv) return returned @tags('*') def _filter_enough_disk(self, vm, hvs): returned = [] return hvs # Calculate the size needed for each pools: pools = {} for disk in vm['disk'].split(): size = vm['disk%s_size' % disk] pool = vm['disk%s_pool' % disk] vol = vm['disk%s_vol' % disk] dpool = pools.get(pool, {'size': 0, 'vols': set()}) dpool['size'] += int(size) dpool['vols'].add(vol) pools[pool] = dpool # Check for each HV if it match: for hv in hvs: good = True for pool, prop in pools.iteritems(): free = int(hv.get('sto%s_free' % pool, 0)) vols = set(hv.get('sto%s_vol' % pool, '').split()) if free < prop['size'] or prop['vols'] & vols: good = False if good: returned.append(hv) return returned def _filter_not_locked(self, vm, hvs): if len(hvs) == 1: return hvs returned = [] for hv in hvs: lock = self.manager.server.get_connection(hv['id']).lock if not lock.locked(): returned.append(hv) if len(returned) == 0: return hvs return returned def _filter_biggest_id(self, hvs): return sorted(hvs, key=lambda x: x['id'])[-1] ccserver/exceptions.py +13 −0 Original line number Diff line number Diff line Loading @@ -41,5 +41,18 @@ class BadJobTypeError(Exception): class UnknownJobError(Exception): pass class UnknownElectionType(Exception): pass class UnknownElectionAlgo(Exception): pass class UnknownMigrationType(Exception): pass class UnknownObjectError(Exception): pass ccserver/handlers.py +57 −0 Original line number Diff line number Diff line Loading @@ -11,8 +11,13 @@ from ccserver.conf import CCConf from ccserver.exceptions import (AlreadyRegistered, AuthenticationError, RightError, ReservedTagError, BadObjectError, BadRoleError, NotConnectedAccountError) from ccserver.election import Elector from ccserver import __version__ MIGRATION_TYPES = {'cold': 'cold_migrate'} def listed(func): func.__listed__ = True return func Loading Loading @@ -666,6 +671,58 @@ class CliHandler(OnlineCCHandler): self._server.jobs.purge() @listed def electiontypes(self, conn): return Elector.ALGO_BY_TYPES @listed def election(self, conn, query_vm, query_dest, mtype='cold', algo='fair', **kwargs): ''' Consult the server for the migration of specified vm on an hypervisor pool. :param query_vm: the tql query to select VMs to migrate :param query_dest: the tql query to select destination hypervisors candidates :param mtype: type of migration :param algo: algo used for distribution ''' client = self._server.search_client_by_connection(conn) elector = Elector(self._server, query_vm, query_dest, client.login) return elector.election(mtype, algo, **kwargs) @listed def migrate(self, conn, migration_plan): ''' Launch the provided migration plan. :param migration_plan: the plan of the migration. :return: a standard error report ''' client = self._server.search_client_by_connection(conn) errs = Reporter() for migration in migration_plan: # Check if the migration type is know: if migration['type'] in MIGRATION_TYPES: mtype = MIGRATION_TYPES[migration['type']] else: errmsg = '%r unknown migration type' % migration['type'] errs.error(migration['sid'], errmsg) # Construct the migration properties: migration_properties = { 'author': client.login, 'vm_name': migration['sid'].split('.')[-1], 'hv_source': '.'.join(migration['sid'].split('.')[:-1]), 'hv_dest': migration['did'] } # Create the job: self._server.jobs.create(mtype, **migration_properties) errs.success(migration['sid'], 'migration launched') return errs.get_dict() @listed def dbstats(self, conn): Loading ccserver/jobs.py +177 −1 Original line number Diff line number Diff line Loading @@ -12,7 +12,9 @@ import time from datetime import datetime from threading import Thread, Lock from ccserver.exceptions import BadJobTypeError, UnknownJobError from ccserver.exceptions import (BadJobTypeError, UnknownJobError, JobError, UnknownObjectError) from ccserver.utils import AcquiresAllOrNone class JobCancelError(Exception): ''' Loading Loading @@ -241,6 +243,179 @@ class KillOldCliJob(BaseJob): time.sleep(delay) class ColdMigrationJob(BaseJob): ''' A cold vm migration job. Mandatory items: * vm_name: name of the vm to migrate * hv_source: name of the hv which execute the VM * hv_dest: the destination hypervisor * author: login of the author cli ''' def job(self): vm_id = '%s.%s' % (self['hv_source'], self['vm_name']) self['title'] = 'Cold migration %s --> %s' % (vm_id, self['hv_dest']) logging.info('Job-%s: Started migration for %s', self['id'], vm_id) # Cancel the job if the user has not the right to migrate the vm or to # select an hypervisor as destination: right_check = self.manager.server.check tql = 'id=%s' % vm_id if not right_check(self['author'], 'coldmigrate', tql): raise JobCancelError('author have no rights to migrate this VM') tql = 'id=%s' % self['hv_dest'] if not right_check(self['author'], 'coldmigrate_dest', tql): raise JobCancelError('author have no right to migrate to this hv') # Update the VM object: try: self.manager.server.objects.update(ids=(vm_id,)) vm = self.manager.server.objects.get_by_id(vm_id) except UnknownObjectError: raise JobCancelError('Source VM not found') # Get the source and destination hv clients: try: source = self.manager.server.get_connection(self['hv_source']) except KeyError: raise JobCancelError('source hypervisor is not connected') try: dest = self.manager.server.get_connection(self['hv_dest']) except KeyError: raise JobCancelError('destination hypervisor is not connected') self.checkpoint() self.report('waiting lock for source and dest hypervisors') logging.info('Job-%s: Trying to acquire locks', self['id']) with AcquiresAllOrNone(source.lock, dest.lock): logging.info('Job-%s: Locks acquired', self['id']) self.checkpoint() if not self._check_status(vm_id, 'stopped'): raise JobCancelError('vm is not stopped') # Create storages on destination: self.report('create volumes') for disk in vm.get('disk', '').split(): # Getting informations about the disk: pool = vm.get('disk%s_pool' % disk) name = vm.get('disk%s_vol' % disk) size = vm.get('disk%s_size' % disk) assert pool is not None, 'pool tag doesn\'t exists' assert name is not None, 'name tag doesn\'t exists' assert size is not None, 'size tag doesn\'t exists' # Create the volume on destination: dest.proxy.vol_create(pool, name, int(size)) logging.info('Job-%s: Created volume %s/%s on destination ' 'hypervisor', self['id'], pool, name) # Rollback stuff for this action: def rb_volcreate(): dest.proxy.vol_delete(pool, name) self.checkpoint(rb_volcreate) # Define VM: self.report('define vm') logging.info('Job-%s: XML configuration transfert', self['id']) vm_config = source.proxy.vm_export(self['vm_name']) dest.proxy.vm_define(vm_config) # Rollback stuff for vm definition: def rb_define(): dest.proxy.vm_undefine(self['vm_name']) self.checkpoint(rb_define) # Copy all source disk on destination disk: for disk in vm.get('disk', '').split(): self._copy_disk(source, dest, vm, disk) # At this point, if operation is a success, all we need is just to # cleanup source hypervisor from disk and vm. This operation *CAN'T* # be cancelled or rollbacked if anything fails (unlikely). The # migration must be considered as a success, and the only way to # undo this is to start a new migration in the other way. # Delete the rollback list. # This is mandatory to avoid data loss if the cleanup # code below fail. self._wayback = [] # Cleanup the disks: for disk in vm.get('disk', '').split(): pool = vm.get('disk%s_pool' % disk) name = vm.get('disk%s_vol' % disk) source.proxy.vol_delete(pool, name) # Cleanup the VM: source.proxy.vm_undefine(self['vm_name']) logging.info('Job-%s: Migration completed with success', self['id']) def _check_status(self, vm_id, status): ''' Check the status of the VM. ''' answer = self.manager.server.list('id=%s&status=%s' % (vm_id, status)) return bool(answer) def _copy_disk(self, source, dest, vm, disk): ''' Copy the specified disk name of the vm from source to dest. ''' # Get informations about the disk: pool = vm.get('disk%s_pool' % disk) name = vm.get('disk%s_vol' % disk) logging.info('Job-%s: Started copy for %s/%s', self['id'], pool, name) self.report('copy %s/%s' % (pool, name)) # Make the copy and wait for it end: xferprop = dest.proxy.vol_import(pool, name) # Register the cancel function: def cancel_xfer(): dest.proxy.vol_import_cancel(xferprop['id']) self['func_cancel_xfer'] = cancel_xfer # Wait for the end of transfert: cids = set() cids.add(source.connection.async_call('vol_export', pool, name, dest.get_ip(), xferprop['port'])) cids.add(dest.connection.async_call('vol_import_wait', xferprop['id'])) msgs = self.manager.server.manager.wait(frozenset(cids)) del self['func_cancel_xfer'] # Compare checksum of two answers: checksums = [] assert len(msgs) == 2 for msg in msgs: if msg.get('error') is not None: msg = 'error while copy: %s' % msg['error']['message'] raise JobCancelError(msg) else: checksums.append(msg.get('checksum')) self.checkpoint() if checksums[0] != checksums[1]: raise JobCancelError('checksum mismatches') def cancel(self): if self.get('func_cancel_xfer') is not None: self.get('func_cancel_xfer')() super(ColdMigrationJob, self).cancel() class JobsManager(object): ''' Loading @@ -252,6 +427,7 @@ class JobsManager(object): JOBS_TYPES = { 'kill': KillClientJob, 'kill_oldcli': KillOldCliJob, 'cold_migrate': ColdMigrationJob, } def __init__(self, server): Loading Loading
ccserver/election.py 0 → 100644 +285 −0 Original line number Diff line number Diff line #!/usr/bin/env python #coding=utf8 ''' This module contains the hypervisor destination election stuff. ''' from __future__ import absolute_import from copy import copy from ccserver.exceptions import UnknownElectionAlgo, UnknownElectionType def tags(*args): ''' Decorator used to declare tags used by a filter. ''' def decorator(func): func.__tags__ = set(args) return func return decorator class Elector(object): # Filtering function for destination hypervisors: FILTERS = { 'cold': (('is_hv', 'filter r=hv'), ('not_source_hv', 'filter source hv'), ('is_connected', 'filter connected hv'), ('vm_htype_eq_hv', 'filter bad hv types'), ('has_rights', 'filter rights'), ('has_alloc', 'filter allocatable hv'), ('duplicate_name', 'filter vm duplicate names'), ('enough_disk', 'filter hv with not enough disk'),), } # Available algos for each types: ALGO_BY_TYPES = {'cold': ('fair', )} def __init__(self, server, query_vm, query_dest, login): # The server instance for this election: self._server = server # The TQL query to select vm: self._query_vm = query_vm # The TQL query to select destination hypervisor: self._query_dest = query_dest # The login of the initiator of the election: self._login = login def election(self, mtype, algo): ''' Generate a new migration plan for this election. You must specify the migration type and the distribution algoritm. :param mtype: the migration type :param algo: the distribution algoritm ''' # Check the choosen election method: if mtype not in self.ALGO_BY_TYPES: raise UnknownElectionType('%r is unknown migration type' % mtype) else: if algo not in self.ALGO_BY_TYPES[mtype]: raise UnknownElectionAlgo('%r is unknown migration algo' % algo) else: func = '_algo_%s' % algo if not hasattr(self, func): raise UnknownElectionAlgo('%r not found' % func) else: distribute = getattr(self, func) # Get the destination hypervisor candidates: candidates = self._get_candidates(self.FILTERS[mtype]) # Distributes VMs to each candidate: migration_plan = distribute(candidates) # Return the migration plan: return migration_plan def _get_candidates(self, filters): # Get all the tags needed for hypervisors and construct the final # filter list: filterfuncs = [] hv_tags = set() for name, desc in filters: filterfunc = getattr(self, '_filter_%s' % name) filterfuncs.append((filterfunc, desc)) hv_tags |= getattr(filterfunc, '__tags__', set()) # Get the selected vms and hvs: vms = self._server.list(self._query_vm, show=('*',)) hvs = self._server.list(self._query_dest, show=hv_tags) candidates = [] # Filters the candidates: for vm in vms: if vm['r'] != 'vm': continue tql = 'id=%s' % vm['id'] if not self._server.check(self._login, 'coldmigrate', tql): continue vm_dest = copy(hvs) for func, desc in filterfuncs: vm_dest = func(vm, vm_dest) candidates.append((vm, vm_dest)) return candidates ##### ##### Distribution algorithm methods: ##### def _algo_fair(self, candidates): migration_plan = [] # Sort vm by number of destination hv: candidates = sorted(candidates, key=lambda x: len(x[1])) hv_alloc = {} for vm, hvs in candidates: if not hvs: migration_plan.append({'sid': vm['id'], 'did': '', 'error': 'no destination hv found', 'type': 'error'}) else: # Try to take an hypervisor that is not already in the plan: for hv in hvs: if hv['id'] not in hv_alloc: break else: # If all candidates for this VM are already in the # migration plan, we take the less allocated: hv = min(hvs, key=lambda x: hv_alloc[x['id']]) migration_plan.append({'sid': vm['id'], 'did': hv['id'], 'type': 'cold'}) hv_alloc[hv['id']] = hv_alloc.get(hv['id'], 0) + 1 return migration_plan ##### ##### Filtering methods: ##### @tags('r') def _filter_is_hv(self, vm, hvs): returned = [] for hv in hvs: if hv.get('r') == 'hv': returned.append(hv) return returned @tags('con') def _filter_is_connected(self, vm, hvs): returned = [] for hv in hvs: if hv.get('con'): returned.append(hv) return returned @tags('htype') def _filter_vm_htype_eq_hv(self, vm, hvs): returned = [] vm_htype = vm.get('htype') for hv in hvs: if hv.get('htype') == vm_htype: returned.append(hv) return returned @tags('id') def _filter_not_source_hv(self, vm, hvs): returned = [] hv_id, _, _ = vm['id'].partition('.') for hv in hvs: if hv['id'] != hv_id: returned.append(hv) return returned def _filter_has_rights(self, vm, hvs): returned = [] for hv in hvs: tql = 'id=%s' % hv['id'] if self._server.check(self._login, 'coldmigrate_dest', tql): returned.append(hv) return returned @tags('alloc') def _filter_has_alloc(self, vm, hvs): returned = [] for hv in hvs: if hv.get('alloc', False): returned.append(hv) return returned def _filter_duplicate_name(self, vm, hvs): returned = [] hv_id, _, vm_name = vm['id'].partition('.') for hv in hvs: vms = self._server.list('id:%s.*$h' % hv['id']) duplicate = False for vm in vms: if vm.get('h') == vm_name: duplicate = True break if not duplicate: returned.append(hv) return returned @tags('memfree') def _filter_enough_ram(self, vm, hvs): returned = [] for hv in hvs: if int(hv.get('memfree', 0)) >= int(self.vm.get('mem', 0)): returned.append(hv) return returned @tags('cpu') def _filter_enough_core(self, vm, hvs): returned = [] for hv in hvs: # Calculate the total number of vcpu used by VMs: vms = self.manager.server.list('id:%s.*$cpu' % hv['id']) count = 0 for vm in vms: count += int(vm.get('cpu', 0)) if int(hv.get('cpu', 0)) >= count + int(self.vm.get('cpu', 0)): returned.append(hv) return returned @tags('*') def _filter_enough_disk(self, vm, hvs): returned = [] return hvs # Calculate the size needed for each pools: pools = {} for disk in vm['disk'].split(): size = vm['disk%s_size' % disk] pool = vm['disk%s_pool' % disk] vol = vm['disk%s_vol' % disk] dpool = pools.get(pool, {'size': 0, 'vols': set()}) dpool['size'] += int(size) dpool['vols'].add(vol) pools[pool] = dpool # Check for each HV if it match: for hv in hvs: good = True for pool, prop in pools.iteritems(): free = int(hv.get('sto%s_free' % pool, 0)) vols = set(hv.get('sto%s_vol' % pool, '').split()) if free < prop['size'] or prop['vols'] & vols: good = False if good: returned.append(hv) return returned def _filter_not_locked(self, vm, hvs): if len(hvs) == 1: return hvs returned = [] for hv in hvs: lock = self.manager.server.get_connection(hv['id']).lock if not lock.locked(): returned.append(hv) if len(returned) == 0: return hvs return returned def _filter_biggest_id(self, hvs): return sorted(hvs, key=lambda x: x['id'])[-1]
ccserver/exceptions.py +13 −0 Original line number Diff line number Diff line Loading @@ -41,5 +41,18 @@ class BadJobTypeError(Exception): class UnknownJobError(Exception): pass class UnknownElectionType(Exception): pass class UnknownElectionAlgo(Exception): pass class UnknownMigrationType(Exception): pass class UnknownObjectError(Exception): pass
ccserver/handlers.py +57 −0 Original line number Diff line number Diff line Loading @@ -11,8 +11,13 @@ from ccserver.conf import CCConf from ccserver.exceptions import (AlreadyRegistered, AuthenticationError, RightError, ReservedTagError, BadObjectError, BadRoleError, NotConnectedAccountError) from ccserver.election import Elector from ccserver import __version__ MIGRATION_TYPES = {'cold': 'cold_migrate'} def listed(func): func.__listed__ = True return func Loading Loading @@ -666,6 +671,58 @@ class CliHandler(OnlineCCHandler): self._server.jobs.purge() @listed def electiontypes(self, conn): return Elector.ALGO_BY_TYPES @listed def election(self, conn, query_vm, query_dest, mtype='cold', algo='fair', **kwargs): ''' Consult the server for the migration of specified vm on an hypervisor pool. :param query_vm: the tql query to select VMs to migrate :param query_dest: the tql query to select destination hypervisors candidates :param mtype: type of migration :param algo: algo used for distribution ''' client = self._server.search_client_by_connection(conn) elector = Elector(self._server, query_vm, query_dest, client.login) return elector.election(mtype, algo, **kwargs) @listed def migrate(self, conn, migration_plan): ''' Launch the provided migration plan. :param migration_plan: the plan of the migration. :return: a standard error report ''' client = self._server.search_client_by_connection(conn) errs = Reporter() for migration in migration_plan: # Check if the migration type is know: if migration['type'] in MIGRATION_TYPES: mtype = MIGRATION_TYPES[migration['type']] else: errmsg = '%r unknown migration type' % migration['type'] errs.error(migration['sid'], errmsg) # Construct the migration properties: migration_properties = { 'author': client.login, 'vm_name': migration['sid'].split('.')[-1], 'hv_source': '.'.join(migration['sid'].split('.')[:-1]), 'hv_dest': migration['did'] } # Create the job: self._server.jobs.create(mtype, **migration_properties) errs.success(migration['sid'], 'migration launched') return errs.get_dict() @listed def dbstats(self, conn): Loading
ccserver/jobs.py +177 −1 Original line number Diff line number Diff line Loading @@ -12,7 +12,9 @@ import time from datetime import datetime from threading import Thread, Lock from ccserver.exceptions import BadJobTypeError, UnknownJobError from ccserver.exceptions import (BadJobTypeError, UnknownJobError, JobError, UnknownObjectError) from ccserver.utils import AcquiresAllOrNone class JobCancelError(Exception): ''' Loading Loading @@ -241,6 +243,179 @@ class KillOldCliJob(BaseJob): time.sleep(delay) class ColdMigrationJob(BaseJob): ''' A cold vm migration job. Mandatory items: * vm_name: name of the vm to migrate * hv_source: name of the hv which execute the VM * hv_dest: the destination hypervisor * author: login of the author cli ''' def job(self): vm_id = '%s.%s' % (self['hv_source'], self['vm_name']) self['title'] = 'Cold migration %s --> %s' % (vm_id, self['hv_dest']) logging.info('Job-%s: Started migration for %s', self['id'], vm_id) # Cancel the job if the user has not the right to migrate the vm or to # select an hypervisor as destination: right_check = self.manager.server.check tql = 'id=%s' % vm_id if not right_check(self['author'], 'coldmigrate', tql): raise JobCancelError('author have no rights to migrate this VM') tql = 'id=%s' % self['hv_dest'] if not right_check(self['author'], 'coldmigrate_dest', tql): raise JobCancelError('author have no right to migrate to this hv') # Update the VM object: try: self.manager.server.objects.update(ids=(vm_id,)) vm = self.manager.server.objects.get_by_id(vm_id) except UnknownObjectError: raise JobCancelError('Source VM not found') # Get the source and destination hv clients: try: source = self.manager.server.get_connection(self['hv_source']) except KeyError: raise JobCancelError('source hypervisor is not connected') try: dest = self.manager.server.get_connection(self['hv_dest']) except KeyError: raise JobCancelError('destination hypervisor is not connected') self.checkpoint() self.report('waiting lock for source and dest hypervisors') logging.info('Job-%s: Trying to acquire locks', self['id']) with AcquiresAllOrNone(source.lock, dest.lock): logging.info('Job-%s: Locks acquired', self['id']) self.checkpoint() if not self._check_status(vm_id, 'stopped'): raise JobCancelError('vm is not stopped') # Create storages on destination: self.report('create volumes') for disk in vm.get('disk', '').split(): # Getting informations about the disk: pool = vm.get('disk%s_pool' % disk) name = vm.get('disk%s_vol' % disk) size = vm.get('disk%s_size' % disk) assert pool is not None, 'pool tag doesn\'t exists' assert name is not None, 'name tag doesn\'t exists' assert size is not None, 'size tag doesn\'t exists' # Create the volume on destination: dest.proxy.vol_create(pool, name, int(size)) logging.info('Job-%s: Created volume %s/%s on destination ' 'hypervisor', self['id'], pool, name) # Rollback stuff for this action: def rb_volcreate(): dest.proxy.vol_delete(pool, name) self.checkpoint(rb_volcreate) # Define VM: self.report('define vm') logging.info('Job-%s: XML configuration transfert', self['id']) vm_config = source.proxy.vm_export(self['vm_name']) dest.proxy.vm_define(vm_config) # Rollback stuff for vm definition: def rb_define(): dest.proxy.vm_undefine(self['vm_name']) self.checkpoint(rb_define) # Copy all source disk on destination disk: for disk in vm.get('disk', '').split(): self._copy_disk(source, dest, vm, disk) # At this point, if operation is a success, all we need is just to # cleanup source hypervisor from disk and vm. This operation *CAN'T* # be cancelled or rollbacked if anything fails (unlikely). The # migration must be considered as a success, and the only way to # undo this is to start a new migration in the other way. # Delete the rollback list. # This is mandatory to avoid data loss if the cleanup # code below fail. self._wayback = [] # Cleanup the disks: for disk in vm.get('disk', '').split(): pool = vm.get('disk%s_pool' % disk) name = vm.get('disk%s_vol' % disk) source.proxy.vol_delete(pool, name) # Cleanup the VM: source.proxy.vm_undefine(self['vm_name']) logging.info('Job-%s: Migration completed with success', self['id']) def _check_status(self, vm_id, status): ''' Check the status of the VM. ''' answer = self.manager.server.list('id=%s&status=%s' % (vm_id, status)) return bool(answer) def _copy_disk(self, source, dest, vm, disk): ''' Copy the specified disk name of the vm from source to dest. ''' # Get informations about the disk: pool = vm.get('disk%s_pool' % disk) name = vm.get('disk%s_vol' % disk) logging.info('Job-%s: Started copy for %s/%s', self['id'], pool, name) self.report('copy %s/%s' % (pool, name)) # Make the copy and wait for it end: xferprop = dest.proxy.vol_import(pool, name) # Register the cancel function: def cancel_xfer(): dest.proxy.vol_import_cancel(xferprop['id']) self['func_cancel_xfer'] = cancel_xfer # Wait for the end of transfert: cids = set() cids.add(source.connection.async_call('vol_export', pool, name, dest.get_ip(), xferprop['port'])) cids.add(dest.connection.async_call('vol_import_wait', xferprop['id'])) msgs = self.manager.server.manager.wait(frozenset(cids)) del self['func_cancel_xfer'] # Compare checksum of two answers: checksums = [] assert len(msgs) == 2 for msg in msgs: if msg.get('error') is not None: msg = 'error while copy: %s' % msg['error']['message'] raise JobCancelError(msg) else: checksums.append(msg.get('checksum')) self.checkpoint() if checksums[0] != checksums[1]: raise JobCancelError('checksum mismatches') def cancel(self): if self.get('func_cancel_xfer') is not None: self.get('func_cancel_xfer')() super(ColdMigrationJob, self).cancel() class JobsManager(object): ''' Loading @@ -252,6 +427,7 @@ class JobsManager(object): JOBS_TYPES = { 'kill': KillClientJob, 'kill_oldcli': KillOldCliJob, 'cold_migrate': ColdMigrationJob, } def __init__(self, server): Loading