Commit 6f0de0ef authored by Anael Beutot's avatar Anael Beutot
Browse files

Implemented live migration with libev child watcher

parent 2903d8d2
Loading
Loading
Loading
Loading
+11 −147
Original line number Diff line number Diff line
import logging
import os
import sys
import signal
import socket
import time
import traceback
from StringIO import StringIO
from xml.etree import cElementTree as et

@@ -14,14 +9,13 @@ from cloudcontrol.common.client.tags import Tag, tag_inspector

from cloudcontrol.node.host import Handler as HostHandler
from cloudcontrol.node.hypervisor import tags
from cloudcontrol.node.hypervisor.kvm import KVM
from cloudcontrol.node.hypervisor.kvm import KVM, LiveMigration
from cloudcontrol.node.exc import (
    UndefinedDomain, DRBDError, VMMigrationError, PoolStorageError
    UndefinedDomain, DRBDError, PoolStorageError
)
from cloudcontrol.node.hypervisor.jobs import (
    ImportVolume, ExportVolume, TCPTunnel, DRBD,
)
from cloudcontrol.node.utils import close_fds, set_signal_map


logger = logging.getLogger(__name__)
@@ -315,10 +309,10 @@ class Handler(HostHandler):
        :param name: VM name to migrate
        :param tun_res: result of tunnel_setup handler
        :param migtun_res: result of tunnel setup handler
        :param unsafe: for Libvirt >= 0.9.11, see
            http://libvirt.org/html/libvirt-libvirt.html#virDomainMigrateFlags
        :param bool unsafe: unsafe migration
        :param float timeout: timeout for libvirt migration (prevents libvirt
            from trying to acquire domain lock forever)
        :param float timeout: migration timeout in seconds
        """
        logger.debug('VM live migrate %s', name)

@@ -345,146 +339,16 @@ class Handler(HostHandler):
                             ' migration', name)
            raise

        # we open a new connection to libvirt and fork because sometimes libvirt
        # python binding, while doing a operation,
        # doesn't seem to realease CPython's GIL, therefore all node
        # operations are blocked
        # the only solution we have found right now is to use a dedicated
        # libvirt connection for the migration and fork, the migration operation
        # in itself is handled by the child while other threads can be scheduled

        try:
            pid = os.fork()
        except OSError:
            logger.error('Cannot fork before running live migration')
            raise

        if pid == 0:
            # child
            sys.stderr.write('Hello from child !\n')
            sys.stderr.write('Debug is %s\n' % self.main.config.debug)
            try:
                close_fds(debug=self.main.config.debug)
                set_signal_map({
                    signal.SIGTERM: lambda *args: os._exit(1),
                    signal.SIGUSR1: signal.SIG_IGN,
                    signal.SIGINT: signal.SIG_IGN,
                    # FIXME need more signal ?
                })
            except:
                sys.stderr.write('Error while performing post fork work\n')
                traceback.print_exc(file=sys.stderr)
            # create a new libvirt connection dedicated to migration
            sys.stderr.write('Open new connection to libvirt\n')
            try:
                new_con = libvirt.open('qemu:///system')
                domain = new_con.lookupByUUIDString(vm.uuid)
            except libvirt.libvirtError:
                sys.stderr.write('Cannot connect to libvirt\n')
                os._exit(4)
            except:
                # error
                traceback.print_exc(sys.stderr)
                os._exit(2)
            sys.stderr.write('Open destination libvirt connection\n')
        migration = LiveMigration(self.main, vm, remote_virt_port,
                                  remote_virt_port2, timeout, unsafe)
        try:
                dest_virt_con = libvirt.open(
                    'qemu+tcp://127.0.0.1:%d/system' % remote_virt_port)
            except libvirt.libvirtError:
                sys.stderr.write('Cannot connect to remote libvirt for live'
                                 ' migrating vm %s', name)
                os._exit(5)
            except:
                # error
                traceback.print_exc(file=sys.stderr)
                os._exit(2)

            try:
                if unsafe:
                    # VIR_MIGRATE_UNSAFE is not defined for libvirt < 0.9.11
                    append_flags = getattr(libvirt, 'VIR_MIGRATE_UNSAFE', 0)
                else:
                    append_flags = 0
                sys.stderr.write('Do migrate\n')
                domain.migrate(
                    dest_virt_con,
                    libvirt.VIR_MIGRATE_LIVE | libvirt.VIR_MIGRATE_PEER2PEER |
                    libvirt.VIR_MIGRATE_TUNNELLED |
                    libvirt.VIR_MIGRATE_PERSIST_DEST |
                    libvirt.VIR_MIGRATE_UNDEFINE_SOURCE |
                    append_flags,
                    None,
                    'qemu+tcp://127.0.0.1:%d/system' % remote_virt_port2,
                    0,
                )
            except libvirt.libvirtError:
                sys.stderr('libvirt error during migration\n')
                traceback.print_exc(file=sys.stderr)
                os._exit(1)
            except:
                # whatever the matter is we MUST NOT return to libev or sjRPC
                sys.stderr('error during migration\n')
                traceback.print_exc(file=sys.stderr)
                os._exit(2)
            else:
                os._exit(0)
            finally:
                new_con.close()
                dest_virt_con.close()
        else:
            # watch for migration status every second
            started_migration = time.time()
            while True:
                # wait timeout
                time.sleep(1.)

                # waitpid with no delay
                try:
                    rpid, status = os.waitpid(pid, os.WNOHANG)
                except OSError as exc:
                    logger.error('Error while waiting for child to terminate: %s',
                                 exc.strerror)
                    raise

                # convert status to return status
                status >>= 8
                logger.debug('Status: %s', status)
                if rpid == status == 0:
                    if time.time() - started_migration < timeout:
                        continue

                    # waitpid returned immediately, thus migration still running
                    # after timeout fired, we need to kill the child (term would
                    # have no effect)
                    os.kill(pid, signal.SIGKILL)

                    try:
                        rpid, status = os.waitpid(pid, 0)
                    except OSError as exc:
                        logger.error('Error while waiting for child after killing'
                                     ' it: %s', exc.strerror)
            migration.wait()
        except Exception:
            logger.exception('Error during live migration for vm %s', name)
            logger.debug('Exit status %d', migration.return_status)
            raise

                    assert rpid == pid, 'PID returned by waitpid is not valid'

                    logger.error('Migration timeout for vm %s', name)
                    raise VMMigrationError('Timeout')
                else:
                    if status == 4:
                        raise VMMigrationError('Cannot open new connection to'
                                               ' libvirt')
                    elif status == 5:
                        raise VMMigrationError('Cannot open connection to'
                                               ' remote libvirt')
                    elif status != 0:
                        # error
                        logger.error('Libvirt error while live migrating vm %s',
                                     name)
                        logger.debug('Exit status %s', status)
                        raise VMMigrationError('Migration failed')
                    else:
                        logger.info('Sucessfuly live migrated vm %s', name)
                        break
        logger.info('Sucessfuly migrated vm %s', name)

    @threadless
    @pass_connection
+196 −0
Original line number Diff line number Diff line
"""KVM hypervisor support."""

import re
import os
import sys
import signal
import logging
import weakref
import threading
import traceback

import libvirt
from cloudcontrol.common.client.utils import main_thread

from cloudcontrol.node.hypervisor.lib import (
    DOMAIN_STATES, EVENTS,
@@ -12,6 +18,8 @@ from cloudcontrol.node.hypervisor.lib import (
    StorageIndex,
)
from cloudcontrol.node.hypervisor.domains import VirtualMachine
from cloudcontrol.node.utils import close_fds, set_signal_map, num_to_sig
from cloudcontrol.node.exc import VMMigrationError


logger = logging.getLogger(__name__)
@@ -196,3 +204,191 @@ class KVM(object):
    def vm_total(self):
        """Total number of VMs on the hypervisor."""
        return self._count_domain()


class LiveMigration(object):
    def __init__(self, main_loop, vm, node2virt_port, virt2virt_port, timeout,
                 unsafe=False):
        """Performs live migration in a forked process.

        :param main_loop: instance of MainLoop
        :param vm: instance of VM to migrate
        :param node2virt_port: port for ccnode -> distant libvirt
        :param virt2virt_port: port for local libvirt -> distant libvirt
        :param float timeout: timeout for libvirt migration (prevents libvirt
            from trying to acquire domain lock forever)
        :param bool unsafe: for Libvirt >= 0.9.11, see
            http://libvirt.org/html/libvirt-libvirt.html#virDomainMigrateFlags
        """
        self.main = main_loop

        self.vm = vm

        self.node2virt_port = node2virt_port
        self.virt2virt_port = virt2virt_port
        self.timeout = timeout
        self.unsafe = unsafe
        #: child pid
        self.pid = None
        self.error_msg = None
        self.return_status = None
        # event for caller thread to wait migration termination
        self.event = threading.Event()

        self.do_fork()

    def create_watchers(self):
        self.timeout_watcher = self.main.evloop.timer(self.timeout, 0.,
                                                      self.timeout_cb)
        self.child_watcher = self.main.evloop.child(self.pid, False,
                                                    self.child_cb)
        self.timeout_watcher.start()
        self.child_watcher.start()

    def child_cb(self, watcher, revents):
        self.pid = None
        self.return_status = watcher.rstatus
        watcher.stop()
        if self.timeout_watcher.active:
            self.timeout_watcher.stop()
        self.child_watcher = None

        logger.debug('Status: %s', self.return_status)

        # test if killed, then set msg
        if os.WIFSIGNALED(self.return_status):
            signo = os.WTERMSIG(self.return_status)
            if signo == signal.SIGKILL:
                self.error_msg = 'Migration timeout for vm %s' % self.vm.name
            else:
                self.error_msg = 'Migration failed for vm %s, (%s)' % (
                    self.vm.name, num_to_sig(signo))
        else:
            # test status
            status = os.WEXITSTATUS(self.return_status)
            if status == 1:
                self.error_msg = (
                    'Migration failed for vm %s, due to libvirt error'
                    % self.vm.name)
            elif status == 4:
                self.error_msg = 'Cannot open new connection to libvirt'
            elif status == 5:
                self.error_msg = 'Cannot open connection to remote libvirt'
            elif status != 0:
                self.error_msg = 'Migration failed for vm %s (%d)' % (
                    self.vm.name, status)

        self.event.set()

    def timeout_cb(self, watcher, revents):
        # kill the young
        logger.debug('Killing child migration process')
        os.kill(self.pid, signal.SIGKILL)

    @main_thread
    def do_fork(self):
        # we fork and open a new connection to libvirt because sometimes libvirt
        # python binding, while doing a operation,
        # doesn't seem to realease CPython's GIL, therefore all node
        # operations are blocked
        # the only solution we have found right now is to use a dedicated
        # libvirt connection for the migration and fork, the migration operation
        # in itself is handled by the child while other threads can be scheduled

        try:
            pid = os.fork()
        except OSError:
            logger.error('Cannot fork before running live migration')
            raise

        if pid == 0:
            # child
            try:
                self.child_work()
            except:
                # whatever the matter is we MUST NOT return to libev or sjRPC
                traceback.print_exc('Error uncatched')
            finally:
                os._exit(42)

        self.pid = pid
        self.create_watchers()

    def child_work(self):
        # migration is performed here
        sys.stderr.write('Hello from child !\n')
        sys.stderr.write('Debug is %s\n' % self.main.config.debug)
        try:
            close_fds(debug=self.main.config.debug)
            set_signal_map({
                signal.SIGTERM: lambda *args: os._exit(1),
                signal.SIGUSR1: signal.SIG_IGN,
                signal.SIGINT: signal.SIG_IGN,
                # FIXME need more signal ?
            })
        except:
            sys.stderr.write('Error while performing post fork work\n')
            traceback.print_exc(file=sys.stderr)
        # create a new libvirt connection dedicated to migration
        sys.stderr.write('Open new connection to libvirt\n')
        try:
            new_con = libvirt.open('qemu:///system')
            domain = new_con.lookupByUUIDString(self.vm.uuid)
        except libvirt.libvirtError:
            sys.stderr.write('Cannot connect to libvirt\n')
            os._exit(4)
        except:
            # error
            traceback.print_exc(sys.stderr)
            os._exit(2)
        sys.stderr.write('Open destination libvirt connection\n')
        try:
            dest_virt_con = libvirt.open(
                'qemu+tcp://127.0.0.1:%d/system' % self.node2virt_port)
        except libvirt.libvirtError:
            sys.stderr.write('Cannot connect to remote libvirt for live'
                             ' migrating vm %s', self.vm.name)
            os._exit(5)
        except:
            # error
            traceback.print_exc(file=sys.stderr)
            os._exit(2)

        try:
            if self.unsafe:
                # VIR_MIGRATE_UNSAFE is not defined for libvirt < 0.9.11
                append_flags = getattr(libvirt, 'VIR_MIGRATE_UNSAFE', 0)
            else:
                append_flags = 0
            sys.stderr.write('Do migrate\n')
            domain.migrate(
                dest_virt_con,
                libvirt.VIR_MIGRATE_LIVE | libvirt.VIR_MIGRATE_PEER2PEER |
                libvirt.VIR_MIGRATE_TUNNELLED |
                libvirt.VIR_MIGRATE_PERSIST_DEST |
                libvirt.VIR_MIGRATE_UNDEFINE_SOURCE |
                append_flags,
                None,
                'qemu+tcp://127.0.0.1:%d/system' % self.virt2virt_port,
                0,
            )
        except libvirt.libvirtError:
            sys.stderr.write('libvirt error during migration\n')
            traceback.print_exc(file=sys.stderr)
            os._exit(1)
        except:
            # whatever the matter is we MUST NOT return to libev or sjRPC
            sys.stderr.write('error during migration\n')
            traceback.print_exc(file=sys.stderr)
            os._exit(2)
        else:
            os._exit(0)
        finally:
            new_con.close()
            dest_virt_con.close()

    def wait(self):
        self.event.wait()

        if self.return_status != 0:
            raise VMMigrationError(self.error_msg)