import logging
import weakref
from itertools import chain, imap

import libvirt

from ccnode.host import Handler as HostHandler
from ccnode.tags import Tag, tag_inspector, get_tags
from ccnode.hypervisor import tags
from ccnode.hypervisor import lib as _libvirt
from ccnode.hypervisor.lib import (
    DOMAIN_STATES,
    EVENTS,
    STORAGE_STATES,
    EventLoop,
)
from ccnode.hypervisor.domains import VirtualMachine


logger = logging.getLogger(__name__)

#: Module-level hypervisor singleton; created lazily by the first
#: ``Handler`` instance and shared by all subsequent ones.
hypervisor = None


def _storage_tags(name, storage):
    """Build the tags describing one storage pool.

    The tags are created inside a function (rather than directly in the
    caller's loop) so that each lambda closes over *this* call's
    ``storage``. Lambdas defined directly in a ``for`` loop would all look
    up the loop variable at call time and therefore report the values of
    the last storage only.

    :param str name: storage pool name, used to build the tag names
    :param storage: :class:`Storage` instance the tags read from
    :return: tuple of ``Tag`` objects (state, size, free, used)
    """
    # NOTE(review): the third ``Tag`` argument (5) is kept from the original
    # code; presumably a TTL/refresh interval -- confirm in ccnode.tags.
    return (
        Tag('sto%s_state' % name, lambda: storage.state, 5),
        Tag('sto%s_size' % name, lambda: storage.capacity, 5),
        Tag('sto%s_free' % name, lambda: storage.available, 5),
        Tag('sto%s_used' % name,
            lambda: storage.capacity - storage.available, 5),
    )


class Handler(HostHandler):
    """Host handler extended with hypervisor (libvirt) operations."""

    def __init__(self, *args, **kwargs):
        """
        :param proxy: sjRpc proxy
        :param hypervisor_name: hypervisor name
        """
        HostHandler.__init__(self, *args, **kwargs)
        for t in tag_inspector(tags, self):
            self.tags[t.name] = t

        # initialize hypervisor instance (shared module-level singleton)
        global hypervisor
        if hypervisor is None:
            hypervisor = Hypervisor(
                name=kwargs.pop('hypervisor_name', None),
                proxy=kwargs['proxy'],
            )
        else:
            # the hypervisor already exists: only point it at the new proxy
            hypervisor.sjproxy = weakref.proxy(kwargs['proxy'])
        self.hypervisor = weakref.proxy(hypervisor)

        # register hypervisor storage tags
        for name, storage in self.hypervisor.storage.storages.iteritems():
            for t in _storage_tags(name, storage):
                self.tags[t.name] = t

        # register domains
        proxy = kwargs.pop('proxy')
        for dom in hypervisor.domains.itervalues():
            name = dom.name
            logger.debug('Registered domain %s', name)
            proxy.register(name, 'vm')

    def sub_tags(self, sub_id, tags=None, noresolve_tags=None):
        """Get subtags.

        :param sub_id: name of the domain to get the tags of
        :param tags: forwarded unchanged to ``get_tags``
        :param noresolve_tags: forwarded unchanged to ``get_tags``
        :return: result of ``get_tags`` for the domain, or ``None`` when no
            domain is known under ``sub_id``
        """
        global hypervisor
        domain = hypervisor.domains.get(sub_id)
        if domain is None:
            logger.debug('Failed to find domain with name %s.', sub_id)
            return

        logger.debug('Get tags for sub object: %s', sub_id)
        return get_tags(domain, tags, noresolve_tags)

    def iter_vms(self, vm_names):
        """Utility function to iterate over VM objects using their names.

        Names that do not match a known domain are silently skipped.

        :param vm_names: iterable of domain names, or ``None`` (yields
            nothing)
        """
        if vm_names is None:
            return

        get_domain = self.hypervisor.domains.get
        for name in vm_names:
            dom = get_domain(name)
            if dom is not None:
                yield dom

    def vm_define(self, data, format='xml'):
        """Define a new domain from its description.

        :param data: domain description passed to libvirt ``defineXML``
        :param str format: description format, only ``'xml'`` is supported
        :return: name of the defined domain, or ``None`` when libvirt
            raised an error (the error is logged, not propagated)
        :raises NotImplementedError: if ``format`` is not ``'xml'``
        """
        logger.debug('VM define')
        if format != 'xml':
            raise NotImplementedError('Format not supported')

        try:
            return _libvirt.connection.defineXML(data).name()
        except libvirt.libvirtError:
            logger.exception('Error while creating domain')

    def vm_undefine(self, name):
        """Undefine the domain called ``name``; unknown names are ignored."""
        logger.debug('VM undefin')
        vm = self.hypervisor.domains.get(name)
        if vm is not None:
            vm.undefine()

    def vm_export(self, name, format='xml'):
        """Export the description of a domain.

        :param str name: domain name
        :param str format: export format, only ``'xml'`` is supported
        :return: result of libvirt ``XMLDesc(0)`` for the domain, or
            ``None`` when the domain is unknown
        :raises NotImplementedError: if ``format`` is not ``'xml'``
        """
        if format != 'xml':
            raise NotImplementedError('Format not supported')

        vm = self.hypervisor.domains.get(name)
        if vm is None:
            return

        return vm.lv_dom.XMLDesc(0)

    def vm_stop(self, vm_names=None, force=False):
        """Stop the given VMs.

        :param vm_names: iterable of domain names (``None`` does nothing)
        :param bool force: call ``destroy()`` instead of ``stop()``
        """
        logger.debug('VM stop')
        for vm in self.iter_vms(vm_names):
            if force:
                vm.destroy()
            else:
                vm.stop()

    def vm_start(self, vm_names=None):
        """Start the given VMs (``None`` does nothing)."""
        logger.debug('VM start')
        for vm in self.iter_vms(vm_names):
            vm.start()

    def vm_suspend(self, vm_names=None):
        """Suspend the given VMs (``None`` does nothing)."""
        logger.debug('VM suspend')
        for vm in self.iter_vms(vm_names):
            vm.suspend()

    def vm_resume(self, vm_names=None):
        """Resume the given VMs (``None`` does nothing)."""
        logger.debug('VM resume')
        for vm in self.iter_vms(vm_names):
            vm.resume()


class Hypervisor(object):
    """Container for all hypervisor related state."""

    def __init__(self, name, proxy):
        """
        :param str name: name of hypervisor instance
        :param proxy: sjRpc proxy
        """
        # weak proxy: the hypervisor does not keep the sjRpc proxy alive
        self.sjproxy = weakref.proxy(proxy)

        #: hv attributes
        self.name = name
        self.type = u'kvm'

        self.event_loop = EventLoop()
        # This tells libvirt what event loop implementation it
        # should use
        libvirt.virEventRegisterImpl(
            self.event_loop.add_handle,
            self.event_loop.update_handle,
            self.event_loop.remove_handle,
            self.event_loop.add_timer,
            self.event_loop.update_timer,
            self.event_loop.remove_timer,
        )
        self.event_loop.start()

        # TODO cleanup connection on stop
        # currently only support KVM
        _libvirt.connection = libvirt.open('qemu:///system')

        # findout storage
        self.storage = StorageIndex(_libvirt.connection)
        logger.debug('Storages: %s', self.storage.paths)

        #: domains: vms, containers...
        self.domains = dict()
        # find defined domains
        for dom_name in _libvirt.connection.listDefinedDomains():
            dom = _libvirt.connection.lookupByName(dom_name)
            self.domains[dom.name()] = VirtualMachine(dom, self)
        # find started domains
        for dom_id in _libvirt.connection.listDomainsID():
            dom = _libvirt.connection.lookupByID(dom_id)
            self.domains[dom.name()] = VirtualMachine(dom, self)

        logger.debug('Domains: %s', self.domains)

        self.event_loop.register_callbacks(self.callback)

    def callback(self, conn, dom, event, detail, opaque):
        """Callback for libvirt event loop.

        Keeps ``self.domains`` and the sjRpc registrations in sync with the
        domain lifecycle events reported by libvirt.

        :param conn: libvirt connection (unused)
        :param dom: libvirt domain the event is about
        :param event: event code, translated through ``EVENTS``
        :param detail: event detail (only logged)
        :param opaque: opaque user data (unused)
        """
        logger.debug('Received event %s on domain %s, detail %s', event,
                     dom.name(), detail)
        event = EVENTS[event]
        if event == 'Added':
            vm = VirtualMachine(dom, self)
            self.domains[vm.name] = vm
            self.sjproxy.register(vm.name, 'vm')
            logger.info('Add domain: %s (%s)', vm.name, vm.uuid)
        elif event == 'Removed':
            logger.debug('About to remove domain')
            # A removal event may concern a domain we never tracked; do not
            # let a KeyError escape into the libvirt event loop.
            vm = self.domains.pop(dom.name(), None)
            if vm is None:
                logger.warning('Received removal event for unknown domain %s',
                               dom.name())
                return
            self.sjproxy.unregister(vm.name)
            logger.info('Delete domain: %s (%s)', vm.name, vm.uuid)
        elif event in ('Started', 'Suspended', 'Resumed', 'Stopped', 'Saved',
                       'Restored'):
            vm = self.domains.get(dom.name())
            # sometimes libvirt sent a start event before a created event so
            # be careful
            if vm is not None:
                state = DOMAIN_STATES[dom.info()[0]]
                logger.info('Domain change state from %s to %s', vm.state,
                            state)
                vm.state = state

    def _count_domain(self, filter=lambda d: True):
        """Count the known domains for which ``filter(domain)`` is true.

        :param filter: predicate taking a domain; defaults to accepting
            every domain
        :return: number of matching domains
        """
        return sum(1 for dom in self.domains.itervalues() if filter(dom))

    @property
    def vm_started(self):
        """Number of VMs started."""
        return self._count_domain(lambda d: d.state == 'running')

    @property
    def vm_stopped(self):
        """Number of VMs stopped."""
        return self._count_domain(lambda d: d.state == 'stopped')

    @property
    def vm_paused(self):
        """Number of VMs paused."""
        return self._count_domain(lambda d: d.state == 'paused')

    @property
    def vm_total(self):
        """Total number of VMs on the hypervisor."""
        return self._count_domain()


class StorageIndex(object):
    """Keep an index of all storage volume paths."""

    def __init__(self, lv_con):
        """
        :param lv_con: Libvirt connection
        """
        # both the defined and the running storage pools are indexed
        pool_names = chain(
            lv_con.listDefinedStoragePools(),
            lv_con.listStoragePools(),
        )
        pools = (
            Storage(lv_con.storagePoolLookupByName(n)) for n in pool_names
        )
        #: storage name -> Storage
        self.storages = dict((s.name, s) for s in pools)

        #: volume path -> Volume, across every indexed storage
        self.paths = dict(
            (v.path, v)
            for s in self.storages.itervalues()
            for v in s.volumes
        )

    def get_volume(self, path):
        """Return the volume stored at ``path`` or ``None`` if unknown."""
        return self.paths.get(path)

    def get_storage(self, name):
        """Return the storage called ``name`` or ``None`` if unknown."""
        return self.storages.get(name)


class Storage(object):
    """Storage abstraction."""

    def __init__(self, lv_storage):
        """
        :param lv_storage: Libvirt pool storage instance
        """
        self.uuid = lv_storage.UUID()
        self.name = lv_storage.name()

        # values are read once here; they are not refreshed afterwards
        (self.state, self.capacity,
         self.allocation, self.available) = lv_storage.info()
        # translate the libvirt state code through STORAGE_STATES
        self.state = STORAGE_STATES[self.state]

        self.volumes = [
            Volume(lv_storage.storageVolLookupByName(n))
            for n in lv_storage.listVolumes()
        ]


class Volume(object):
    """Volume abstraction."""

    def __init__(self, lv_volume):
        """
        :param lv_volume: Libvirt volume instance
        """
        #: name of the storage pool the volume belongs to
        self.storage = lv_volume.storagePoolLookupByVolume().name()
        self.path = lv_volume.path()
        self.name = lv_volume.name()
        # the first item returned by info() is not kept
        self.capacity, self.allocation = lv_volume.info()[1:]