diff --git a/ccserver/election.py b/ccserver/election.py
new file mode 100644
index 0000000000000000000000000000000000000000..c41dd31eda275c35652e9ddb2719237d7fb2bb44
--- /dev/null
+++ b/ccserver/election.py
@@ -0,0 +1,285 @@
+#!/usr/bin/env python
+#coding=utf8
+
+'''
+This module contains the hypervisor destination election logic.
+'''
+
+from __future__ import absolute_import
+
+from copy import copy
+
+from ccserver.exceptions import UnknownElectionAlgo, UnknownElectionType
+
+def tags(*args):
+    '''
+    Decorator used to declare the tags used by a filter.
+    '''
+
+    def decorator(func):
+        func.__tags__ = set(args)
+        return func
+    return decorator
+
+class Elector(object):
+
+    # Filtering functions for destination hypervisors:
+    FILTERS = {
+        'cold': (('is_hv', 'filter r=hv'),
+                 ('not_source_hv', 'filter source hv'),
+                 ('is_connected', 'filter connected hv'),
+                 ('vm_htype_eq_hv', 'filter bad hv types'),
+                 ('has_rights', 'filter rights'),
+                 ('has_alloc', 'filter allocatable hv'),
+                 ('duplicate_name', 'filter vm duplicate names'),
+                 ('enough_disk', 'filter hv with not enough disk'),),
+    }
+
+    # Available algorithms for each migration type:
+    ALGO_BY_TYPES = {'cold': ('fair',)}
+
+    def __init__(self, server, query_vm, query_dest, login):
+        # The server instance for this election:
+        self._server = server
+
+        # The TQL query to select the VMs to migrate:
+        self._query_vm = query_vm
+
+        # The TQL query to select the destination hypervisors:
+        self._query_dest = query_dest
+
+        # The login of the initiator of the election:
+        self._login = login
+
+    def election(self, mtype, algo):
+        '''
+        Generate a new migration plan for this election. You must specify the
+        migration type and the distribution algorithm.
+
+        :param mtype: the migration type
+        :param algo: the distribution algorithm
+        '''
+
+        # Check the chosen election method:
+        if mtype not in self.ALGO_BY_TYPES:
+            raise UnknownElectionType('%r is an unknown migration type' % mtype)
+        if algo not in self.ALGO_BY_TYPES[mtype]:
+            raise UnknownElectionAlgo('%r is an unknown migration algo' % algo)
+
+        func = '_algo_%s' % algo
+        if not hasattr(self, func):
+            raise UnknownElectionAlgo('%r not found' % func)
+        distribute = getattr(self, func)
+
+        # Get the destination hypervisor candidates:
+        candidates = self._get_candidates(self.FILTERS[mtype])
+
+        # Distribute the VMs over the candidates:
+        migration_plan = distribute(candidates)
+
+        # Return the migration plan:
+        return migration_plan
+
+    def _get_candidates(self, filters):
+        # Get all the tags needed for hypervisors and construct the final
+        # filter list:
+        filterfuncs = []
+        hv_tags = set()
+
+        for name, desc in filters:
+            filterfunc = getattr(self, '_filter_%s' % name)
+            filterfuncs.append((filterfunc, desc))
+            hv_tags |= getattr(filterfunc, '__tags__', set())
+
+        # Get the selected VMs and hypervisors:
+        vms = self._server.list(self._query_vm, show=('*',))
+        hvs = self._server.list(self._query_dest, show=hv_tags)
+
+        candidates = []
+
+        # Filter the candidates:
+        for vm in vms:
+            if vm['r'] != 'vm':
+                continue
+            tql = 'id=%s' % vm['id']
+            if not self._server.check(self._login, 'coldmigrate', tql):
+                continue
+            vm_dest = copy(hvs)
+            for func, desc in filterfuncs:
+                vm_dest = func(vm, vm_dest)
+            candidates.append((vm, vm_dest))
+
+        return candidates
+
+#####
+##### Distribution algorithm methods:
+#####
+
+    def _algo_fair(self, candidates):
+        migration_plan = []
+
+        # Sort the VMs by number of destination hypervisors:
+        candidates = sorted(candidates, key=lambda x: len(x[1]))
+
+        hv_alloc = {}
+        for vm, hvs in candidates:
+            if not hvs:
+                migration_plan.append({'sid': vm['id'],
+                                       'did': '',
+                                       'error': 'no destination hv found',
+                                       'type': 'error'})
+            else:
+                # Try to take a hypervisor that is not already in the plan:
+                for hv in hvs:
+                    if hv['id'] not in hv_alloc:
+                        break
+                else:
+                    # If all candidates for this VM are already in the
+                    # migration plan, take the least allocated one:
+                    hv = min(hvs, key=lambda x: hv_alloc[x['id']])
+
+                migration_plan.append({'sid': vm['id'], 'did': hv['id'],
+                                       'type': 'cold'})
+                hv_alloc[hv['id']] = hv_alloc.get(hv['id'], 0) + 1
+
+        return migration_plan
+
+#####
+##### Filtering methods:
+#####
+
+    @tags('r')
+    def _filter_is_hv(self, vm, hvs):
+        returned = []
+        for hv in hvs:
+            if hv.get('r') == 'hv':
+                returned.append(hv)
+        return returned
+
+    @tags('con')
+    def _filter_is_connected(self, vm, hvs):
+        returned = []
+        for hv in hvs:
+            if hv.get('con'):
+                returned.append(hv)
+        return returned
+
+    @tags('htype')
+    def _filter_vm_htype_eq_hv(self, vm, hvs):
+        returned = []
+        vm_htype = vm.get('htype')
+        for hv in hvs:
+            if hv.get('htype') == vm_htype:
+                returned.append(hv)
+        return returned
+
+    @tags('id')
+    def _filter_not_source_hv(self, vm, hvs):
+        returned = []
+        hv_id, _, _ = vm['id'].partition('.')
+        for hv in hvs:
+            if hv['id'] != hv_id:
+                returned.append(hv)
+        return returned
+
+    def _filter_has_rights(self, vm, hvs):
+        returned = []
+        for hv in hvs:
+            tql = 'id=%s' % hv['id']
+            if self._server.check(self._login, 'coldmigrate_dest', tql):
+                returned.append(hv)
+        return returned
+
+    @tags('alloc')
+    def _filter_has_alloc(self, vm, hvs):
+        returned = []
+        for hv in hvs:
+            if hv.get('alloc', False):
+                returned.append(hv)
+        return returned
+
+    def _filter_duplicate_name(self, vm, hvs):
+        returned = []
+        hv_id, _, vm_name = vm['id'].partition('.')
+        for hv in hvs:
+            # List the VMs already hosted on this hypervisor (use a separate
+            # name to avoid shadowing the migrated vm):
+            hosted = self._server.list('id:%s.*$h' % hv['id'])
+            duplicate = False
+            for other in hosted:
+                if other.get('h') == vm_name:
+                    duplicate = True
+                    break
+            if not duplicate:
+                returned.append(hv)
+        return returned
+
+    @tags('memfree')
+    def _filter_enough_ram(self, vm, hvs):
+        returned = []
+        for hv in hvs:
+            if int(hv.get('memfree', 0)) >= int(vm.get('mem', 0)):
+                returned.append(hv)
+        return returned
+
+    @tags('cpu')
+    def _filter_enough_core(self, vm, hvs):
+        returned = []
+        for hv in hvs:
+            # Calculate the total number of vcpus used by the hosted VMs:
+            hosted = self._server.list('id:%s.*$cpu' % hv['id'])
+            count = 0
+            for other in hosted:
+                count += int(other.get('cpu', 0))
+            if int(hv.get('cpu', 0)) >= count + int(vm.get('cpu', 0)):
+                returned.append(hv)
+        return returned
+
+    @tags('*')
+    def _filter_enough_disk(self, vm, hvs):
+        returned = []
+
+        # Calculate the size needed for each pool:
+        pools = {}
+        for disk in vm.get('disk', '').split():
+            size = vm['disk%s_size' % disk]
+            pool = vm['disk%s_pool' % disk]
+            vol = vm['disk%s_vol' % disk]
+            dpool = pools.get(pool, {'size': 0, 'vols': set()})
+            dpool['size'] += int(size)
+            dpool['vols'].add(vol)
+            pools[pool] = dpool
+
+        # Check for each hypervisor if it matches:
+        for hv in hvs:
+            good = True
+            for pool, prop in pools.iteritems():
+                free = int(hv.get('sto%s_free' % pool, 0))
+                vols = set(hv.get('sto%s_vol' % pool, '').split())
+
+                if free < prop['size'] or prop['vols'] & vols:
+                    good = False
+
+            if good:
+                returned.append(hv)
+        return returned
+
+    def _filter_not_locked(self, vm, hvs):
+        if len(hvs) == 1:
+            return hvs
+
+        returned = []
+        for hv in hvs:
+            lock = self._server.get_connection(hv['id']).lock
+            if not lock.locked():
+                returned.append(hv)
+
+        if len(returned) == 0:
+            return hvs
+
+        return returned
+
+    def _filter_biggest_id(self, hvs):
+        return sorted(hvs, key=lambda x: x['id'])[-1]
diff --git a/ccserver/exceptions.py b/ccserver/exceptions.py
index 653174400ca809f54d0eeec41d02789b490bf807..3836ef85f756513842558d40d8bc89432fb7e287 100644
--- a/ccserver/exceptions.py
+++ b/ccserver/exceptions.py
@@ -41,5 +41,18 @@ class BadJobTypeError(Exception):
 class UnknownJobError(Exception):
     pass
 
+
+class UnknownElectionType(Exception):
+    pass
+
+
+class UnknownElectionAlgo(Exception):
+    pass
+
+
+class UnknownMigrationType(Exception):
+    pass
+
+
 class UnknownObjectError(Exception):
     pass
diff --git a/ccserver/handlers.py b/ccserver/handlers.py
index 76b336f1df517b2f2f6e5f3e904c65ebab9a9a31..8e9822905f71dd50c3244130078036ee51bebed0 100644
--- a/ccserver/handlers.py
+++ b/ccserver/handlers.py
@@ -11,8 +11,13 @@ from ccserver.conf import CCConf
 from ccserver.exceptions import (AlreadyRegistered, AuthenticationError,
                                  RightError, ReservedTagError, BadObjectError,
                                  BadRoleError, NotConnectedAccountError)
+from ccserver.election import Elector
 from ccserver import __version__
 
+
+MIGRATION_TYPES = {'cold': 'cold_migrate'}
+
+
 def listed(func):
     func.__listed__ = True
     return func
@@ -666,6 +671,58 @@ class CliHandler(OnlineCCHandler):
 
         self._server.jobs.purge()
 
+
+    @listed
+    def electiontypes(self, conn):
+        return Elector.ALGO_BY_TYPES
+
+    @listed
+    def election(self, conn, query_vm, query_dest, mtype='cold', algo='fair',
+                 **kwargs):
+        '''
+        Ask the server to compute a migration plan for the specified VMs over
+        a pool of candidate hypervisors.
+
+        :param query_vm: the TQL query to select the VMs to migrate
+        :param query_dest: the TQL query to select the destination hypervisor
+            candidates
+        :param mtype: the type of migration
+        :param algo: the algorithm used for distribution
+        '''
+        client = self._server.search_client_by_connection(conn)
+        elector = Elector(self._server, query_vm, query_dest, client.login)
+        return elector.election(mtype, algo, **kwargs)
+
+    @listed
+    def migrate(self, conn, migration_plan):
+        '''
+        Launch the provided migration plan.
+
+        :param migration_plan: the migration plan to execute
+        :return: a standard error report (one entry per source VM)
+        '''
+        client = self._server.search_client_by_connection(conn)
+        errs = Reporter()
+        for migration in migration_plan:
+            # Check that the migration type is known:
+            if migration['type'] in MIGRATION_TYPES:
+                mtype = MIGRATION_TYPES[migration['type']]
+            else:
+                errmsg = '%r is an unknown migration type' % migration['type']
+                errs.error(migration['sid'], errmsg)
+                continue
+
+            # Construct the migration properties:
+            migration_properties = {
+                'author': client.login,
+                'vm_name': migration['sid'].split('.')[-1],
+                'hv_source': '.'.join(migration['sid'].split('.')[:-1]),
+                'hv_dest': migration['did'],
+            }
+
+            # Create the job:
+            self._server.jobs.create(mtype, **migration_properties)
+            errs.success(migration['sid'], 'migration launched')
+
+        return errs.get_dict()
 
     @listed
     def dbstats(self, conn):
diff --git a/ccserver/jobs.py b/ccserver/jobs.py
index 2b7b60e936e868d8d98d439bf91e55aea5b959a7..215cbcd2feb546db4e5300700d6c52a90ef42a08 100644
--- a/ccserver/jobs.py
+++ b/ccserver/jobs.py
@@ -12,7 +12,9 @@ import time
 from datetime import datetime
 from threading import Thread, Lock
 
-from ccserver.exceptions import BadJobTypeError, UnknownJobError
+from ccserver.exceptions import (BadJobTypeError, UnknownJobError, JobError,
+                                 UnknownObjectError)
+from ccserver.utils import AcquiresAllOrNone
 
 class JobCancelError(Exception):
     '''
@@ -241,6 +243,179 @@
             time.sleep(delay)
 
+class ColdMigrationJob(BaseJob):
+
+    '''
+    A cold VM migration job.
+
+    Mandatory items:
+     * vm_name: name of the VM to migrate
+     * hv_source: name of the hypervisor currently running the VM
+     * hv_dest: the destination hypervisor
+     * author: login of the CLI account that requested the migration
+    '''
+
+    def job(self):
+        vm_id = '%s.%s' % (self['hv_source'], self['vm_name'])
+
+        self['title'] = 'Cold migration %s --> %s' % (vm_id, self['hv_dest'])
+        logging.info('Job-%s: Started migration for %s', self['id'], vm_id)
+
+        # Cancel the job if the user has no right to migrate the VM or to
+        # select this hypervisor as destination:
+        right_check = self.manager.server.check
+
+        tql = 'id=%s' % vm_id
+        if not right_check(self['author'], 'coldmigrate', tql):
+            raise JobCancelError('author has no right to migrate this VM')
+
+        tql = 'id=%s' % self['hv_dest']
+        if not right_check(self['author'], 'coldmigrate_dest', tql):
+            raise JobCancelError('author has no right to migrate to this hv')
+
+        # Update the VM object:
+        try:
+            self.manager.server.objects.update(ids=(vm_id,))
+            vm = self.manager.server.objects.get_by_id(vm_id)
+        except UnknownObjectError:
+            raise JobCancelError('source VM not found')
+
+        # Get the source and destination hypervisor clients:
+        try:
+            source = self.manager.server.get_connection(self['hv_source'])
+        except KeyError:
+            raise JobCancelError('source hypervisor is not connected')
+
+        try:
+            dest = self.manager.server.get_connection(self['hv_dest'])
+        except KeyError:
+            raise JobCancelError('destination hypervisor is not connected')
+
+        self.checkpoint()
+
+        self.report('waiting lock for source and dest hypervisors')
+        logging.info('Job-%s: Trying to acquire locks', self['id'])
+
+        with AcquiresAllOrNone(source.lock, dest.lock):
+            logging.info('Job-%s: Locks acquired', self['id'])
+            self.checkpoint()
+
+            if not self._check_status(vm_id, 'stopped'):
+                raise JobCancelError('vm is not stopped')
+
+            # Create the storage volumes on the destination:
+            self.report('create volumes')
+            for disk in vm.get('disk', '').split():
+                # Get information about the disk:
+                pool = vm.get('disk%s_pool' % disk)
+                name = vm.get('disk%s_vol' % disk)
+                size = vm.get('disk%s_size' % disk)
+                assert pool is not None, 'pool tag doesn\'t exist'
+                assert name is not None, 'name tag doesn\'t exist'
+                assert size is not None, 'size tag doesn\'t exist'
+
+                # Create the volume on the destination:
+                dest.proxy.vol_create(pool, name, int(size))
+                logging.info('Job-%s: Created volume %s/%s on destination '
+                             'hypervisor', self['id'], pool, name)
+
+                # Rollback stuff for this action:
+                def rb_volcreate():
+                    dest.proxy.vol_delete(pool, name)
+                self.checkpoint(rb_volcreate)
+
+            # Define the VM:
+            self.report('define vm')
+            logging.info('Job-%s: XML configuration transfer', self['id'])
+            vm_config = source.proxy.vm_export(self['vm_name'])
+            dest.proxy.vm_define(vm_config)
+
+            # Rollback stuff for the VM definition:
+            def rb_define():
+                dest.proxy.vm_undefine(self['vm_name'])
+            self.checkpoint(rb_define)
+
+            # Copy each source disk to the destination:
+            for disk in vm.get('disk', '').split():
+                self._copy_disk(source, dest, vm, disk)
+
+            # At this point, if the operation is a success, all that is left
+            # is to clean the disks and the VM off the source hypervisor.
+            # This operation *CAN'T* be cancelled or rolled back if anything
+            # fails (unlikely). The migration must be considered a success,
+            # and the only way to undo it is to start a new migration in the
+            # other direction.
+
+            # Delete the rollback list.
+            # This is mandatory to avoid data loss if the cleanup code below
+            # fails.
+            self._wayback = []
+
+            # Cleanup the disks:
+            for disk in vm.get('disk', '').split():
+                pool = vm.get('disk%s_pool' % disk)
+                name = vm.get('disk%s_vol' % disk)
+
+                source.proxy.vol_delete(pool, name)
+
+            # Cleanup the VM:
+            source.proxy.vm_undefine(self['vm_name'])
+
+        logging.info('Job-%s: Migration completed with success', self['id'])
+
+    def _check_status(self, vm_id, status):
+        '''
+        Check the status of the VM.
+        '''
+
+        answer = self.manager.server.list('id=%s&status=%s' % (vm_id, status))
+        return bool(answer)
+
+    def _copy_disk(self, source, dest, vm, disk):
+        '''
+        Copy the specified disk of the VM from source to dest.
+        '''
+
+        # Get information about the disk:
+        pool = vm.get('disk%s_pool' % disk)
+        name = vm.get('disk%s_vol' % disk)
+        logging.info('Job-%s: Started copy for %s/%s', self['id'], pool, name)
+        self.report('copy %s/%s' % (pool, name))
+
+        # Make the copy and wait for it to end:
+        xferprop = dest.proxy.vol_import(pool, name)
+
+        # Register the cancel function:
+        def cancel_xfer():
+            dest.proxy.vol_import_cancel(xferprop['id'])
+        self['func_cancel_xfer'] = cancel_xfer
+
+        # Wait for the end of the transfer:
+        cids = set()
+        cids.add(source.connection.async_call('vol_export', pool, name,
+                                              dest.get_ip(), xferprop['port']))
+        cids.add(dest.connection.async_call('vol_import_wait', xferprop['id']))
+        msgs = self.manager.server.manager.wait(frozenset(cids))
+        del self['func_cancel_xfer']
+
+        # Compare the checksums of the two answers:
+        checksums = []
+        assert len(msgs) == 2
+        for msg in msgs:
+            if msg.get('error') is not None:
+                errmsg = 'error while copying: %s' % msg['error']['message']
+                raise JobCancelError(errmsg)
+            else:
+                checksums.append(msg.get('checksum'))
+        self.checkpoint()
+
+        if checksums[0] != checksums[1]:
+            raise JobCancelError('checksum mismatch')
+
+    def cancel(self):
+        if self.get('func_cancel_xfer') is not None:
+            self.get('func_cancel_xfer')()
+        super(ColdMigrationJob, self).cancel()
+
 
 class JobsManager(object):
 
     '''
@@ -252,6 +427,7 @@
     JOBS_TYPES = {
        'kill': KillClientJob,
        'kill_oldcli': KillOldCliJob,
+       'cold_migrate': ColdMigrationJob,
     }
 
     def __init__(self, server):
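For reference, a minimal usage sketch (not part of the patch) of how the new CliHandler commands fit together, assuming an RPC proxy object `cli` whose calls are dispatched to the handler methods above; the TQL query strings are illustrative placeholders:

    # Ask the server to build a cold migration plan with the 'fair'
    # distribution algorithm (proxy object and queries are assumptions):
    plan = cli.election(query_vm='hv=hv01&r=vm', query_dest='r=hv',
                        mtype='cold', algo='fair')

    # Entries the elector could not place are reported with type 'error';
    # keep only the runnable migrations before launching the plan:
    runnable = [m for m in plan if m['type'] != 'error']

    # Launch the plan; the result is the error report built by the migrate()
    # handler (one success or error entry per source VM id):
    report = cli.migrate(runnable)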