"""Hypervisor handler: libvirt connection, VM/volume/tunnel RPC handlers."""
import logging
import weakref
from itertools import chain, imap
from xml.sax.saxutils import escape

import libvirt
from sjrpc.utils import threadless

from cloudcontrol.node.host import Handler as HostHandler
from cloudcontrol.node.tags import Tag, tag_inspector, get_tags
from cloudcontrol.node.hypervisor import tags
from cloudcontrol.node.hypervisor.lib import (
    DOMAIN_STATES, EVENTS, STORAGE_STATES,
    EventLoop as VirEventLoop,
)
from cloudcontrol.node.hypervisor.domains import VirtualMachine
from cloudcontrol.node.exc import UndefinedDomain, PoolStorageError
from cloudcontrol.node.hypervisor.jobs import (
    ImportVolume, ExportVolume, TCPTunnel,
)


logger = logging.getLogger(__name__)


#: XML document handed to libvirt when creating a volume (name, capacity)
_VOLUME_XML = """
<volume>
    <name>%s</name>
    <capacity>%d</capacity>
</volume>"""


def _storage_tags(name, storage):
    """Build the tags describing one storage pool.

    The tags are built in a function of their own so that every lambda
    closes over *this* call's ``storage``. Creating the lambdas directly
    inside a ``for`` loop would make all of them read the loop variable at
    call time, i.e. the last storage pool iterated.

    :param str name: storage pool name, used in the tag names
    :param Storage storage: storage pool the tag values are read from
    :return: tuple of :class:`Tag` (state, size, free, used)
    """
    return (
        Tag('sto%s_state' % name, lambda: storage.state, 5, 5),
        Tag('sto%s_size' % name, lambda: storage.capacity, 5, 5),
        Tag('sto%s_free' % name, lambda: storage.available, 5, 5),
        Tag('sto%s_used' % name,
            lambda: storage.capacity - storage.available, 5, 5),
    )


# FIXME find a way to refactor Handler and Hypervisor class
class Handler(HostHandler):
    """Host handler extended with libvirt (hypervisor) related handlers."""

    #: names of the handlers that only exist while libvirt is connected;
    #: each name is also the name of the method implementing it
    # NOTE(review): `vol_import_cancel` is defined below but was never
    # registered in the original code; left as is -- confirm intent.
    LIBVIRT_HANDLERS = (
        'vm_define',
        'vm_undefine',
        'vm_export',
        'vm_stop',
        'vm_destroy',
        'vm_start',
        'vm_suspend',
        'vm_resume',
        'vm_migrate_tunneled',
        'vol_create',
        'vol_delete',
        'vol_import',
        'vol_import_wait',
        'vol_export',
        'tun_setup',
        'tun_connect',
        'tun_connect_hv',
        'tun_destroy',
    )

    def __init__(self, *args, **kwargs):
        """
        :param loop: MainLoop instance
        :param hypervisor_name: hypervisor name
        """
        self.hypervisor_name = kwargs.pop('hypervisor_name')
        HostHandler.__init__(self, *args, **kwargs)

        #: keep index of asynchronous calls
        self.async_calls = dict()
        # timer driving the (re)connection attempts to libvirt, it is stopped
        # by `virt_connect_cb` once the connection is established
        self.timer = self.main.evloop.timer(.0, 5., self.virt_connect_cb)
        self.hypervisor = None
        self._virt_connected = False

        # register tags
        self.tag_db.add_tags(tag_inspector(tags, self))

    @property
    def virt_connected(self):
        """Whether the libvirt connection is currently established."""
        return self._virt_connected

    @virt_connected.setter
    def virt_connected(self, value):
        self._virt_connected = value
        # update tags
        for tag in ('vir_status', 'sto', 'nvm', 'vmpaused', 'vmstarted',
                    'vmstopped', 'hvver', 'libvirtver', 'hv'):
            self.tag_db['__main__'][tag].update_value()

    def start(self):
        """Start the libvirt connection timer, then the host handler."""
        self.timer.start()
        HostHandler.start(self)

    def stop(self):
        """Stop the connection timer, the hypervisor and the host handler."""
        self.timer.stop()
        if self.hypervisor is not None:
            self.hypervisor.stop()
        HostHandler.stop(self)

    def virt_connect_cb(self, *args):
        """Timer callback: try to connect to libvirt.

        On success, storage tags, domain sub objects and libvirt related
        handlers are registered and the timer is stopped. On libvirt error
        the error is logged and the timer is left running.
        """
        # initialize hypervisor instance
        try:
            self.hypervisor = Hypervisor(
                name=self.hypervisor_name,
                handler=self,
            )
        except libvirt.libvirtError:
            logger.exception('Error while connecting to libvirt')
            return

        self.virt_connected = True

        # register hypervisor storage tags
        for name, storage in self.hypervisor.storage.storages.iteritems():
            self.tag_db.add_tags(_storage_tags(name, storage))

        # register domains
        for dom in self.hypervisor.domains.itervalues():
            self.tag_db.add_sub_object(dom.name, dom.tags.itervalues())

        # NOTE(review): `vm_destroy` is not part of this mapping while it is
        # registered through `reset_handler` below -- confirm intent.
        self.rpc_handler.update(dict(
            vm_define=self.vm_define,
            vm_undefine=self.vm_undefine,
            vm_export=self.vm_export,
            vm_stop=self.vm_stop,
            vm_start=self.vm_start,
            vm_suspend=self.vm_suspend,
            vm_resume=self.vm_resume,
        ))
        for handler_name in self.LIBVIRT_HANDLERS:
            self.main.reset_handler(handler_name, getattr(self, handler_name))

        # if everything went fine, unregister the timer
        self.timer.stop()

    def virt_connect_restart(self):
        """Restart libvirt connection.

        This method might be called when libvirt connection is lost.
        """
        if not self.virt_connected:
            return

        logger.error('Connection to libvirt lost, trying to restart')
        # update connection state
        self.virt_connected = False

        # unregister tags that will be re registered later
        for storage in self.hypervisor.storage.storages:
            self.tag_db.remove_tags((
                'sto%s_state' % storage,
                'sto%s_size' % storage,
                'sto%s_free' % storage,
                'sto%s_used' % storage,
            ))
        # unregister sub objects (for the same reason)
        for sub_id in self.tag_db.keys():
            if sub_id == '__main__':
                continue
            self.tag_db.remove_sub_object(sub_id)

        # stop and delete hypervisor instance
        self.hypervisor.stop()
        self.hypervisor = None

        # remove handlers related to libvirt
        for handler_name in self.LIBVIRT_HANDLERS:
            self.main.remove_handler(handler_name)

        # launch connection timer
        self.timer.start()

    def iter_vms(self, vm_names):
        """Utility function to iterate over VM objects using their names.

        Names that do not match a known domain are skipped; nothing is
        yielded when `vm_names` is None.
        """
        if vm_names is None:
            return

        get_domain = self.hypervisor.domains.get
        for name in vm_names:
            dom = get_domain(name)
            if dom is not None:
                yield dom

    def vm_define(self, data, format='xml'):
        """Define a VM from its description.

        :param data: VM description
        :param format: description format, only 'xml' is supported
        :return: result of :meth:`Hypervisor.vm_define`
        :raise NotImplementedError: when `format` is not 'xml'
        """
        logger.debug('VM define')
        if format != 'xml':
            raise NotImplementedError('Format not supported')

        return self.hypervisor.vm_define(data)

    def vm_undefine(self, name):
        """Undefine the VM called `name`, do nothing if it is unknown."""
        logger.debug('VM undefine %s', name)
        vm = self.hypervisor.domains.get(name)
        if vm is not None:
            vm.undefine()

    def vm_export(self, name, format='xml'):
        """Return the libvirt XML description of the VM called `name`.

        :return: XML description, or None when the VM is unknown
        :raise NotImplementedError: when `format` is not 'xml'
        """
        logger.debug('VM export %s', name)
        if format != 'xml':
            raise NotImplementedError('Format not supported')

        vm = self.hypervisor.domains.get(name)
        if vm is None:
            return

        return vm.lv_dom.XMLDesc(0)

    def vm_stop(self, name):
        """Stop the VM called `name`.

        :raise UndefinedDomain: when the VM is unknown
        """
        logger.debug('VM stop %s', name)
        try:
            self.hypervisor.domains[name].stop()
        except libvirt.libvirtError:
            logger.exception('Error while stopping VM %s', name)
            raise
        except KeyError:
            msg = 'Cannot stop VM %s because it is not defined' % name
            logger.error(msg)
            raise UndefinedDomain(msg)

    def vm_destroy(self, name):
        """Destroy the VM called `name`.

        :raise UndefinedDomain: when the VM is unknown
        """
        logger.debug('VM destroy %s', name)
        try:
            self.hypervisor.domains[name].destroy()
        except libvirt.libvirtError as exc:
            # Libvirt raises exception 'domain is not running' even if the
            # domain is running, might be a bug in libvirt. That specific
            # case is ignored, any other error is logged and re-raised.
            if 'domain is not running' not in str(exc) or (
                    self.hypervisor.domains[name].state != 'running'):
                logger.exception('Error while destroying VM %s', name)
                raise
        except KeyError:
            msg = 'Cannot destroy VM %s because it is not defined' % name
            logger.error(msg)
            raise UndefinedDomain(msg)

    def vm_start(self, name):
        """Start the VM called `name`.

        :raise UndefinedDomain: when the VM is unknown
        """
        logger.debug('VM start %s', name)
        try:
            self.hypervisor.domains[name].start()
        except libvirt.libvirtError:
            logger.exception('Error while starting VM %s', name)
            raise
        except KeyError:
            msg = 'Cannot start VM %s because it is not defined' % name
            logger.error(msg)
            raise UndefinedDomain(msg)

    def vm_suspend(self, name):
        """Suspend the VM called `name`.

        :raise UndefinedDomain: when the VM is unknown
        """
        logger.debug('VM suspend %s', name)
        try:
            self.hypervisor.domains[name].suspend()
        except libvirt.libvirtError:
            logger.exception('Error while suspending VM %s', name)
            raise
        except KeyError:
            msg = 'Cannot suspend VM %s because it is not defined' % name
            logger.error(msg)
            raise UndefinedDomain(msg)

    def vm_resume(self, name):
        """Resume the VM called `name`.

        :raise UndefinedDomain: when the VM is unknown
        """
        logger.debug('VM resume %s', name)
        try:
            self.hypervisor.domains[name].resume()
        except libvirt.libvirtError:
            logger.exception('Error while resuming VM %s', name)
            raise
        except KeyError:
            msg = 'Cannot resume VM %s because it is not defined' % name
            logger.error(msg)
            raise UndefinedDomain(msg)

    def vm_migrate_tunneled(self, name, tun_res, migtun_res, unsafe=False):
        """Live migrate VM through TCP tunnel.

        :param name: VM name to migrate
        :param tun_res: result of tunnel_setup handler
        :param migtun_res: result of tunnel setup handler
        :param unsafe: for Libvirt >= 0.9.11, see
            http://libvirt.org/html/libvirt-libvirt.html#virDomainMigrateFlags
        """
        logger.debug('VM live migrate %s', name)

        try:
            remote_virt_port = tun_res['port']
        except KeyError:
            logger.error('Invalid formatted argument tun_res for live'
                         ' migration')
            raise

        try:
            remote_hv_port = migtun_res['port']
        except KeyError:
            logger.error('Invalid formatted argument migtun_res for live'
                         ' migration')
            raise

        try:
            vm = self.hypervisor.domains[name]
        except KeyError:
            logger.exception('Cannot find domain %s on hypervisor for live'
                             ' migration', name)
            raise

        try:
            dest_virt_con = libvirt.open(
                'qemu+tcp://127.0.0.1:%d/system' % remote_virt_port)
        except libvirt.libvirtError:
            logger.exception('Cannot connect to remote libvirt for live'
                             ' migrating vm %s', name)
            raise

        try:
            if unsafe:
                # VIR_MIGRATE_UNSAFE is not defined for libvirt < 0.9.11
                append_flags = getattr(libvirt, 'VIR_MIGRATE_UNSAFE', 0)
            else:
                append_flags = 0

            vm.lv_dom.migrate(
                dest_virt_con,
                libvirt.VIR_MIGRATE_LIVE |
                libvirt.VIR_MIGRATE_PEER2PEER |
                libvirt.VIR_MIGRATE_TUNNELLED |
                libvirt.VIR_MIGRATE_PERSIST_DEST |
                libvirt.VIR_MIGRATE_UNDEFINE_SOURCE |
                append_flags,
                None,
                'qemu+tcp://127.0.0.1:%d/system' % remote_hv_port,
                0,
            )
        except libvirt.libvirtError:
            # FIXME maybe we should catch some weird crap libvirt bad exception
            logger.exception('Libvirt error while live migrating vm %s', name)
            raise
        finally:
            # the connection to the remote libvirt is only needed during the
            # migration, close it whatever the outcome
            dest_virt_con.close()

        logger.debug('Sucessfuly live migrated vm %s', name)

    def vol_create(self, pool, name, size):
        """Create volume `name` of `size` in storage pool `pool`."""
        logger.debug('Volume create %s, pool %s, size %s', name, pool, size)
        try:
            self.hypervisor.storage.create_volume(pool, name, size)
        except Exception:
            logger.exception('Error while creating volume')
            raise

    def vol_delete(self, pool, name):
        """Delete volume `name` from storage pool `pool`."""
        logger.debug('Volume delete %s, pool %s', name, pool)
        try:
            self.hypervisor.storage.delete_volume(pool, name)
        except Exception:
            logger.exception('Error while deleting volume')
            raise

    def vol_import(self, pool, name):
        """Start a job importing data into an existing volume.

        :param pool: pool name where the volume is
        :param name: name of the volume
        :return: dict with the job `id` and the `port` of the job
        """
        logger.debug('Volume import pool = %s, volume = %s', pool, name)
        try:
            pool = self.hypervisor.storage.get_storage(pool)
            if pool is None:
                raise Exception('Pool storage does not exist')  # TODO exc
            volume = pool.volumes.get(name)
            if volume is None:
                raise Exception('Volume does not exist')

            # create the job
            job = self.main.job_manager.create(ImportVolume, volume)
            job.start()
        except Exception:
            logger.exception('Error while starting import job')
            raise

        return dict(id=job.id, port=job.port)

    def vol_import_wait(self, job_id):
        """Block until completion of the given job id."""
        job = self.main.job_manager.get(job_id)
        logger.debug('Waiting for import job to terminate')
        job.join()
        logger.debug('Import job terminated')

        return dict(id=job.id, log='', checksum=job.checksum)

    def vol_import_cancel(self, job_id):
        """Cancel import job."""
        logger.debug('Cancel import job')
        self.main.job_manager.cancel(job_id)

    def vol_export(self, pool, name, raddr, rport):
        """Export a volume to a remote destination.

        :param pool: pool name where the volume is
        :param name: name of the volume
        :param raddr: IP address of the destination to send the volume to
        :param rport: TCP port of the destination
        :return: dict with the job `id`, an empty `log` and the `checksum`
        """
        pool = self.hypervisor.storage.get_storage(pool)
        if pool is None:
            raise Exception('Pool storage does not exist')
        volume = pool.volumes.get(name)
        if volume is None:
            raise Exception('Volume does not exist')

        try:
            job = self.main.job_manager.create(ExportVolume, volume, raddr,
                                               rport)
            # we don't run the job in a background thread thus exceptions are
            # sent to the sjRPC
            job.start_current()
            # otherwise we would do
            # job.start()
            # job.join()
        except Exception:
            logger.exception('Error while exporting volume')
            raise

        logger.debug('Export volume successfull')
        return dict(id=job.id, log='', checksum=job.checksum)

    @threadless
    def tun_setup(self, local=True):
        """Set up local tunnel and listen on a random port.

        :param local: indicate if we should listen on localhost or all
            interfaces
        """
        logger.debug('Tunnel setup: local = %s', local)
        # create job
        job = self.main.job_manager.create(TCPTunnel, self.main.evloop)
        job.setup_listen('127.0.0.1' if local else '0.0.0.0')
        return dict(
            jid=job.id,
            key='FIXME',
            port=job.port,
        )

    @threadless
    def tun_connect(self, res, remote_res, remote_ip):
        """Connect tunnel to the other end.

        :param res: previous result of `tun_setup` handler
        :param remote_res: other end result of `tun_setup` handler
        :param remote_ip: where to connect
        """
        logger.debug('Tunnel connect %s %s', res['jid'], remote_ip)
        job = self.main.job_manager.get(res['jid'])
        job.setup_connect((remote_ip, remote_res['port']))

    @threadless
    def tun_connect_hv(self, res, migration=False):
        """Connect tunnel to local libvirt Unix socket.

        :param res: previous result of `tun_setup` handler
        :param migration: currently unused
        """
        logger.debug('Tunnel connect hypervisor %s', res['jid'])
        job = self.main.job_manager.get(res['jid'])
        job.setup_connect('/var/run/libvirt/libvirt-sock')

    @threadless
    def tun_destroy(self, res):
        """Close given tunnel.

        :param res: previous result as given by `tun_setup` handler
        """
        logger.debug('Tunnel destroy %s', res['jid'])
        self.main.job_manager.cancel(res['jid'])
        self.main.job_manager.remove(res['jid'])


class Hypervisor(object):
    """Container for all hypervisor related state."""

    def __init__(self, name, handler):
        """
        :param str name: name of hypervisor instance
        :param Handler handler: hypervisor handler
        """
        # only keep a weak reference, the handler owns the hypervisor
        self.handler = weakref.proxy(handler)

        #: hv attributes
        self.name = name
        self.type = u'kvm'

        # register libvirt error handler
        libvirt.registerErrorHandler(self.vir_error_cb, None)
        # libvirt event loop abstraction
        self.vir_event_loop = VirEventLoop(self.handler.main.evloop)
        # This tells libvirt what event loop implementation it
        # should use
        libvirt.virEventRegisterImpl(
            self.vir_event_loop.add_handle,
            self.vir_event_loop.update_handle,
            self.vir_event_loop.remove_handle,
            self.vir_event_loop.add_timer,
            self.vir_event_loop.update_timer,
            self.vir_event_loop.remove_timer,
        )

        # currently only support KVM
        self.vir_con = libvirt.open('qemu:///system')

        # findout storage
        self.storage = StorageIndex(self.vir_con)

        logger.debug('Storages: %s', self.storage.paths)

        #: domains: vms, containers...
        self.domains = dict()
        # find defined domains
        for dom_name in self.vir_con.listDefinedDomains():
            dom = self.vir_con.lookupByName(dom_name)
            self.domains[dom.name()] = VirtualMachine(dom, self)
        # find started domains
        for dom_id in self.vir_con.listDomainsID():
            dom = self.vir_con.lookupByID(dom_id)
            self.domains[dom.name()] = VirtualMachine(dom, self)

        logger.debug('Domains: %s', self.domains)

        # TODO find out args
        self.vir_con.domainEventRegister(self.vir_cb, None)

    def stop(self):
        """Stop the event loop and close the libvirt connection."""
        self.vir_event_loop.stop()
        # unregister callback
        try:
            self.vir_con.domainEventDeregister(self.vir_cb)
        except libvirt.libvirtError:
            # in case the libvirt connection is broken, it will raise the error
            pass

        ret = self.vir_con.close()
        logger.debug('Libvirt still handling %s ref connections', ret)

        # TODO delete objects

    def vir_error_cb(self, ctxt, err):
        """Libvirt error callback.

        See http://libvirt.org/errors.html for more informations.

        :param ctxt: arbitrary context data (not needed because context is
            given by self
        :param err: libvirt error code
        """
        logger.error('Libvirt error %s', err)

    def vir_cb(self, conn, dom, event, detail, opaque):
        """Callback for libvirt event loop.

        Keeps the domain index and the related tags in sync with the events
        sent by libvirt.
        """
        logger.debug('Received event %s on domain %s, detail %s', event,
                     dom.name(), detail)
        event = EVENTS[event]
        if event == 'Added':
            vm = VirtualMachine(dom, self)
            logger.info('Created domain %s', vm.name)
            self.domains[vm.name] = vm
            self.handler.tag_db.add_sub_object(vm.name, vm.tags.itervalues())
        elif event == 'Removed':
            # libvirt events may concern a domain that is not indexed (see
            # below), so do not let a KeyError escape from the callback
            vm = self.domains.pop(dom.name(), None)
            if vm is None:
                logger.warning('Removed event for unknown domain %s',
                               dom.name())
            else:
                logger.info('Removed domain %s', vm.name)
                self.handler.tag_db.remove_sub_object(vm.name)
        elif event in ('Started', 'Suspended', 'Resumed', 'Stopped', 'Saved',
                       'Restored'):
            vm = self.domains.get(dom.name())
            # sometimes libvirt sent a start event before a created event so be
            # careful
            if vm is not None:
                state = DOMAIN_STATES[dom.info()[0]]
                logger.info('Domain change state from %s to %s', vm.state,
                            state)
                vm.state = state

        # update domain state counts
        for tag in ('nvm', 'vmpaused', 'vmstarted', 'vmstopped'):
            self.handler.tag_db['__main__'][tag].update_value()

    def vm_define(self, xml_desc):
        """Create a VM on the Hypervisor

        :param str xml_desc: XML description in libvirt format
        :return: VM name created
        """
        try:
            return self.vir_con.defineXML(xml_desc).name()
        except libvirt.libvirtError:
            logger.exception('Error while creating domain')
            # reraise exception for the cc-server
            raise

    def _count_domain(self, filter=lambda d: True):
        """Count the domains for which `filter` returns a true value."""
        count = 0

        for dom in self.domains.itervalues():
            if filter(dom):
                count += 1

        return count

    @property
    def vm_started(self):
        """Number of VMs started."""
        return self._count_domain(lambda d: d.state == 'running')

    @property
    def vm_stopped(self):
        """Number of VMs stopped."""
        return self._count_domain(lambda d: d.state == 'stopped')

    @property
    def vm_paused(self):
        """Number of VMs paused."""
        return self._count_domain(lambda d: d.state == 'paused')

    @property
    def vm_total(self):
        """Total number of VMs on the hypervisor."""
        return self._count_domain()


class StorageIndex(object):
    """Keep an index of all storage volume paths."""

    def __init__(self, lv_con):
        """
        :param lv_con: Libvirt connection
        """
        pool_names = chain(
            lv_con.listDefinedStoragePools(),
            lv_con.listStoragePools(),
        )
        #: storage pools indexed by name
        self.storages = dict(
            (s.name, s) for s in (
                Storage(lv_con.storagePoolLookupByName(pool_name))
                for pool_name in pool_names
            )
        )

        #: volumes of every storage pool indexed by path
        self.paths = dict(
            (v.path, v)
            for s in self.storages.itervalues()
            for v in s.volumes.itervalues()
        )

    def get_volume(self, path):
        """Return the volume located at `path`, None if unknown."""
        return self.paths.get(path)

    def get_storage(self, name):
        """Return the storage pool called `name`, None if unknown."""
        return self.storages.get(name)

    def create_volume(self, pool_name, volume_name, capacity):
        """Create a new volume in the storage pool.

        :param str pool_name: name of the storage pool
        :param str volume_name: name for the volume
        :param int capacity: size for the volume
        :raise PoolStorageError: when the storage pool is unknown
        """
        # get volume
        logger.debug('asked pool %s', pool_name)
        logger.debug('Pool state %s', self.storages)
        try:
            pool = self.storages[pool_name]
        except KeyError:
            raise PoolStorageError('Invalid pool name')
        if pool is None:
            raise Exception('Storage pool not found')

        # the name comes from the caller, escape it so that it cannot break
        # (or inject elements into) the XML document
        xml_desc = _VOLUME_XML % (escape('%s' % volume_name), capacity)
        try:
            new_volume = pool.lv_storage.createXML(xml_desc, 0)
        except libvirt.libvirtError:
            logger.exception('Error while creating volume')
            raise

        new_volume = Volume(new_volume)
        # if success add the volume to the index
        self.paths[new_volume.path] = new_volume
        # and also to its storage pool
        self.storages[new_volume.storage].volumes[new_volume.name] = new_volume

    def delete_volume(self, pool_name, volume_name):
        """Delete a volume in the given storage pool.

        :param str pool_name: name for the storage pool
        :param str volume_name: name for the volume
        :raise PoolStorageError: when the pool or the volume is unknown
        """
        # get volume
        try:
            pool = self.storages[pool_name]
        except KeyError:
            raise PoolStorageError('Invalid pool name')

        try:
            volume = pool.volumes[volume_name]
        except KeyError:
            raise PoolStorageError('Invalid volume name')

        # delete volume first: if libvirt fails the volume still exists and
        # thus must stay in the index
        try:
            volume.lv_volume.delete(0)
        except libvirt.libvirtError:
            logger.exception('Error while deleting volume')
            raise

        # delete from index
        del self.paths[volume.path]
        del pool.volumes[volume_name]


class Storage(object):
    """Storage abstraction."""

    def __init__(self, lv_storage):
        """
        :param lv_storage: Libvirt pool storage instance
        """
        self.uuid = lv_storage.UUID()
        self.name = lv_storage.name()

        self.lv_storage = lv_storage

        (self.state, self.capacity, self.allocation,
         self.available) = lv_storage.info()
        # translate the state returned by libvirt through STORAGE_STATES
        self.state = STORAGE_STATES[self.state]

        #: volumes of this storage pool indexed by name
        self.volumes = dict(
            (v.name, v) for v in (
                Volume(lv_storage.storageVolLookupByName(n))
                for n in lv_storage.listVolumes()
            )
        )


class Volume(object):
    """Volume abstraction."""

    def __init__(self, lv_volume):
        """
        :param lv_volume: Libvirt volume instance
        """
        #: name of the storage pool the volume belongs to
        self.storage = lv_volume.storagePoolLookupByVolume().name()
        self.path = lv_volume.path()
        self.name = lv_volume.name()
        # first item returned by info() is not kept
        self.capacity, self.allocation = lv_volume.info()[1:]
        self.lv_volume = lv_volume