-
Anael Beutot authored5a57e4d2
__init__.py 9.34 KiB
import logging
import weakref
from itertools import chain, imap
import libvirt
from ccnode.host import Handler as HostHandler
from ccnode.tags import Tag, tag_inspector, get_tags
from ccnode.hypervisor import tags
from ccnode.hypervisor import lib as _libvirt
from ccnode.hypervisor.lib import DOMAIN_STATES, EVENTS, STORAGE_STATES, EventLoop
from ccnode.hypervisor.domains import VirtualMachine
logger = logging.getLogger(__name__)
class Handler(HostHandler):
def __init__(self, *args, **kwargs):
"""
:param proxy: sjRpc proxy
:param hypervisor_name: hypervisor name
"""
HostHandler.__init__(self, *args, **kwargs)
for t in tag_inspector(tags, self):
self.tags[t.name] = t
# initialize hypervisor instance
global hypervisor
if hypervisor is None:
hypervisor = Hypervisor(
name=kwargs.pop('hypervisor_name', None),
proxy=kwargs['proxy'],
)
else:
hypervisor.sjproxy = weakref.proxy(kwargs['proxy'])
self.hypervisor = weakref.proxy(hypervisor)
# register hypervisor storage tags
for name, storage in self.hypervisor.storage.storages.iteritems():
for t in (
Tag('sto%s_state' % name, lambda: storage.state, 5),
Tag('sto%s_size' % name, lambda: storage.capacity, 5),
Tag('sto%s_free' % name, lambda: storage.available, 5),
Tag('sto%s_used' % name,
lambda: storage.capacity - storage.available, 5),
):
self.tags[t.name] = t
# register domains
proxy = kwargs.pop('proxy')
for dom in hypervisor.domains.itervalues():
name = dom.name
logger.debug('Registered domain %s', name)
proxy.register(name, 'vm')
def sub_tags(self, sub_id, tags=None, noresolve_tags=None):
"""Get subtags."""
global hypervisor
domain = hypervisor.domains.get(sub_id)
if domain is None:
logger.debug('Failed to find domain with name %s.', sub_id)
return
logger.debug('Get tags for sub object: %s', sub_id)
return get_tags(domain, tags, noresolve_tags)
def iter_vms(self, vm_names):
"""Utility function to iterate over VM objects using their names."""
if vm_names is None:
return
# get_domain = self.hypervisor.domains.get
get_domain = self.hypervisor.domains.get
for name in vm_names:
dom = get_domain(name)
if dom is not None:
yield dom
def vm_define(self, data, format='xml'):
logger.debug('VM define')
if format != 'xml':
raise NotImplementedError('Format not supported')
try:
return _libvirt.connection.defineXML(data).name()
except libvirt.libvirtError:
logger.exception('Error while creating domain')
def vm_undefine(self, name):
logger.debug('VM undefin')
vm = self.hypervisor.domains.get(name)
if vm is not None:
vm.undefine()
def vm_export(self, name, format='xml'):
if format != 'xml':
raise NotImplementedError('Format not supported')
vm = self.hypervisor.domains.get(name)
if vm is None:
return
return vm.lv_dom.XMLDesc(0)
def vm_stop(self, vm_names=None, force=False):
logger.debug('VM stop')
for vm in self.iter_vms(vm_names):
if force:
vm.destroy()
else:
vm.stop()
def vm_start(self, vm_names=None):
logger.debug('VM start')
for vm in self.iter_vms(vm_names):
vm.start()
def vm_suspend(self, vm_names=None):
logger.debug('VM suspend')
for vm in self.iter_vms(vm_names):
vm.suspend()
def vm_resume(self, vm_names=None):
logger.debug('VM resume')
for vm in self.iter_vms(vm_names):
vm.resume()
class Hypervisor(object):
"""Container for all hypervisor related state."""
def __init__(self, name, proxy):
"""
:param str name: name of hypervisor instance
:param proxy: sjRpc proxy
"""
self.sjproxy = weakref.proxy(proxy)
#: hv attributes
self.name = name
self.type = u'kvm'
self.event_loop = EventLoop()
# This tells libvirt what event loop implementation it
# should use
libvirt.virEventRegisterImpl(
self.event_loop.add_handle,
self.event_loop.update_handle,
self.event_loop.remove_handle,
self.event_loop.add_timer,
self.event_loop.update_timer,
self.event_loop.remove_timer,
)
self.event_loop.start()
# TODO cleanup connection on stop
_libvirt.connection = libvirt.open('qemu:///system') # currently only support KVM
# findout storage
self.storage = StorageIndex(_libvirt.connection)
logger.debug('Storages: %s', self.storage.paths)
#: domains: vms, containers...
self.domains = dict()
# find defined domains
for dom_name in _libvirt.connection.listDefinedDomains():
dom = _libvirt.connection.lookupByName(dom_name)
self.domains[dom.name()] = VirtualMachine(dom, self)
# find started domains
for dom_id in _libvirt.connection.listDomainsID():
dom = _libvirt.connection.lookupByID(dom_id)
self.domains[dom.name()] = VirtualMachine(dom, self)
logger.debug('Domains: %s', self.domains)
self.event_loop.register_callbacks(self.callback)
def callback(self, conn, dom, event, detail, opaque):
"""Callback for libvirt event loop."""
logger.debug('Received event %s on domain %s, detail %s', event,
dom.name(), detail)
event = EVENTS[event]
if event == 'Added':
vm = VirtualMachine(dom, self)
self.domains[vm.name] = vm
self.sjproxy.register(vm.name, 'vm')
logger.info('Add domain: %s (%s)', vm.name, vm.uuid)
elif event == 'Removed':
logger.debug('About to remove domain')
vm = self.domains.pop(dom.name())
self.sjproxy.unregister(vm.name)
logger.info('Delete domain: %s (%s)', vm.name, vm.uuid)
elif event in ('Started', 'Suspended', 'Resumed', 'Stopped', 'Saved',
'Restored'):
vm = self.domains.get(dom.name())
# sometimes libvirt sent a start event before a created event so be
# careful
if vm is not None:
state = DOMAIN_STATES[dom.info()[0]]
logger.info('Domain change state from %s to %s', vm.state,
state)
vm.state = state
def _count_domain(self, filter=lambda d: True):
count = 0
for dom in self.domains.itervalues():
if filter(dom):
count += 1
return count
@property
def vm_started(self):
"""Number of VMs started."""
return self._count_domain(lambda d: d.state == 'running')
@property
def vm_stopped(self):
"""Number of VMs stopped."""
return self._count_domain(lambda d: d.state == 'stopped')
@property
def vm_paused(self):
"""Number of VMs paused."""
return self._count_domain(lambda d: d.state == 'paused')
@property
def vm_total(self):
"""Total number of VMs on the hypervisor."""
return self._count_domain()
hypervisor = None
class StorageIndex(object):
"""Keep an index of all storage volume paths."""
def __init__(self, lv_con):
"""
:param lv_con: Libvirt connection
"""
self.storages = dict(
(s.name, s) for s in imap(
Storage,
imap(
lv_con.storagePoolLookupByName,
chain(
lv_con.listDefinedStoragePools(),
lv_con.listStoragePools(),
),
),
),
)
self.paths = dict(
(v.path, v) for v in chain.from_iterable(map(
lambda s: s.volumes,
self.storages.itervalues(),
)),
)
def get_volume(self, path):
return self.paths.get(path)
def get_storage(self, name):
return self.Storage.get(name)
class Storage(object):
"""Storage abstraction."""
def __init__(self, lv_storage):
"""
:param lv_storage: Libvirt pool storage instance
"""
self.uuid = lv_storage.UUID()
self.name = lv_storage.name()
self.state, self.capacity, self.allocation, self.available = lv_storage.info()
self.state = STORAGE_STATES[self.state]
self.volumes = map(
Volume,
(lv_storage.storageVolLookupByName(n) for n in
lv_storage.listVolumes()),
)
class Volume(object):
"""Volume abstraction."""
def __init__(self, lv_volume):
"""
:param lv_volume: Libvirt volume instance
"""
self.storage = lv_volume.storagePoolLookupByVolume().name()
self.path = lv_volume.path()
self.name = lv_volume.name()
self.capacity, self.allocation = lv_volume.info()[1:]