Commit 065e6dcf authored by Antoine Millet's avatar Antoine Millet
Browse files

Implemented new VM migrations

parent 74b6f864
Loading
Loading
Loading
Loading
+26 −1
Original line number Diff line number Diff line
@@ -28,7 +28,8 @@ from cloudcontrol.server.repository import RepositoryOperationError
from cloudcontrol.server.handlers import listed, Reporter
from cloudcontrol.server.clients import Client, RegisteredCCHandler
from cloudcontrol.server.jobs import (ColdMigrationJob, HotMigrationJob,
                                      CloneJob, KillClientJob, AllocationJob)
                                      CloneJob, KillClientJob, AllocationJob,
                                      MigrationJob)
from cloudcontrol.common.tql.db.tag import StaticTag

MIGRATION_TYPES = {'cold': ColdMigrationJob,
@@ -1106,6 +1107,30 @@ class CliHandler(RegisteredCCHandler):

        return errs.get_dict()

    @listed
    def migrate2(self, tql_vm, tql_target, batch=None, live=False, flags=None):
        """ Launch a migration.
        """

        self.client.check('migrate')
        vms = self.client.list(tql_vm, show=('r', ), method='migrate')
        errs = Reporter()

        for vm in vms:
            if vm['r'] != 'vm':
                errs.error(vm['id'], 'not a vm')
            else:
                settings = {'server': self.server,
                            'client': self.client,
                            'vm_id': vm['id'],
                            'tql_target': tql_target,
                            'live': live,
                            'flags': flags}

                job = self.client.spawn_job(MigrationJob, batch=batch, settings=settings)
                errs.success(vm['id'], 'migration launched, id:%s' % job.id)
        return errs.get_dict()

    @listed
    def clone(self, tql_vm, tql_dest, name):
        """ Create and launch a clone job.
+2 −1
Original line number Diff line number Diff line
@@ -23,7 +23,8 @@ from cloudcontrol.server.jobs.clone import CloneJob
from cloudcontrol.server.jobs.killoldcli import KillOldCliJob
from cloudcontrol.server.jobs.killclient import KillClientJob
from cloudcontrol.server.jobs.allocation import AllocationJob
from cloudcontrol.server.jobs.migration import MigrationJob


__all__ = ('ColdMigrationJob', 'HotMigrationJob', 'CloneJob', 'KillOldCliJob',
           'KillClientJob', 'AllocationJob')
 No newline at end of file
           'KillClientJob', 'AllocationJob', 'MigrationJob')
+237 −0
Original line number Diff line number Diff line
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl.  If not, see <http://www.gnu.org/licenses/>.

import time

from sjrpc.core import AsyncWatcher

from cloudcontrol.common.jobs import Job, JobCancelError

from cloudcontrol.server.jobs import AllocationJob
from cloudcontrol.server.allocator import (Allocator, IsAllocatable, TargetFilter,
                                           IsConnected, SatisfyRiskGroups, HaveEnoughCPU,
                                           HaveEnoughMemory, HaveEnoughStorage)

MIGRATION_RETRY = 30
MIGRATION_FILTERS = [IsAllocatable, TargetFilter, IsConnected, SatisfyRiskGroups,
                     HaveEnoughCPU, HaveEnoughMemory, HaveEnoughStorage]


class MigrationJob(Job):

    """ Migration job of a virtual machine.
    """

    # Migration global state
    migrating_hv = set()

    def job(self, server, client, vm_id, tql_target, live=False, flags=None):
        self._func_cancel_xfer = None  # Callback to a function used to cancel a disk transfert

        if live:
            self.logger.info('Started live migration of %s', vm_id)
            self.title = 'Migration %s --> TBD (live)' % vm_id
        else:
            self.logger.info('Started offline migration of %s', vm_id)
            self.title = 'Migration %s --> TBD (offline)' % vm_id

        # Update the VM object:
        vm = server.db.get_by_id(vm_id)
        if vm is None:
            raise JobCancelError('Source VM not found')

        # Get the source hypervisor client:
        try:
            source = server.get_client(vm['p'])
        except KeyError:
            raise JobCancelError('source hypervisor is not connected')

        # Remove the current hv of target
        tql_target = '(%s)/id:%s' % (tql_target, vm['p'])

        self.report('waiting source hypervisor for migration')
        with source.hvlock:
            self.report('looking for a candidate')

            # Looking for a candidate to run a migration:
            while True:
                self.checkpoint()
                # Generate the VM spec:
                vmspec = {'name': vm['h'],
                          'cpu': vm['cpu'],
                          'memory': vm['mem'],
                          'volumes': []}
                for disk in vm.get('disk', '').split():
                    vmspec['volumes'].append({'pool': vm['disk%s_pool' % disk], 'size': int(vm['disk%s_size' % disk])})
                if 'target' in vm:
                    vmspec['target'] = vm['target']

                # Get candidates:
                allocator = Allocator(self.logger.getChild('allocator'), server, client, filters=MIGRATION_FILTERS)
                with AllocationJob.allocation_lock:
                    candidates = allocator.allocate(vmspec, tql_target)

                    # Choose a candidate which is not already involved in a migration
                    for candidate in candidates:
                        if candidate not in MigrationJob.migrating_hv:
                            break
                        self.logger.info('Candidate %s is already busy', candidate)
                    else:
                        self.report('all candidates destinations are busy, retrying...')
                        time.sleep(MIGRATION_RETRY)
                        continue

                    MigrationJob.migrating_hv.add(candidate)
                    self.checkpoint(lambda: MigrationJob.migrating_hv.discard(candidate))
                    MigrationJob.migrating_hv.add(vm['p'])
                    self.checkpoint(lambda: MigrationJob.migrating_hv.discard(vm['p']))
                    break

            # Prepare migration
            self.logger.info('Found candidate: %s', candidate)
            if live:
                self.title = 'Migration %s --> %s (live)' % (vm_id, candidate)
            else:
                self.title = 'Migration %s --> %s (offline)' % (vm_id, candidate)

            # Get the destination hv client:
            try:
                dest = server.get_client(candidate)
            except KeyError:
                raise JobCancelError('destination hypervisor is not connected')

            dest_tags = server.db.get_by_id(candidate, tags=['vir_uri'])

            self.checkpoint()

            # Check vm status:
            if live and vm['status'] != 'running':
                raise JobCancelError('vm is not running')
            elif not live and vm['status'] != 'stopped':
                raise JobCancelError('vm is not stopped')

            # Offline migration, doing the volume migration by ourself
            if not live:
                # Create storages on destination:
                self.report('create volumes')
                for disk in vm.get('disk', '').split():
                    if vm.get('disk%s_shared' % disk) == 'no':
                        # Getting informations about the disk:
                        pool = vm.get('disk%s_pool' % disk)
                        name = vm.get('disk%s_vol' % disk)
                        size = vm.get('disk%s_size' % disk)
                        assert pool is not None, 'pool tag doesn\'t exists'
                        assert name is not None, 'name tag doesn\'t exists'
                        assert size is not None, 'size tag doesn\'t exists'

                        # Create the volume on destination:
                        dest.proxy.vol_create(pool, name, int(size))
                        self.logger.info('Created volume %s/%s on destination '
                                         'hypervisor', pool, name)

                        # Rollback stuff for this action:
                        def rb_volcreate():
                            dest.proxy.vol_delete(pool, name)
                        self.checkpoint(rb_volcreate)

                # Copy all source disk on destination disk:
                for disk in vm.get('disk', '').split():
                    if vm.get('disk%s_shared' % disk) == 'no':
                        self._copy_disk(source, dest, vm, disk)

            # At this point, if operation is a success, all we need is just to
            # cleanup source hypervisor from disk and vm. This operation *CAN'T*
            # be cancelled or rollbacked if anything fails (unlikely). The
            # migration must be considered as a success, and the only way to
            # undo this is to start a new migration in the other way.

            # Delete the rollback list.
            # This is mandatory to avoid data loss if the cleanup
            # code below fail.
            self._wayback = []
            self.checkpoint(lambda: MigrationJob.migrating_hv.discard(candidate))
            self.checkpoint(lambda: MigrationJob.migrating_hv.discard(vm['p']))

            self.report('migration in progress')
            source.proxy.vm_migrate(vm['h'], dest_tags['vir_uri'], live=live, _timeout=None)

            if not live:
                # Cleanup the disks on source:
                for disk in vm.get('disk', '').split():
                    if vm.get('disk%s_shared' % disk) == 'no':
                        pool = vm.get('disk%s_pool' % disk)
                        name = vm.get('disk%s_vol' % disk)

                        source.proxy.vol_delete(pool, name)

            self.logger.info('Migration completed with success')

            # Release source and dest hypervisors:
            MigrationJob.migrating_hv.discard(candidate)
            MigrationJob.migrating_hv.discard(vm['p'])

    def _copy_disk(self, source, dest, vm, disk):
        """ Copy the specified disk name of the vm from source to dest.
        """

        # Get informations about the disk:
        pool = vm.get('disk%s_pool' % disk)
        name = vm.get('disk%s_vol' % disk)
        self.logger.info('Started copy for %s/%s', pool, name)
        self.report('copy %s/%s' % (pool, name))

        # Make the copy and wait for it end:
        xferprop = dest.proxy.vol_import(pool, name)

        # Register the cancel function:
        def cancel_xfer():
            dest.proxy.vol_import_cancel(xferprop['id'])
        self._func_cancel_xfer = cancel_xfer

        # Wait for the end of transfert:
        watcher = AsyncWatcher()
        watcher.register(source.conn, 'vol_export', pool, name, dest.ip, xferprop['port'])
        watcher.register(dest.conn, 'vol_import_wait', xferprop['id'])

        # Loop to poll status while job is running
        job_running = True
        while job_running:
            # Timeout to get transfered bytes and update status
            msgs = watcher.wait(timeout=10)
            if not msgs:
                _, percent = dest.proxy.vol_transfer_status(xferprop['id'])
                self.report('copy %s/%s: %s%% done' % (pool, name, percent))
            else:
                job_running = False
        self._func_cancel_xfer = None

        # Compare checksum of two answers:
        checksums = []
        assert len(msgs) == 2
        for msg in msgs:
            if msg.get('error') is not None:
                msg = 'error while copy: %s' % msg['error']['message']
                raise JobCancelError(msg)
            else:
                checksums.append(msg['return'].get('checksum'))
                self.checkpoint()

        if checksums[0] != checksums[1]:
            raise JobCancelError('checksum mismatches')

    def cancel(self):
        if self._func_cancel_xfer is not None:
            self._func_cancel_xfer()
        super(MigrationJob, self).cancel()