import logging
import weakref
from functools import partial
from itertools import chain, imap

import libvirt

from ccnode.host import Handler as HostHandler
from ccnode.tags import tag_inspector, get_tags
from ccnode.hypervisor import tags
from ccnode.hypervisor import lib as _libvirt
from ccnode.hypervisor.lib import EVENTS, EventLoop
from ccnode.hypervisor.domains import VirtualMachine


logger = logging.getLogger(__name__)


class Handler(HostHandler):
    """Host handler that also exposes the hypervisor and its domains."""

    def __init__(self, *args, **kwargs):
        """
        :param proxy: sjRpc proxy
        :param hypervisor_name: hypervisor name

        All arguments are forwarded untouched to ``HostHandler.__init__``.
        ``proxy`` is mandatory (``KeyError`` if missing); ``hypervisor_name``
        is optional and defaults to ``None``.
        """
        HostHandler.__init__(self, *args, **kwargs)

        # expose every tag found by the inspector on this handler
        for t in tag_inspector(tags, self):
            self.tags[t.name] = t

        # set tag hv
        # self.tags['hv'] = Tag('hv', )

        # initialize hypervisor instance (module-level singleton, created
        # lazily by the first handler)
        global hypervisor
        if hypervisor is None:
            hypervisor = Hypervisor(kwargs.pop('hypervisor_name', None))

        # the handler only keeps a weak proxy; the module global owns the
        # strong reference
        self.hypervisor = weakref.proxy(hypervisor)

        # register domains
        proxy = kwargs.pop('proxy')
        for dom in hypervisor.domains.itervalues():
            name = dom.name
            logger.debug(u'Registered domain %s', name)
            proxy.register(name, 'vm')

    def sub_tags(self, sub_id, tags=None, noresolve_tags=None):
        """Get subtags.

        :param sub_id: name of the domain to inspect
        :param tags: forwarded to ``get_tags``
        :param noresolve_tags: forwarded to ``get_tags``
        :return: result of ``get_tags`` for the domain, or ``None`` when no
            domain is named ``sub_id``
        """
        domain = hypervisor.get_domain_by_name(sub_id)
        if domain is None:
            logger.debug(u'Failed to find domain with name %s.', sub_id)
            return

        return get_tags(domain, tags, noresolve_tags)

    def vm_define(self, name):
        """Placeholder: not implemented yet."""
        pass

    def vm_undefine(self, name):
        """Placeholder: not implemented yet."""
        pass

    def vm_export(self, name, format='xml'):
        """Placeholder: not implemented yet."""
        pass

    def vm_stop(self, vm_names=None, force=False):
        """Placeholder: not implemented yet."""
        pass

    def vm_start(self, vm_names=None):
        """Placeholder: not implemented yet."""
        pass

    def vm_suspend(self, vm_names=None):
        """Placeholder: not implemented yet."""
        pass

    def vm_resume(self, vm_names=None):
        """Placeholder: not implemented yet."""
        pass


class Hypervisor(object):
    """Container for all hypervisor related state."""

    def __init__(self, name=None):
        """
        :param name: hypervisor name

        Starts the event loop, opens the libvirt connection (stored on the
        ``ccnode.hypervisor.lib`` module), indexes storage and wraps every
        defined and running domain in a ``VirtualMachine``.
        """
        #: hv attributes
        self.name = name
        self.type = u'kvm'

        self.event_loop = EventLoop()
        # This tells libvirt what event loop implementation it
        # should use
        libvirt.virEventRegisterImpl(
            self.event_loop.add_handle,
            self.event_loop.update_handle,
            self.event_loop.remove_handle,
            self.event_loop.add_timer,
            self.event_loop.update_timer,
            self.event_loop.remove_timer,
        )
        self.event_loop.start()

        # TODO cleanup connection on stop
        # currently only support KVM
        _libvirt.connection = libvirt.open('qemu:///system')
        conn = _libvirt.connection

        # findout storage
        self.storage = StorageIndex(conn)
        logger.debug('Storages: %s', self.storage.paths)

        #: domains: vms, containers...
        self.domains = dict()

        # find defined domains
        for dom_name in conn.listDefinedDomains():
            dom = conn.lookupByName(dom_name)
            self.domains[dom.UUID()] = VirtualMachine(dom, self)

        # find started domains
        for dom_id in conn.listDomainsID():
            dom = conn.lookupByID(dom_id)
            self.domains[dom.UUID()] = VirtualMachine(dom, self)

        logger.debug('Domains: %s', self.domains)

        self.event_loop.register_callbacks(self.callback)

    def callback(self, conn, dom, event, detail, opaque):
        """Callback for libvirt event loop.

        Logs the event and, for an ``'Added'`` event, stores a new
        ``VirtualMachine`` for the domain under its UUID.
        """
        event = EVENTS[event]
        logger.debug(
            "myDomainEventCallback1 EVENT: Domain %s(%s) %s %d",
            dom.name(), dom.UUIDString(), event, detail,
        )

        if event == 'Added':
            # pass the hypervisor like the two call sites in __init__ do
            self.domains[dom.UUID()] = VirtualMachine(dom, self)

    def get_domain_by_name(self, name):
        """Get a domain by name.

        :return: first domain whose ``name`` equals ``name``, else ``None``
        """
        for d in self.domains.itervalues():
            if d.name == name:
                return d

        return None

    def _count_domain(self, filter=lambda d: True):
        """Count the domains for which ``filter(domain)`` is true."""
        return sum(1 for dom in self.domains.itervalues() if filter(dom))

    @property
    def vm_started(self):
        """Number of VMs started."""
        return self._count_domain(lambda d: d.state == 'running')

    @property
    def vm_stopped(self):
        """Number of VMs stopped."""
        return self._count_domain(lambda d: d.state == 'stopped')

    @property
    def vm_paused(self):
        """Number of VMs paused."""
        return self._count_domain(lambda d: d.state == 'paused')

    @property
    def vm_total(self):
        """Total number of VMs on the hypervisor."""
        return self._count_domain()


# Module-level hypervisor singleton, created by the first Handler instance.
hypervisor = None


class StorageIndex(object):
    """Keep an index of all storage volume paths."""

    def __init__(self, lv_con):
        """
        :param lv_con: Libvirt connection
        """
        pool_names = chain(
            lv_con.listDefinedStoragePools(),
            lv_con.listStoragePools(),
        )
        pools = imap(lv_con.storagePoolLookupByName, pool_names)

        #: storage name -> Storage
        self.storages = dict((s.name, s) for s in imap(Storage, pools))

        #: volume path -> Volume, across all storages
        self.paths = dict(
            (v.path, v)
            for s in self.storages.itervalues()
            for v in s.volumes
        )

    def get_volume(self, path):
        """Return the volume stored at ``path``, or ``None`` if unknown."""
        return self.paths.get(path)

    def get_storage(self, name):
        """Return the storage called ``name``, or ``None`` if unknown."""
        # the index lives in ``self.storages``; ``self.Storage`` never existed
        return self.storages.get(name)


class Storage(object):
    """Storage abstraction."""

    def __init__(self, lv_storage):
        """
        :param lv_storage: Libvirt pool storage instance
        """
        self.uuid = lv_storage.UUID()
        self.name = lv_storage.name()

        make_volume = partial(Volume, storage=self)
        self.volumes = [
            make_volume(lv_storage.storageVolLookupByName(n))
            for n in lv_storage.listVolumes()
        ]


class Volume(object):
    """Volume abstraction."""

    def __init__(self, lv_volume, storage):
        """
        :param lv_volume: Libvirt volume instance
        :param storage: parent storage instance
        """
        # NOTE: ``storage`` is currently unused; the pool name is looked up
        # through libvirt instead of keeping a reference to the parent.
        # self.storage = None if storage is None else weakref.proxy(storage)
        self.storage = lv_volume.storagePoolLookupByVolume().name()
        self.path = lv_volume.path()
        self.name = lv_volume.name()
        # info() is expected to be [type, capacity, allocation]
        self.capacity, self.allocation = lv_volume.info()[1:]