Loading ccnode/hypervisor/__init__.py +87 −17 Original line number Diff line number Diff line Loading @@ -17,26 +17,46 @@ from ccnode.hypervisor.domains import VirtualMachine logger = logging.getLogger(__name__) # FIXME find a way to refactor Handler and Hypervisor class class Handler(HostHandler): def __init__(self, *args, **kwargs): """ :param loop: MainLoop instance :param hypervisor_name: hypervisor name """ hypervisor_name = kwargs.pop('hypervisor_name') self.hypervisor_name = kwargs.pop('hypervisor_name') HostHandler.__init__(self, *args, **kwargs) #: keep index of asynchronous calls self.async_calls = dict() self.timer = self.main.evloop.timer(.0, 5., self.virt_connect_cb) self.hypervisor = None self.virt_connected = False def start(self): self.timer.start() HostHandler.start(self) def stop(self): self.timer.stop() if self.hypervisor is not None: self.hypervisor.stop() HostHandler.stop(self) def virt_connect_cb(self, *args): # initialize hypervisor instance # FIXME this may block try: self.hypervisor = Hypervisor( name=hypervisor_name, loop=self.main, name=self.hypervisor_name, handler=self, ) except libvirt.libvirtError: logger.exception('Error while connecting to libvirt') return self.virt_connected = True # FIXME this may block # register hypervisor storage tags for name, storage in self.hypervisor.storage.storages.iteritems(): for t in ( Loading @@ -47,6 +67,7 @@ class Handler(HostHandler): lambda: storage.capacity - storage.available, 5), ): self.tag_db['__main__'][t.name] = t self.main.reset_tag(t) # register domains for dom in self.hypervisor.domains.itervalues(): Loading @@ -59,9 +80,12 @@ class Handler(HostHandler): 'vm', )] = name self.tag_db['__main__'].update(dict( hv_tags = dict( (t.name, t) for t in tag_inspector(tags, self), )) ) self.tag_db['__main__'].update(hv_tags) for tag in hv_tags.itervalues(): self.main.reset_tag(tag) self.rpc_handler.update(dict( vm_define=self.vm_define, Loading @@ -72,8 +96,53 @@ class Handler(HostHandler): vm_suspend=self.vm_suspend, vm_resume=self.vm_resume, )) self.main.reset_handler('vm_define', self.vm_define) self.main.reset_handler('vm_undefine', self.vm_undefine) self.main.reset_handler('vm_export', self.vm_export) self.main.reset_handler('vm_stop', self.vm_stop) self.main.reset_handler('vm_start', self.vm_start) self.main.reset_handler('vm_suspend', self.vm_suspend) self.main.reset_handler('vm_resume', self.vm_resume) # if everything went fine, unregister the timer self.timer.stop() def virt_connect_restart(self): """Restart libvirt connection. This method might be called when libvirt connection is lost. """ if not self.virt_connected: return logger.error('Connection to libvirt lost, trying to restart') # update connection state self.virt_connected = False # unregister tags that will be re register later for storage in self.hypervisor.storage.storages: self.main.remove_tag('sto%s_state' % storage) self.main.remove_tag('sto%s_size' % storage) self.main.remove_tag('sto%s_free' % storage) self.main.remove_tag('sto%s_used' % storage) self.tag_db['__main__'].pop('sto%s_state' % storage) self.tag_db['__main__'].pop('sto%s_size' % storage) self.tag_db['__main__'].pop('sto%s_free' % storage) self.tag_db['__main__'].pop('sto%s_used' % storage) # unregister sub objects (for the same reason) for sub_id in self.tag_db.keys(): if sub_id == '__main__': continue self.main.remove_sub_object(sub_id) self.tag_db.pop(sub_id) # stop and delete hypervisor instance self.hypervisor.stop() self.hypervisor = None # launch connection timer self.timer.start() # FIXME duplicate code def register_domain_cb(self, call_id, response=None, error=None): """RPC callback used when registering a domain on the cc-server.""" name = self.async_calls.pop(call_id) if error is not None: logger.error('Error while registering domain, %s("%s")', Loading Loading @@ -149,14 +218,13 @@ class Handler(HostHandler): class Hypervisor(object): """Container for all hypervisor related state.""" def __init__(self, name, loop): def __init__(self, name, handler): """ :param str name: name of hypervisor instance :param loop: MainLoop instance :param Handler handler: hypervisor handler """ #: parent MainLoop self.main = weakref.proxy(loop) self.rpc_con = loop.rpc_con self.handler = weakref.proxy(handler) self.rpc_con = handler.main.rpc_con self.async_calls = dict() #: hv attributes Loading @@ -164,7 +232,7 @@ class Hypervisor(object): self.type = u'kvm' # libvirt event loop abstraction self.vir_event_loop = VirEventLoop(self.main.evloop) self.vir_event_loop = VirEventLoop(self.handler.main.evloop) # This tells libvirt what event loop implementation it # should use libvirt.virEventRegisterImpl( Loading Loading @@ -241,6 +309,7 @@ class Hypervisor(object): vm.state = state def register_cb(self, call_id, response=None, error=None): """RPC callback for registering domain to cc-server.""" vm = self.domains[self.async_calls.pop(call_id)] if error is not None: logger.error('Error while registering domain to server, %s("%s")', Loading @@ -248,15 +317,16 @@ class Hypervisor(object): logger.info('Add domain: %s (%s)', vm.name, vm.uuid) # add tags for tag in vm.tags.itervalues(): self.main.reset_sub_tag(vm.name, tag) self.handler.main.reset_sub_tag(vm.name, tag) def unregister_cb(self, call_id, response=None, error=None): """RPC callback for unregistering domain to cc-server.""" vm_name = self.async_calls.pop(call_id) if error is not None: logger.error('Error while unregistering domain to server, %s("%s")', error['exception'], error['message']) logger.info('Delete domain: %s', vm_name) self.main.remove_sub_object(vm_name) self.handler.main.remove_sub_object(vm_name) def vm_define(self, xml_desc): """Create a VM on the Hypervisor Loading ccnode/hypervisor/domains/vm_tags.py +33 −0 Original line number Diff line number Diff line import logging from functools import wraps from xml.etree import cElementTree as et from StringIO import StringIO import libvirt from ccnode.tags import ttl, refresh logger = logging.getLogger(__name__) def _vir_tag(func): """Catches libvirt related exception. Decorator used for tag declarations that interacts with libvirt. """ @wraps(func) def decorated(dom): if not dom.hypervisor.handler.virt_connected: return try: func(dom) except libvirt.libvirtError: logger.exception('Unexpected libvirt error') dom.hypervisor.handler.virt_connect_restart() return decorated def uuid(dom): """Unique identifier of the domain.""" return dom.uuid Loading @@ -23,6 +49,7 @@ def htype(dom): return dom.hypervisor.type @_vir_tag def arch(dom): """VM CPU architecture.""" try: Loading @@ -37,6 +64,7 @@ def h(dom): return dom.name @_vir_tag def cpu(dom): """Number of CPU of the VM.""" return dom.lv_dom.info()[3] Loading @@ -48,16 +76,19 @@ def cpuuse(): pass @_vir_tag def mem(dom): """Memory currently allocated.""" return dom.lv_dom.info()[2] * 1024 @_vir_tag def memmax(dom): """Maximum memory allocation.""" return dom.lv_dom.info()[1] * 1024 @_vir_tag def vncport(dom): """VNC port for the VM console access.""" try: Loading @@ -70,6 +101,7 @@ def vncport(dom): @ttl(10) @refresh(10) @_vir_tag def disk(dom): """Get backend disks.""" return u' '.join(map(str, xrange(len(dom.disks)))) or None Loading @@ -77,6 +109,7 @@ def disk(dom): @ttl(10) @refresh(10) @_vir_tag def nic(dom): """VM network interfaces.""" return u' '.join(map(str, xrange(len(dom.nics)))) or None ccnode/hypervisor/lib.py +20 −5 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ import logging from itertools import chain from functools import wraps import pyev import libvirt Loading @@ -10,11 +11,6 @@ import libvirt logger = logging.getLogger(__name__) # FIXME bad global variable #: connection to the libvirt connection = None # TODO create a watcher thread for this #: corresponding name for enum state of libvirt domains # see http://libvirt.org/html/libvirt-libvirt.html#virDomainState DOMAIN_STATES = ( Loading Loading @@ -52,6 +48,25 @@ EVENTS = ( ) def vir_tag(func): """Catches libvirt related exception. Decorator used for tag declarations that interacts with libvirt. """ @wraps(func) def decorated(handl): if not handl.virt_connected: return try: func(handl) except libvirt.libvirtError: logger.exception('Unexpected libvirt error') handl.vir_con_restart() return decorated # following event loop implementation was inspired by libvirt python example # but updated to work with libev class LoopHandler(object): Loading ccnode/hypervisor/tags.py +3 −0 Original line number Diff line number Diff line from ccnode.utils import and_ from ccnode.hypervisor.lib import vir_tag # hypervisor related tags Loading Loading @@ -38,11 +39,13 @@ def hvm(): return None @vir_tag def hvver(handl): """Hypervisor version.""" return handl.hypervisor.vir_con.getVersion() @vir_tag def libvirtver(handl): """Version of running libvirt.""" return handl.hypervisor.vir_con.getLibVersion() Loading Loading
ccnode/hypervisor/__init__.py +87 −17 Original line number Diff line number Diff line Loading @@ -17,26 +17,46 @@ from ccnode.hypervisor.domains import VirtualMachine logger = logging.getLogger(__name__) # FIXME find a way to refactor Handler and Hypervisor class class Handler(HostHandler): def __init__(self, *args, **kwargs): """ :param loop: MainLoop instance :param hypervisor_name: hypervisor name """ hypervisor_name = kwargs.pop('hypervisor_name') self.hypervisor_name = kwargs.pop('hypervisor_name') HostHandler.__init__(self, *args, **kwargs) #: keep index of asynchronous calls self.async_calls = dict() self.timer = self.main.evloop.timer(.0, 5., self.virt_connect_cb) self.hypervisor = None self.virt_connected = False def start(self): self.timer.start() HostHandler.start(self) def stop(self): self.timer.stop() if self.hypervisor is not None: self.hypervisor.stop() HostHandler.stop(self) def virt_connect_cb(self, *args): # initialize hypervisor instance # FIXME this may block try: self.hypervisor = Hypervisor( name=hypervisor_name, loop=self.main, name=self.hypervisor_name, handler=self, ) except libvirt.libvirtError: logger.exception('Error while connecting to libvirt') return self.virt_connected = True # FIXME this may block # register hypervisor storage tags for name, storage in self.hypervisor.storage.storages.iteritems(): for t in ( Loading @@ -47,6 +67,7 @@ class Handler(HostHandler): lambda: storage.capacity - storage.available, 5), ): self.tag_db['__main__'][t.name] = t self.main.reset_tag(t) # register domains for dom in self.hypervisor.domains.itervalues(): Loading @@ -59,9 +80,12 @@ class Handler(HostHandler): 'vm', )] = name self.tag_db['__main__'].update(dict( hv_tags = dict( (t.name, t) for t in tag_inspector(tags, self), )) ) self.tag_db['__main__'].update(hv_tags) for tag in hv_tags.itervalues(): self.main.reset_tag(tag) self.rpc_handler.update(dict( vm_define=self.vm_define, Loading @@ -72,8 +96,53 @@ class Handler(HostHandler): vm_suspend=self.vm_suspend, vm_resume=self.vm_resume, )) self.main.reset_handler('vm_define', self.vm_define) self.main.reset_handler('vm_undefine', self.vm_undefine) self.main.reset_handler('vm_export', self.vm_export) self.main.reset_handler('vm_stop', self.vm_stop) self.main.reset_handler('vm_start', self.vm_start) self.main.reset_handler('vm_suspend', self.vm_suspend) self.main.reset_handler('vm_resume', self.vm_resume) # if everything went fine, unregister the timer self.timer.stop() def virt_connect_restart(self): """Restart libvirt connection. This method might be called when libvirt connection is lost. """ if not self.virt_connected: return logger.error('Connection to libvirt lost, trying to restart') # update connection state self.virt_connected = False # unregister tags that will be re register later for storage in self.hypervisor.storage.storages: self.main.remove_tag('sto%s_state' % storage) self.main.remove_tag('sto%s_size' % storage) self.main.remove_tag('sto%s_free' % storage) self.main.remove_tag('sto%s_used' % storage) self.tag_db['__main__'].pop('sto%s_state' % storage) self.tag_db['__main__'].pop('sto%s_size' % storage) self.tag_db['__main__'].pop('sto%s_free' % storage) self.tag_db['__main__'].pop('sto%s_used' % storage) # unregister sub objects (for the same reason) for sub_id in self.tag_db.keys(): if sub_id == '__main__': continue self.main.remove_sub_object(sub_id) self.tag_db.pop(sub_id) # stop and delete hypervisor instance self.hypervisor.stop() self.hypervisor = None # launch connection timer self.timer.start() # FIXME duplicate code def register_domain_cb(self, call_id, response=None, error=None): """RPC callback used when registering a domain on the cc-server.""" name = self.async_calls.pop(call_id) if error is not None: logger.error('Error while registering domain, %s("%s")', Loading Loading @@ -149,14 +218,13 @@ class Handler(HostHandler): class Hypervisor(object): """Container for all hypervisor related state.""" def __init__(self, name, loop): def __init__(self, name, handler): """ :param str name: name of hypervisor instance :param loop: MainLoop instance :param Handler handler: hypervisor handler """ #: parent MainLoop self.main = weakref.proxy(loop) self.rpc_con = loop.rpc_con self.handler = weakref.proxy(handler) self.rpc_con = handler.main.rpc_con self.async_calls = dict() #: hv attributes Loading @@ -164,7 +232,7 @@ class Hypervisor(object): self.type = u'kvm' # libvirt event loop abstraction self.vir_event_loop = VirEventLoop(self.main.evloop) self.vir_event_loop = VirEventLoop(self.handler.main.evloop) # This tells libvirt what event loop implementation it # should use libvirt.virEventRegisterImpl( Loading Loading @@ -241,6 +309,7 @@ class Hypervisor(object): vm.state = state def register_cb(self, call_id, response=None, error=None): """RPC callback for registering domain to cc-server.""" vm = self.domains[self.async_calls.pop(call_id)] if error is not None: logger.error('Error while registering domain to server, %s("%s")', Loading @@ -248,15 +317,16 @@ class Hypervisor(object): logger.info('Add domain: %s (%s)', vm.name, vm.uuid) # add tags for tag in vm.tags.itervalues(): self.main.reset_sub_tag(vm.name, tag) self.handler.main.reset_sub_tag(vm.name, tag) def unregister_cb(self, call_id, response=None, error=None): """RPC callback for unregistering domain to cc-server.""" vm_name = self.async_calls.pop(call_id) if error is not None: logger.error('Error while unregistering domain to server, %s("%s")', error['exception'], error['message']) logger.info('Delete domain: %s', vm_name) self.main.remove_sub_object(vm_name) self.handler.main.remove_sub_object(vm_name) def vm_define(self, xml_desc): """Create a VM on the Hypervisor Loading
ccnode/hypervisor/domains/vm_tags.py +33 −0 Original line number Diff line number Diff line import logging from functools import wraps from xml.etree import cElementTree as et from StringIO import StringIO import libvirt from ccnode.tags import ttl, refresh logger = logging.getLogger(__name__) def _vir_tag(func): """Catches libvirt related exception. Decorator used for tag declarations that interacts with libvirt. """ @wraps(func) def decorated(dom): if not dom.hypervisor.handler.virt_connected: return try: func(dom) except libvirt.libvirtError: logger.exception('Unexpected libvirt error') dom.hypervisor.handler.virt_connect_restart() return decorated def uuid(dom): """Unique identifier of the domain.""" return dom.uuid Loading @@ -23,6 +49,7 @@ def htype(dom): return dom.hypervisor.type @_vir_tag def arch(dom): """VM CPU architecture.""" try: Loading @@ -37,6 +64,7 @@ def h(dom): return dom.name @_vir_tag def cpu(dom): """Number of CPU of the VM.""" return dom.lv_dom.info()[3] Loading @@ -48,16 +76,19 @@ def cpuuse(): pass @_vir_tag def mem(dom): """Memory currently allocated.""" return dom.lv_dom.info()[2] * 1024 @_vir_tag def memmax(dom): """Maximum memory allocation.""" return dom.lv_dom.info()[1] * 1024 @_vir_tag def vncport(dom): """VNC port for the VM console access.""" try: Loading @@ -70,6 +101,7 @@ def vncport(dom): @ttl(10) @refresh(10) @_vir_tag def disk(dom): """Get backend disks.""" return u' '.join(map(str, xrange(len(dom.disks)))) or None Loading @@ -77,6 +109,7 @@ def disk(dom): @ttl(10) @refresh(10) @_vir_tag def nic(dom): """VM network interfaces.""" return u' '.join(map(str, xrange(len(dom.nics)))) or None
ccnode/hypervisor/lib.py +20 −5 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ import logging from itertools import chain from functools import wraps import pyev import libvirt Loading @@ -10,11 +11,6 @@ import libvirt logger = logging.getLogger(__name__) # FIXME bad global variable #: connection to the libvirt connection = None # TODO create a watcher thread for this #: corresponding name for enum state of libvirt domains # see http://libvirt.org/html/libvirt-libvirt.html#virDomainState DOMAIN_STATES = ( Loading Loading @@ -52,6 +48,25 @@ EVENTS = ( ) def vir_tag(func): """Catches libvirt related exception. Decorator used for tag declarations that interacts with libvirt. """ @wraps(func) def decorated(handl): if not handl.virt_connected: return try: func(handl) except libvirt.libvirtError: logger.exception('Unexpected libvirt error') handl.vir_con_restart() return decorated # following event loop implementation was inspired by libvirt python example # but updated to work with libev class LoopHandler(object): Loading
ccnode/hypervisor/tags.py +3 −0 Original line number Diff line number Diff line from ccnode.utils import and_ from ccnode.hypervisor.lib import vir_tag # hypervisor related tags Loading Loading @@ -38,11 +39,13 @@ def hvm(): return None @vir_tag def hvver(handl): """Hypervisor version.""" return handl.hypervisor.vir_con.getVersion() @vir_tag def libvirtver(handl): """Version of running libvirt.""" return handl.hypervisor.vir_con.getLibVersion() Loading