# kvm.py
"""KVM hypervisor support."""

import re
import os
import sys
import signal
import logging
import weakref
import threading
import traceback
from cloudcontrol.common.client.utils import main_thread

from cloudcontrol.node.hypervisor.lib import (
    DOMAIN_STATES, EVENTS,
    EventLoop as VirEventLoop,
    StorageIndex,
)
from cloudcontrol.node.hypervisor.domains import VirtualMachine
from cloudcontrol.node.utils import close_fds, set_signal_map, num_to_sig
from cloudcontrol.node.exc import VMMigrationError


logger = logging.getLogger(__name__)
# VM names of the form ``job-<n>`` (``n`` being 0 or a decimal number without
# leading zero) are refused. Raw string so that ``\d`` reaches the regex engine
# untouched instead of relying on an invalid string escape.
BAD_VM_NAME = re.compile(r'^job-(0|([1-9]\d*))$')


# FIXME create abstract base class for any hypervisor
class KVM(object):
    """Container for all hypervisor related state.

    Holds the libvirt connection, the libvirt event loop wrapper, the storage
    index and the ``name -> VirtualMachine`` index, and keeps that index in
    sync with the domain events reported by libvirt.
    """
    def __init__(self, name, handler):
        """
        :param str name: name of hypervisor instance
        :param Handler handler: hypervisor handler
        """
        # only a weak proxy is stored on the instance
        self.handler = weakref.proxy(handler)

        #: hv attributes
        self.name = name
        self.type = u'kvm'

        # register libvirt error handler
        libvirt.registerErrorHandler(self.vir_error_cb, None)
        # libvirt event loop abstraction
        self.vir_event_loop = VirEventLoop(self.handler.main.evloop)

        self.vir_con = libvirt.open('qemu:///system')  # currently only support KVM

        # find out storage
        self.storage = StorageIndex(handler, self.vir_con)

        logger.debug('Storages: %s', self.storage.paths)

        #: domains: vms, containers...
        self.domains = dict()
        # find defined domains
        for dom_name in self.vir_con.listDefinedDomains():
            dom = self.vir_con.lookupByName(dom_name)
            self.domains[dom.name()] = VirtualMachine(dom, self)
        # find started domains
        for dom_id in self.vir_con.listDomainsID():
            dom = self.vir_con.lookupByID(dom_id)
            self.domains[dom.name()] = VirtualMachine(dom, self)

        logger.debug('Domains: %s', self.domains)

        # from now on, domain events are delivered to vir_cb
        self.vir_con.domainEventRegister(self.vir_cb, None)

    def stop(self):
        """Stop the libvirt event loop and close the libvirt connection."""
        self.vir_event_loop.stop()
        # unregister callback
        try:
            self.vir_con.domainEventDeregister(self.vir_cb)
        except libvirt.libvirtError:
            # in case the libvirt connection is broken, it will raise the error
            pass
        ret = self.vir_con.close()
        logger.debug('Libvirt still handling %s ref connections', ret)

    def vir_error_cb(self, ctxt, err):
        """Libvirt error callback.

        See http://libvirt.org/errors.html for more information.

        :param ctxt: arbitrary context data (not needed because context is
            given by self)
        :param err: libvirt error code
        """
        logger.error('Libvirt error %s', err)

    def vir_cb(self, conn, dom, event, detail, opaque):
        """Callback for libvirt event loop.

        Keeps ``self.domains`` and the handler tag database in sync with the
        domain event received.

        :param conn: libvirt connection the event comes from (unused)
        :param dom: libvirt domain concerned by the event
        :param event: event identifier, translated through ``EVENTS``
        :param detail: event detail, only logged
        :param opaque: opaque data given at registration (unused)
        """
        logger.debug('Received event %s on domain %s, detail %s', event,
                     dom.name(), detail)

        event = EVENTS[event]

        if event == 'Added':
            # prevents name conflicts with others cc-node objects
            if BAD_VM_NAME.match(dom.name()):
                # the format string expects the VM name
                logger.error('Cannot register VM %s as its name would '
                             'conflits with others cc-node objects',
                             dom.name())
                return
            # update Storage pools in case VM has volumes that were created
            self.storage.update()
            if dom.name() in self.domains:
                # sometimes libvirt send us the same event multiple times
                # this can be the result of a change in the domain configuration
                # we first remove the old domain
                vm = self.domains.pop(dom.name())
                self.handler.tag_db.remove_sub_object(vm.name)
                logger.debug('Domain %s recreated', dom.name())
            vm = VirtualMachine(dom, self)
            logger.info('Created domain %s', vm.name)
            self.domains[vm.name] = vm
            self.handler.tag_db.add_sub_object(vm.name, vm.tags.itervalues(),
                                               'vm')
            self.update_domain_count()
        elif event == 'Removed':
            vm_name = dom.name()
            self.vm_unregister(vm_name)
            logger.info('Removed domain %s', vm_name)
        elif event in ('Started', 'Suspended', 'Resumed', 'Stopped', 'Saved',
                       'Restored'):
            vm = self.domains.get(dom.name())
            # sometimes libvirt sent a start event before a created event so be
            # careful
            if vm is not None:
                try:
                    state = DOMAIN_STATES[dom.info()[0]]
                except libvirt.libvirtError as exc:
                    # checks that domain was not previously removed
                    # seems to happen only in libvirt 0.8.8
                    if 'Domain not found' in str(exc):
                        self.vm_unregister(dom.name())
                    else:
                        raise
                else:
                    logger.info('Domain change state from %s to %s', vm.state,
                                 state)
                    vm.state = state
                    self.update_domain_count()

    def vm_unregister(self, name):
        """Unregister a VM from the cc-server and remove it from the index.

        Does nothing when ``name`` is not (or no longer) in the index.

        :param str name: name of the domain to unregister
        """
        try:
            vm = self.domains.pop(name)
        except KeyError:
            # domain already removed, see hypervisor/domains/vm_tags.py
            # sometimes libvirt send us the remove event too late
            # NOTE(review): an earlier comment stated that storage and tag
            # attributes are still updated in this case, but nothing is
            # updated here -- confirm which behaviour is intended
            pass
        else:
            self.handler.tag_db.remove_sub_object(vm.name)
            # update Storage pools in case VM had volumes that were deleted
            self.storage.update()
            self.update_domain_count()

    def update_domain_count(self):
        """Update domain state count tags."""
        # update domain state counts
        for tag in ('nvm', 'vmpaused', 'vmstarted', 'vmstopped', 'cpualloc',
                    'cpurunning', 'memalloc', 'memrunning'):
            self.handler.tag_db['__main__'][tag].update_value()

    def vm_define(self, xml_desc):
        """Create a VM on the Hypervisor

        :param str xml_desc: XML description in libvirt format
        :return: VM name created
        :raises libvirt.libvirtError: when libvirt refuses the definition (the
            error is logged then re-raised)
        """
        try:
            return self.vir_con.defineXML(xml_desc).name()
        except libvirt.libvirtError:
            logger.exception('Error while creating domain')
            # reraise exception for the cc-server
            raise

    def _count_domain(self, filter=lambda d: True):
        """Count the indexed domains accepted by ``filter``.

        :param filter: predicate called with each domain object; a domain is
            counted when it returns a true value (default: count everything)
        :return: number of matching domains
        """
        # ``values()`` exists on every dict flavour, ``itervalues()`` does not
        return sum(1 for dom in self.domains.values() if filter(dom))

    @property
    def vm_started(self):
        """Number of VMs started."""
        return self._count_domain(lambda d: d.state == 'running')

    @property
    def vm_stopped(self):
        """Number of VMs stopped."""
        return self._count_domain(lambda d: d.state == 'stopped')

    @property
    def vm_paused(self):
        """Number of VMs paused."""
        return self._count_domain(lambda d: d.state == 'paused')

    @property
    def vm_total(self):
        """Total number of VMs on the hypervisor."""
        return self._count_domain()


class LiveMigration(object):
    """Live migration of a VM, run in a forked child process.

    The child exits with one of the following statuses, decoded by
    :meth:`child_cb`:

    * 0: migration succeeded
    * 1: libvirt error during migration (also used by the SIGTERM handler)
    * 2: unexpected error
    * 4: cannot open the local libvirt connection or find the domain
    * 5: cannot open the connection to the remote libvirt
    * 42: an exception escaped :meth:`child_work`
    """
    def __init__(self, main_loop, vm, node2virt_port, virt2virt_port, timeout,
                 unsafe=False):
        """Performs live migration in a forked process.

        :param main_loop: instance of MainLoop
        :param vm: instance of VM to migrate
        :param node2virt_port: port for ccnode -> distant libvirt
        :param virt2virt_port: port for local libvirt -> distant libvirt
        :param float timeout: timeout for libvirt migration (prevents libvirt
            from trying to acquire domain lock forever)
        :param bool unsafe: for Libvirt >= 0.9.11, see
            http://libvirt.org/html/libvirt-libvirt.html#virDomainMigrateFlags
        """
        self.main = main_loop

        self.vm = vm

        self.node2virt_port = node2virt_port
        self.virt2virt_port = virt2virt_port
        self.timeout = timeout
        self.unsafe = unsafe
        #: child pid
        self.pid = None
        self.error_msg = None
        self.return_status = None
        # event for caller thread to wait migration termination
        self.event = threading.Event()

        self.do_fork()

    def create_watchers(self):
        """Start the timeout timer and the child termination watcher."""
        self.timeout_watcher = self.main.evloop.timer(self.timeout, 0.,
                                                      self.timeout_cb)
        self.child_watcher = self.main.evloop.child(self.pid, False,
                                                    self.child_cb)
        self.timeout_watcher.start()
        self.child_watcher.start()

    def child_cb(self, watcher, revents):
        """Child watcher callback: record the outcome and wake up waiters.

        Translates the child's wait status into ``self.error_msg`` (left to
        ``None`` on success) then sets ``self.event``.
        """
        self.pid = None
        self.return_status = watcher.rstatus
        watcher.stop()
        if self.timeout_watcher.active:
            self.timeout_watcher.stop()
        self.child_watcher = None

        logger.debug('Status: %s', self.return_status)

        # test if killed, then set msg
        if os.WIFSIGNALED(self.return_status):
            signo = os.WTERMSIG(self.return_status)
            if signo == signal.SIGKILL:
                # SIGKILL is what timeout_cb sends
                self.error_msg = 'Migration timeout for vm %s' % self.vm.name
            else:
                self.error_msg = 'Migration failed for vm %s, (%s)' % (
                    self.vm.name, num_to_sig(signo))
        else:
            # test status
            status = os.WEXITSTATUS(self.return_status)
            if status == 1:
                self.error_msg = (
                    'Migration failed for vm %s, due to libvirt error'
                    % self.vm.name)
            elif status == 4:
                self.error_msg = 'Cannot open new connection to libvirt'
            elif status == 5:
                self.error_msg = 'Cannot open connection to remote libvirt'
            elif status != 0:
                self.error_msg = 'Migration failed for vm %s (%d)' % (
                    self.vm.name, status)

        self.event.set()

    def timeout_cb(self, watcher, revents):
        """Timer callback: the migration took too long, kill the child."""
        # kill the young
        logger.debug('Killing child migration process')
        os.kill(self.pid, signal.SIGKILL)

    @main_thread
    def do_fork(self):
        """Fork the migration child and, in the parent, start the watchers.

        :raises OSError: when the fork fails
        """
        # we fork and open a new connection to libvirt because sometimes libvirt
        # python binding, while doing a operation,
        # doesn't seem to release CPython's GIL, therefore all node
        # operations are blocked
        # the only solution we have found right now is to use a dedicated
        # libvirt connection for the migration and fork, the migration operation
        # in itself is handled by the child while other threads can be scheduled

        try:
            pid = os.fork()
        except OSError:
            logger.error('Cannot fork before running live migration')
            raise

        if pid == 0:
            # child
            try:
                self.child_work()
            except:
                # whatever the matter is we MUST NOT return to libev or sjRPC
                # (deliberate catch-all); print_exc's first positional
                # argument is ``limit``, so the message is written separately
                sys.stderr.write('Error uncatched\n')
                traceback.print_exc(file=sys.stderr)
            finally:
                os._exit(42)

        self.pid = pid
        self.create_watchers()

    def child_work(self):
        """Perform the migration; runs in the child and never returns.

        Every path ends with ``os._exit()``; see the class docstring for the
        meaning of the exit statuses.
        """
        # migration is performed here
        sys.stderr.write('Hello from child !\n')
        sys.stderr.write('Debug is %s\n' % self.main.config.debug)
        try:
            close_fds(debug=self.main.config.debug)
            set_signal_map({
                signal.SIGTERM: lambda *args: os._exit(1),
                signal.SIGUSR1: signal.SIG_IGN,
                signal.SIGINT: signal.SIG_IGN,
                # FIXME need more signal ?
            })
        except:
            # best effort: a failure here is reported but does not prevent
            # the migration attempt
            sys.stderr.write('Error while performing post fork work\n')
            traceback.print_exc(file=sys.stderr)
        # create a new libvirt connection dedicated to migration
        sys.stderr.write('Open new connection to libvirt\n')
        try:
            new_con = libvirt.open('qemu:///system')
            domain = new_con.lookupByUUIDString(self.vm.uuid)
        except libvirt.libvirtError:
            sys.stderr.write('Cannot connect to libvirt\n')
            os._exit(4)
        except:
            # error
            traceback.print_exc(file=sys.stderr)
            os._exit(2)
        sys.stderr.write('Open destination libvirt connection\n')
        try:
            dest_virt_con = libvirt.open(
                'qemu+tcp://127.0.0.1:%d/system' % self.node2virt_port)
        except libvirt.libvirtError:
            # write() takes a single string: format the message first,
            # otherwise a TypeError would turn exit status 5 into 42
            sys.stderr.write('Cannot connect to remote libvirt for live'
                             ' migrating vm %s\n' % self.vm.name)
            os._exit(5)
        except:
            # error
            traceback.print_exc(file=sys.stderr)
            os._exit(2)

        # No ``finally`` closing the connections: os._exit() terminates the
        # process immediately without running ``finally`` clauses, so such a
        # block would never run on any branch below.
        try:
            if self.unsafe:
                # VIR_MIGRATE_UNSAFE is not defined for libvirt < 0.9.11
                append_flags = getattr(libvirt, 'VIR_MIGRATE_UNSAFE', 0)
            else:
                append_flags = 0
            sys.stderr.write('Do migrate\n')
            domain.migrate(
                dest_virt_con,
                libvirt.VIR_MIGRATE_LIVE | libvirt.VIR_MIGRATE_PEER2PEER |
                libvirt.VIR_MIGRATE_TUNNELLED |
                libvirt.VIR_MIGRATE_PERSIST_DEST |
                libvirt.VIR_MIGRATE_UNDEFINE_SOURCE |
                append_flags,
                None,
                'qemu+tcp://127.0.0.1:%d/system' % self.virt2virt_port,
                0,
            )
        except libvirt.libvirtError:
            sys.stderr.write('libvirt error during migration\n')
            traceback.print_exc(file=sys.stderr)
            os._exit(1)
        except:
            # whatever the matter is we MUST NOT return to libev or sjRPC
            sys.stderr.write('error during migration\n')
            traceback.print_exc(file=sys.stderr)
            os._exit(2)
        else:
            os._exit(0)

    def wait(self):
        """Block until the child has terminated.

        :raises VMMigrationError: when the child did not exit with status 0
        """
        self.event.wait()

        if self.return_status != 0:
            raise VMMigrationError(self.error_msg)