# This file is part of CloudControl. # # CloudControl is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # CloudControl is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with CloudControl. If not, see . """KVM hypervisor support.""" import re import os import sys import signal import logging import weakref import threading import traceback import libvirt from cloudcontrol.common.client.utils import main_thread from cloudcontrol.node.hypervisor.lib import ( DOMAIN_STATES, EVENTS, EventLoop as VirEventLoop, StorageIndex, ) from cloudcontrol.node.hypervisor.domains import VirtualMachine from cloudcontrol.node.utils import close_fds, set_signal_map, num_to_sig from cloudcontrol.node.exc import VMMigrationError logger = logging.getLogger(__name__) BAD_VM_NAME = re.compile('^job-(0|([1-9]\d*))$') # FIXME create abstract base class for any hypervisor class KVM(object): """Container for all hypervisor related state.""" def __init__(self, name, handler): """ :param str name: name of hypervisor instance :param Handler handler: hypervisor handler """ self.handler = weakref.proxy(handler) #: hv attributes self.name = name self.type = u'kvm' # register libvirt error handler libvirt.registerErrorHandler(self.vir_error_cb, None) # libvirt event loop abstraction self.vir_event_loop = VirEventLoop(self.handler.main.evloop) self.vir_con = libvirt.open('qemu:///system') # currently only support KVM # findout storage self.storage = StorageIndex(handler, self.vir_con) logger.debug('Storages: %s', self.storage.paths) #: domains: vms, containers... self.domains = dict() # find defined domains for dom_name in self.vir_con.listDefinedDomains(): dom = self.vir_con.lookupByName(dom_name) self.domains[dom.name()] = VirtualMachine(dom, self) # find started domains for dom_id in self.vir_con.listDomainsID(): dom = self.vir_con.lookupByID(dom_id) self.domains[dom.name()] = VirtualMachine(dom, self) logger.debug('Domains: %s', self.domains) self.vir_con.domainEventRegister(self.vir_cb, None) def stop(self): self.vir_event_loop.stop() # unregister callback try: self.vir_con.domainEventDeregister(self.vir_cb) except libvirt.libvirtError: # in case the libvirt connection is broken, it will raise the error pass ret = self.vir_con.close() logger.debug('Libvirt still handling %s ref connections', ret) def vir_error_cb(self, ctxt, err): """Libvirt error callback. See http://libvirt.org/errors.html for more informations. :param ctxt: arbitrary context data (not needed because context is givent by self :param err: libvirt error code """ logger.error('Libvirt error %s', err) def vir_cb(self, conn, dom, event, detail, opaque): """Callback for libvirt event loop.""" logger.debug('Received event %s on domain %s, detail %s', event, dom.name(), detail) event = EVENTS[event] if event == 'Added': # prevents name conflicts with others cc-node objects if BAD_VM_NAME.match(dom.name()): logger.error('Cannot register VM %s as its name would ' 'conflits with others cc-node objects') return # update Storage pools in case VM has volumes that were created self.storage.update() if dom.name() in self.domains: # sometimes libvirt send us the same event multiple times # this can be the result of a change in the domain configuration # we first remove the old domain vm = self.domains.pop(dom.name()) self.handler.tag_db.remove_sub_object(vm.name) logger.debug('Domain %s recreated', dom.name()) vm = VirtualMachine(dom, self) logger.info('Created domain %s', vm.name) self.domains[vm.name] = vm self.handler.tag_db.add_sub_object(vm.name, vm.tags.itervalues(), 'vm') self.update_domain_count() elif event == 'Removed': vm_name = dom.name() self.vm_unregister(vm_name) logger.info('Removed domain %s', vm_name) elif event in ('Started', 'Suspended', 'Resumed', 'Stopped', 'Saved', 'Restored'): vm = self.domains.get(dom.name()) # sometimes libvirt sent a start event before a created event so be # careful if vm is not None: try: state = DOMAIN_STATES[dom.info()[0]] except libvirt.libvirtError as exc: # checks that domain was not previously removed # seems to happen only in libvirt 0.8.8 if 'Domain not found' in str(exc): self.vm_unregister(dom.name()) else: raise else: logger.info('Domain change state from %s to %s', vm.state, state) vm.state = state self.update_domain_count() def vm_unregister(self, name): """Unregister a VM from the cc-server and remove it from the index.""" try: vm = self.domains.pop(name) except KeyError: # domain already removed, see hypervisor/domains/vm_tags.py # sometimes libvirt send us the remove event too late # we still update storage and tag attributes pass else: self.handler.tag_db.remove_sub_object(vm.name) # update Storage pools in case VM had volumes that were deleted self.storage.update() self.update_domain_count() def update_domain_count(self): """Update domain state count tags.""" # update domain state counts for tag in ('nvm', 'vmpaused', 'vmstarted', 'vmstopped', 'cpualloc', 'cpurunning', 'memalloc', 'memrunning'): self.handler.tag_db['__main__'][tag].update_value() def vm_define(self, xml_desc): """Create a VM on the Hypervisor :param str xml_desc: XML description in libvirt format :return: VM name created """ try: return self.vir_con.defineXML(xml_desc).name() except libvirt.libvirtError: logger.exception('Error while creating domain') # reraise exception for the cc-server raise def _count_domain(self, filter=lambda d: True): count = 0 for dom in self.domains.itervalues(): if filter(dom): count += 1 return count @property def vm_started(self): """Number of VMs started.""" return self._count_domain(lambda d: d.state == 'running') @property def vm_stopped(self): """Number of VMs stopped.""" return self._count_domain(lambda d: d.state == 'stopped') @property def vm_paused(self): """Number of VMs paused.""" return self._count_domain(lambda d: d.state == 'paused') @property def vm_total(self): """Total number of VMs on the hypervisor.""" return self._count_domain() class LiveMigration(object): def __init__(self, main_loop, vm, node2virt_port, virt2virt_port, timeout, unsafe=False): """Performs live migration in a forked process. :param main_loop: instance of MainLoop :param vm: instance of VM to migrate :param node2virt_port: port for ccnode -> distant libvirt :param virt2virt_port: port for local libvirt -> distant libvirt :param float timeout: timeout for libvirt migration (prevents libvirt from trying to acquire domain lock forever) :param bool unsafe: for Libvirt >= 0.9.11, see http://libvirt.org/html/libvirt-libvirt.html#virDomainMigrateFlags """ self.main = main_loop self.vm = vm self.node2virt_port = node2virt_port self.virt2virt_port = virt2virt_port self.timeout = timeout self.unsafe = unsafe #: child pid self.pid = None self.error_msg = None self.return_status = None # event for caller thread to wait migration termination self.event = threading.Event() self.do_fork() def create_watchers(self): self.timeout_watcher = self.main.evloop.timer(self.timeout, 0., self.timeout_cb) self.child_watcher = self.main.evloop.child(self.pid, False, self.child_cb) self.timeout_watcher.start() self.child_watcher.start() def child_cb(self, watcher, revents): self.pid = None self.return_status = watcher.rstatus watcher.stop() if self.timeout_watcher.active: self.timeout_watcher.stop() self.child_watcher = None logger.debug('Status: %s', self.return_status) # test if killed, then set msg if os.WIFSIGNALED(self.return_status): signo = os.WTERMSIG(self.return_status) if signo == signal.SIGKILL: self.error_msg = 'Migration timeout for vm %s' % self.vm.name else: self.error_msg = 'Migration failed for vm %s, (%s)' % ( self.vm.name, num_to_sig(signo)) else: # test status status = os.WEXITSTATUS(self.return_status) if status == 1: self.error_msg = ( 'Migration failed for vm %s, due to libvirt error' % self.vm.name) elif status == 4: self.error_msg = 'Cannot open new connection to libvirt' elif status == 5: self.error_msg = 'Cannot open connection to remote libvirt' elif status != 0: self.error_msg = 'Migration failed for vm %s (%d)' % ( self.vm.name, status) self.event.set() def timeout_cb(self, watcher, revents): # kill the young logger.debug('Killing child migration process') os.kill(self.pid, signal.SIGKILL) @main_thread def do_fork(self): # we fork and open a new connection to libvirt because sometimes libvirt # python binding, while doing a operation, # doesn't seem to realease CPython's GIL, therefore all node # operations are blocked # the only solution we have found right now is to use a dedicated # libvirt connection for the migration and fork, the migration operation # in itself is handled by the child while other threads can be scheduled try: pid = os.fork() except OSError: logger.error('Cannot fork before running live migration') raise if pid == 0: # child try: self.child_work() except: # whatever the matter is we MUST NOT return to libev or sjRPC traceback.print_exc('Error uncatched') finally: os._exit(42) self.pid = pid self.create_watchers() def child_work(self): # migration is performed here sys.stderr.write('Hello from child !\n') sys.stderr.write('Debug is %s\n' % self.main.config.debug) try: close_fds(debug=self.main.config.debug) set_signal_map({ signal.SIGTERM: lambda *args: os._exit(1), signal.SIGUSR1: signal.SIG_IGN, signal.SIGINT: signal.SIG_IGN, # FIXME need more signal ? }) except: sys.stderr.write('Error while performing post fork work\n') traceback.print_exc(file=sys.stderr) # create a new libvirt connection dedicated to migration sys.stderr.write('Open new connection to libvirt\n') try: new_con = libvirt.open('qemu:///system') domain = new_con.lookupByUUIDString(self.vm.uuid) except libvirt.libvirtError: sys.stderr.write('Cannot connect to libvirt\n') os._exit(4) except: # error traceback.print_exc(sys.stderr) os._exit(2) sys.stderr.write('Open destination libvirt connection\n') try: dest_virt_con = libvirt.open( 'qemu+tcp://127.0.0.1:%d/system' % self.node2virt_port) except libvirt.libvirtError: sys.stderr.write('Cannot connect to remote libvirt for live' ' migrating vm %s', self.vm.name) os._exit(5) except: # error traceback.print_exc(file=sys.stderr) os._exit(2) try: if self.unsafe: # VIR_MIGRATE_UNSAFE is not defined for libvirt < 0.9.11 append_flags = getattr(libvirt, 'VIR_MIGRATE_UNSAFE', 0) else: append_flags = 0 sys.stderr.write('Do migrate\n') domain.migrate( dest_virt_con, libvirt.VIR_MIGRATE_LIVE | libvirt.VIR_MIGRATE_PEER2PEER | libvirt.VIR_MIGRATE_TUNNELLED | libvirt.VIR_MIGRATE_PERSIST_DEST | libvirt.VIR_MIGRATE_UNDEFINE_SOURCE | append_flags, None, 'qemu+tcp://127.0.0.1:%d/system' % self.virt2virt_port, 0, ) except libvirt.libvirtError: sys.stderr.write('libvirt error during migration\n') traceback.print_exc(file=sys.stderr) os._exit(1) except: # whatever the matter is we MUST NOT return to libev or sjRPC sys.stderr.write('error during migration\n') traceback.print_exc(file=sys.stderr) os._exit(2) else: os._exit(0) finally: new_con.close() dest_virt_con.close() def wait(self): self.event.wait() if self.return_status != 0: raise VMMigrationError(self.error_msg)