Skip to content
kvm.py 17 KiB
Newer Older
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl.  If not, see <http://www.gnu.org/licenses/>.


"""KVM hypervisor support."""

import re
import os
import sys
import signal
import logging
import weakref
import threading
import traceback
from cloudcontrol.common.client.utils import main_thread
from cloudcontrol.common.client.tags import ParentWrapper

from cloudcontrol.node.hypervisor.lib import (
    DOMAIN_STATES, EVENTS,
    EventLoop as VirEventLoop,
    StorageIndex,
)
from cloudcontrol.node.hypervisor.domains import VirtualMachine
from cloudcontrol.node.utils import close_fds, set_signal_map, num_to_sig
from cloudcontrol.node.exc import VMMigrationError


logger = logging.getLogger(__name__)
# Domain names reserved for cc-node job objects ("job-<n>", n without leading
# zeros); a VM with such a name would clash with them, see KVM.vir_cb.
# Raw string: '\d' is an invalid escape sequence in a regular string literal.
BAD_VM_NAME = re.compile(r'^job-(0|([1-9]\d*))$')


# FIXME create abstract base class for any hypervisor
class KVM(object):
    """Container for all hypervisor related state."""

    def __init__(self, name, handler):
        """Connect to the local libvirt daemon and index its domains.

        :param str name: name of hypervisor instance
        :param Handler handler: hypervisor handler
        """
        # weak proxy: this object must not keep its handler alive
        self.handler = weakref.proxy(handler)

        #: hv attributes
        self.name = name
        self.type = u'kvm'

        # register libvirt error handler
        libvirt.registerErrorHandler(self.vir_error_cb, None)
        # libvirt event loop abstraction
        self.vir_event_loop = VirEventLoop(self.handler.main.evloop)

        self.vir_con = libvirt.open('qemu:///system')  # currently only support KVM

        # find out storage
        self.storage = StorageIndex(handler, self.vir_con)

        logger.debug('Storages: %s', self.storage.paths)

        #: domains: vms, containers...
        self.domains = dict()
        # find defined domains
        for dom_name in self.vir_con.listDefinedDomains():
            dom = self.vir_con.lookupByName(dom_name)
            self.domains[dom.name()] = VirtualMachine(dom, self)
        # find started domains
        for dom_id in self.vir_con.listDomainsID():
            dom = self.vir_con.lookupByID(dom_id)
            self.domains[dom.name()] = VirtualMachine(dom, self)

        logger.debug('Domains: %s', self.domains)

        # lifecycle events go to vir_cb, device hot-plug events to
        # vir_cb_devices; the ids are kept to deregister them in stop()
        self.vir_con.domainEventRegister(self.vir_cb, None)
        self.vir_cb_id_dev_add = self.vir_con.domainEventRegisterAny(
            None, libvirt.VIR_DOMAIN_EVENT_ID_DEVICE_ADDED,
            self.vir_cb_devices, None)
        self.vir_cb_id_dev_del = self.vir_con.domainEventRegisterAny(
            None, libvirt.VIR_DOMAIN_EVENT_ID_DEVICE_REMOVED,
            self.vir_cb_devices, None)

    def stop(self):
        """Stop the libvirt event loop, deregister callbacks and disconnect."""
        self.vir_event_loop.stop()
        # unregister callbacks
        try:
            self.vir_con.domainEventDeregister(self.vir_cb)
            self.vir_con.domainEventDeregisterAny(self.vir_cb_id_dev_add)
            self.vir_con.domainEventDeregisterAny(self.vir_cb_id_dev_del)
        except libvirt.libvirtError:
            # in case the libvirt connection is broken, it will raise the error
            pass
        ret = self.vir_con.close()
        logger.debug('Libvirt still handling %s ref connections', ret)

    def vir_error_cb(self, ctxt, err):
        """Libvirt error callback: log the error.

        See http://libvirt.org/errors.html for more information.

        :param ctxt: arbitrary context data (not needed because the context is
            given by self)
        :param err: error reported by libvirt
        """
        logger.error('Libvirt error %s', err)

    def vir_cb(self, conn, dom, event, detail, opaque):
        """Callback for libvirt domain lifecycle events.

        Keeps :attr:`domains` in sync with libvirt: registers added domains,
        unregisters removed ones and tracks state changes of known domains.
        """
        logger.debug('Received event %s on domain %s, detail %s', event,
                     dom.name(), detail)

        event = EVENTS[event]

        if event == 'Added':
            # prevents name conflicts with other cc-node objects
            if BAD_VM_NAME.match(dom.name()):
                logger.error('Cannot register VM %s as its name would '
                             'conflict with other cc-node objects', dom.name())
                return
            # update Storage pools in case VM has volumes that were created
            self.storage.update(retry=10)
            # if vm is redefined while running we need to refresh its devices
            # when stopped
            redefine_on_stop = False
            if dom.name() in self.domains:
                # sometimes libvirt send us the same event multiple times
                # this can be the result of a change in the domain configuration
                # we first remove the old domain
                vm = self.domains.pop(dom.name())
                vm.tag_db.set_parent(None)
                if vm.state not in ('stopped', 'crashed'):
                    # if the vm was updated while it was "on", then the
                    # modifications will not be reflected since we construct the
                    # object/tags from running XML
                    redefine_on_stop = True
                logger.debug('Domain %s recreated', dom.name())
            self.vm_register(dom, redefine_on_stop)
        elif event == 'Removed':
            vm_name = dom.name()
            self.vm_unregister(vm_name)
            logger.info('Removed domain %s', vm_name)
        elif event in ('Started', 'Suspended', 'Resumed', 'Stopped', 'Saved',
                       'Restored'):
            vm = self.domains.get(dom.name())
            # sometimes libvirt sent a start event before a created event so be
            # careful
            if vm is not None:
                try:
                    state = DOMAIN_STATES[dom.info()[0]]
                except libvirt.libvirtError as exc:
                    # checks that domain was not previously removed
                    # seems to happen only in libvirt 0.8.8
                    if 'Domain not found' in str(exc):
                        self.vm_unregister(dom.name())
                    else:
                        raise
                else:
                    logger.info('Domain change state from %s to %s', vm.state,
                                state)
                    # VMs indexed in __init__ are not given this attribute
                    # (only vm_register sets it), so default to False
                    redefine = getattr(vm, 'redefine_on_stop', False)
                    if event == 'Stopped' and redefine:
                        # if the vm was changed while it was running, then we
                        # need to recreate it now as stated above
                        self.vm_unregister(vm.name)
                        self.vm_register(dom)
                    else:
                        vm.state = state

    def vir_cb_devices(self, conn, dom, device, opaque):
        """Callback for device add/removed from a domain."""
        logger.debug('Received device event on domain %s, dev %s', dom.name(), device)
        vm = self.domains.get(dom.name())
        if vm is not None:
            # Unregister/register VM to update devices list:
            self.vm_unregister(vm.name)
            self.vm_register(dom)

    def vm_register(self, dom, redefine_on_stop=False):
        """Register a VM to the hypervisor object.

        :param dom: libvirt domain instance
        :param redefine_on_stop: if we need to reread the domain XML on stop
        """
        vm = VirtualMachine(dom, self)
        logger.info('Created domain %s', vm.name)
        vm.redefine_on_stop = redefine_on_stop
        self.domains[vm.name] = vm
        vm.tag_db.set_parent(ParentWrapper(vm.name, 'vm', self.handler.tag_db))
        self.update_domain_count()

    def vm_unregister(self, name):
        """Unregister a VM from the cc-server and remove it from the index.

        Storage pools and domain count tags are refreshed even when the VM is
        no longer indexed.
        """
        try:
            vm = self.domains.pop(name)
        except KeyError:
            # domain already removed, see hypervisor/domains/vm_tags.py
            # sometimes libvirt send us the remove event too late
            # we still update storage and tag attributes (below)
            pass
        else:
            vm.tag_db.set_parent(None)
        # update Storage pools in case VM had volumes that were deleted
        self.storage.update()
        self.update_domain_count()

    def update_domain_count(self):
        """Update domain state count tags."""
        # update domain state counts
        for tag in ('nvm', 'vmpaused', 'vmstarted', 'vmstopped', 'cpualloc',
                    'cpurunning', 'cpuremaining', 'cpuallocratio', 'memalloc',
                    'memrunning', 'memremaining', 'memallocratio'):
            self.handler.tag_db['__main__'][tag].update_value()

    def vm_define(self, xml_desc):
        """Create a VM on the Hypervisor.

        :param str xml_desc: XML description in libvirt format
        :return: UUID string of the defined domain
        :raises libvirt.libvirtError: if libvirt rejects the definition
        """
        try:
            return self.vir_con.defineXML(xml_desc).UUIDString()
        except libvirt.libvirtError:
            logger.exception('Error while creating domain')
            # reraise exception for the cc-server
            raise

    def _count_domain(self, filter=lambda d: True):
        """Return the number of indexed domains for which ``filter`` is true.

        The parameter keeps its historical name (it shadows the builtin only
        inside this method) for keyword-argument compatibility.
        """
        return sum(1 for dom in self.domains.values() if filter(dom))

    @property
    def vm_started(self):
        """Number of VMs started."""
        return self._count_domain(lambda d: d.state == 'running')

    @property
    def vm_stopped(self):
        """Number of VMs stopped."""
        return self._count_domain(lambda d: d.state == 'stopped')

    @property
    def vm_paused(self):
        """Number of VMs paused."""
        return self._count_domain(lambda d: d.state == 'paused')

    @property
    def vm_total(self):
        """Total number of VMs on the hypervisor."""
        return self._count_domain()


class LiveMigration(object):
    """Live migrate a VM from a forked child process.

    The child is forked as soon as the object is created; call :meth:`wait`
    to block until it has exited.
    """

    def __init__(self, main_loop, vm, node2virt_port, virt2virt_port, timeout,
                 unsafe=False):
        """Performs live migration in a forked process.

        :param main_loop: instance of MainLoop
        :param vm: instance of VM to migrate
        :param node2virt_port: port for ccnode -> distant libvirt
        :param virt2virt_port: port for local libvirt -> distant libvirt
        :param float timeout: timeout for libvirt migration (prevents libvirt
            from trying to acquire domain lock forever)
        :param bool unsafe: for Libvirt >= 0.9.11, see
            http://libvirt.org/html/libvirt-libvirt.html#virDomainMigrateFlags
        """
        self.main = main_loop

        self.vm = vm

        self.node2virt_port = node2virt_port
        self.virt2virt_port = virt2virt_port
        self.timeout = timeout
        self.unsafe = unsafe
        #: child pid
        self.pid = None
        self.error_msg = None
        self.return_status = None
        # event for caller thread to wait migration termination
        self.event = threading.Event()

        self.do_fork()

    def create_watchers(self):
        """Start the timeout and child-exit watchers on the main event loop."""
        self.timeout_watcher = self.main.evloop.timer(self.timeout, 0.,
                                                      self.timeout_cb)
        self.child_watcher = self.main.evloop.child(self.pid, False,
                                                    self.child_cb)
        self.timeout_watcher.start()
        self.child_watcher.start()

    def child_cb(self, watcher, revents):
        """Handle the child's exit: build the error message, wake up waiters.

        Exit codes are those produced by :meth:`child_work` and
        :meth:`do_fork`; a SIGKILL is sent by :meth:`timeout_cb`.
        """
        self.pid = None
        self.return_status = watcher.rstatus
        watcher.stop()
        if self.timeout_watcher.active:
            self.timeout_watcher.stop()
        self.child_watcher = None

        logger.debug('Status: %s', self.return_status)

        # test if killed, then set msg
        if os.WIFSIGNALED(self.return_status):
            signo = os.WTERMSIG(self.return_status)
            if signo == signal.SIGKILL:
                self.error_msg = 'Migration timeout for vm %s' % self.vm.name
            else:
                self.error_msg = 'Migration failed for vm %s, (%s)' % (
                    self.vm.name, num_to_sig(signo))
        else:
            # test status
            status = os.WEXITSTATUS(self.return_status)
            if status == 1:
                self.error_msg = (
                    'Migration failed for vm %s, due to libvirt error'
                    % self.vm.name)
            elif status == 4:
                self.error_msg = 'Cannot open new connection to libvirt'
            elif status == 5:
                self.error_msg = 'Cannot open connection to remote libvirt'
            elif status != 0:
                self.error_msg = 'Migration failed for vm %s (%d)' % (
                    self.vm.name, status)

        self.event.set()

    def timeout_cb(self, watcher, revents):
        """Kill the migration child when the migration timeout expires."""
        # kill the young
        logger.debug('Killing child migration process')
        os.kill(self.pid, signal.SIGKILL)

    @main_thread
    def do_fork(self):
        """Fork the migration child and start watching it from the parent."""
        # we fork and open a new connection to libvirt because sometimes libvirt
        # python binding, while doing a operation,
        # doesn't seem to release CPython's GIL, therefore all node
        # operations are blocked
        # the only solution we have found right now is to use a dedicated
        # libvirt connection for the migration and fork, the migration operation
        # in itself is handled by the child while other threads can be scheduled

        try:
            pid = os.fork()
        except OSError:
            logger.error('Cannot fork before running live migration')
            raise

        if pid == 0:
            # child
            try:
                self.child_work()
            except BaseException:
                # whatever the matter is we MUST NOT return to libev or sjRPC
                sys.stderr.write('Error uncatched\n')
                traceback.print_exc(file=sys.stderr)
            finally:
                # exit code 42: an error escaped child_work
                os._exit(42)

        self.pid = pid
        self.create_watchers()

    def child_work(self):
        """Body of the forked child: perform the migration, then exit.

        Every path that does not raise ends with ``os._exit``; the exit code
        is interpreted by :meth:`child_cb`:

        * 0: migration succeeded
        * 1: libvirt error during the migration
        * 2: unexpected error
        * 4: cannot open the local libvirt connection or find the domain
        * 5: cannot open the remote libvirt connection
        """
        # migration is performed here
        sys.stderr.write('Hello from child !\n')
        sys.stderr.write('Debug is %s\n' % self.main.config.debug)
        try:
            close_fds(debug=self.main.config.debug)
            set_signal_map({
                signal.SIGTERM: lambda *args: os._exit(1),
                signal.SIGUSR1: signal.SIG_IGN,
                signal.SIGINT: signal.SIG_IGN,
                # FIXME need more signal ?
            })
        except BaseException:
            # best effort: carry on with the migration anyway
            sys.stderr.write('Error while performing post fork work\n')
            traceback.print_exc(file=sys.stderr)
        # create a new libvirt connection dedicated to migration
        sys.stderr.write('Open new connection to libvirt\n')
        try:
            new_con = libvirt.open('qemu:///system')
            domain = new_con.lookupByUUIDString(self.vm.uuid)
        except libvirt.libvirtError:
            sys.stderr.write('Cannot connect to libvirt\n')
            os._exit(4)
        except BaseException:
            # error
            traceback.print_exc(file=sys.stderr)
            os._exit(2)
        sys.stderr.write('Open destination libvirt connection\n')
        try:
            dest_virt_con = libvirt.open(
                'qemu+tcp://127.0.0.1:%d/system' % self.node2virt_port)
        except libvirt.libvirtError:
            # write() takes a single string: format the message first
            sys.stderr.write('Cannot connect to remote libvirt for live'
                             ' migrating vm %s\n' % self.vm.name)
            os._exit(5)
        except BaseException:
            # error
            traceback.print_exc(file=sys.stderr)
            os._exit(2)

        try:
            if self.unsafe:
                # VIR_MIGRATE_UNSAFE is not defined for libvirt < 0.9.11
                append_flags = getattr(libvirt, 'VIR_MIGRATE_UNSAFE', 0)
            else:
                append_flags = 0
            sys.stderr.write('Do migrate\n')
            domain.migrate(
                dest_virt_con,
                libvirt.VIR_MIGRATE_LIVE | libvirt.VIR_MIGRATE_PEER2PEER |
                libvirt.VIR_MIGRATE_TUNNELLED |
                libvirt.VIR_MIGRATE_PERSIST_DEST |
                libvirt.VIR_MIGRATE_UNDEFINE_SOURCE |
                append_flags,
                None,
                'qemu+tcp://127.0.0.1:%d/system' % self.virt2virt_port,
                0,
            )
        except libvirt.libvirtError:
            sys.stderr.write('libvirt error during migration\n')
            traceback.print_exc(file=sys.stderr)
            os._exit(1)
        except BaseException:
            # whatever the matter is we MUST NOT return to libev or sjRPC
            sys.stderr.write('error during migration\n')
            traceback.print_exc(file=sys.stderr)
            os._exit(2)
        # os._exit() terminates the process at once without running
        # ``finally`` clauses, so the two libvirt connections are not closed
        # explicitly: they are released when the child process exits.
        os._exit(0)

    def wait(self):
        """Block until the migration child has exited.

        :raises VMMigrationError: if the child did not exit with status 0
        """
        self.event.wait()

        if self.return_status != 0:
            raise VMMigrationError(self.error_msg)