From ca3a36aa78c95cc14f5642694db212459d41ca62 Mon Sep 17 00:00:00 2001 From: Anael Beutot Date: Tue, 15 May 2012 10:27:24 +0200 Subject: [PATCH] Handle libvirt connection with errors. Try to connect to libvirt while no exception is thrown. Clean global variable for libvirt connection. --- ccnode/hypervisor/__init__.py | 104 ++++++++++++++++++++++----- ccnode/hypervisor/domains/vm_tags.py | 33 +++++++++ ccnode/hypervisor/lib.py | 25 +++++-- ccnode/hypervisor/tags.py | 3 + 4 files changed, 143 insertions(+), 22 deletions(-) diff --git a/ccnode/hypervisor/__init__.py b/ccnode/hypervisor/__init__.py index bf9b2eb..4678b77 100644 --- a/ccnode/hypervisor/__init__.py +++ b/ccnode/hypervisor/__init__.py @@ -17,26 +17,46 @@ from ccnode.hypervisor.domains import VirtualMachine logger = logging.getLogger(__name__) +# FIXME find a way to refactor Handler and Hypervisor class class Handler(HostHandler): def __init__(self, *args, **kwargs): """ :param loop: MainLoop instance :param hypervisor_name: hypervisor name """ - hypervisor_name = kwargs.pop('hypervisor_name') + self.hypervisor_name = kwargs.pop('hypervisor_name') HostHandler.__init__(self, *args, **kwargs) #: keep index of asynchronous calls self.async_calls = dict() + self.timer = self.main.evloop.timer(.0, 5., self.virt_connect_cb) + self.hypervisor = None + self.virt_connected = False + + def start(self): + self.timer.start() + HostHandler.start(self) + + def stop(self): + self.timer.stop() + if self.hypervisor is not None: + self.hypervisor.stop() + HostHandler.stop(self) + + def virt_connect_cb(self, *args): # initialize hypervisor instance - # FIXME this may block - self.hypervisor = Hypervisor( - name=hypervisor_name, - loop=self.main, - ) + try: + self.hypervisor = Hypervisor( + name=self.hypervisor_name, + handler=self, + ) + except libvirt.libvirtError: + logger.exception('Error while connecting to libvirt') + return + + self.virt_connected = True - # FIXME this may block # register hypervisor storage tags for name, storage in self.hypervisor.storage.storages.iteritems(): for t in ( @@ -47,6 +67,7 @@ class Handler(HostHandler): lambda: storage.capacity - storage.available, 5), ): self.tag_db['__main__'][t.name] = t + self.main.reset_tag(t) # register domains for dom in self.hypervisor.domains.itervalues(): @@ -59,9 +80,12 @@ class Handler(HostHandler): 'vm', )] = name - self.tag_db['__main__'].update(dict( + hv_tags = dict( (t.name, t) for t in tag_inspector(tags, self), - )) + ) + self.tag_db['__main__'].update(hv_tags) + for tag in hv_tags.itervalues(): + self.main.reset_tag(tag) self.rpc_handler.update(dict( vm_define=self.vm_define, @@ -72,8 +96,53 @@ class Handler(HostHandler): vm_suspend=self.vm_suspend, vm_resume=self.vm_resume, )) + self.main.reset_handler('vm_define', self.vm_define) + self.main.reset_handler('vm_undefine', self.vm_undefine) + self.main.reset_handler('vm_export', self.vm_export) + self.main.reset_handler('vm_stop', self.vm_stop) + self.main.reset_handler('vm_start', self.vm_start) + self.main.reset_handler('vm_suspend', self.vm_suspend) + self.main.reset_handler('vm_resume', self.vm_resume) + + # if everything went fine, unregister the timer + self.timer.stop() + + def virt_connect_restart(self): + """Restart libvirt connection. + This method might be called when libvirt connection is lost. + """ + if not self.virt_connected: + return + + logger.error('Connection to libvirt lost, trying to restart') + # update connection state + self.virt_connected = False + # unregister tags that will be re register later + for storage in self.hypervisor.storage.storages: + self.main.remove_tag('sto%s_state' % storage) + self.main.remove_tag('sto%s_size' % storage) + self.main.remove_tag('sto%s_free' % storage) + self.main.remove_tag('sto%s_used' % storage) + self.tag_db['__main__'].pop('sto%s_state' % storage) + self.tag_db['__main__'].pop('sto%s_size' % storage) + self.tag_db['__main__'].pop('sto%s_free' % storage) + self.tag_db['__main__'].pop('sto%s_used' % storage) + # unregister sub objects (for the same reason) + for sub_id in self.tag_db.keys(): + if sub_id == '__main__': + continue + self.main.remove_sub_object(sub_id) + self.tag_db.pop(sub_id) + # stop and delete hypervisor instance + self.hypervisor.stop() + self.hypervisor = None + # launch connection timer + self.timer.start() + + # FIXME duplicate code def register_domain_cb(self, call_id, response=None, error=None): + """RPC callback used when registering a domain on the cc-server.""" name = self.async_calls.pop(call_id) if error is not None: logger.error('Error while registering domain, %s("%s")', @@ -149,14 +218,13 @@ class Handler(HostHandler): class Hypervisor(object): """Container for all hypervisor related state.""" - def __init__(self, name, loop): + def __init__(self, name, handler): """ :param str name: name of hypervisor instance - :param loop: MainLoop instance + :param Handler handler: hypervisor handler """ - #: parent MainLoop - self.main = weakref.proxy(loop) - self.rpc_con = loop.rpc_con + self.handler = weakref.proxy(handler) + self.rpc_con = handler.main.rpc_con self.async_calls = dict() #: hv attributes @@ -164,7 +232,7 @@ class Hypervisor(object): self.type = u'kvm' # libvirt event loop abstraction - self.vir_event_loop = VirEventLoop(self.main.evloop) + self.vir_event_loop = VirEventLoop(self.handler.main.evloop) # This tells libvirt what event loop implementation it # should use libvirt.virEventRegisterImpl( @@ -241,6 +309,7 @@ class Hypervisor(object): vm.state = state def register_cb(self, call_id, response=None, error=None): + """RPC callback for registering domain to cc-server.""" vm = self.domains[self.async_calls.pop(call_id)] if error is not None: logger.error('Error while registering domain to server, %s("%s")', @@ -248,15 +317,16 @@ class Hypervisor(object): logger.info('Add domain: %s (%s)', vm.name, vm.uuid) # add tags for tag in vm.tags.itervalues(): - self.main.reset_sub_tag(vm.name, tag) + self.handler.main.reset_sub_tag(vm.name, tag) def unregister_cb(self, call_id, response=None, error=None): + """RPC callback for unregistering domain to cc-server.""" vm_name = self.async_calls.pop(call_id) if error is not None: logger.error('Error while unregistering domain to server, %s("%s")', error['exception'], error['message']) logger.info('Delete domain: %s', vm_name) - self.main.remove_sub_object(vm_name) + self.handler.main.remove_sub_object(vm_name) def vm_define(self, xml_desc): """Create a VM on the Hypervisor diff --git a/ccnode/hypervisor/domains/vm_tags.py b/ccnode/hypervisor/domains/vm_tags.py index a89ce4d..4222c77 100644 --- a/ccnode/hypervisor/domains/vm_tags.py +++ b/ccnode/hypervisor/domains/vm_tags.py @@ -1,9 +1,35 @@ +import logging +from functools import wraps from xml.etree import cElementTree as et from StringIO import StringIO +import libvirt + from ccnode.tags import ttl, refresh +logger = logging.getLogger(__name__) + + +def _vir_tag(func): + """Catches libvirt related exception. + + Decorator used for tag declarations that interacts with libvirt. + + """ + @wraps(func) + def decorated(dom): + if not dom.hypervisor.handler.virt_connected: + return + try: + func(dom) + except libvirt.libvirtError: + logger.exception('Unexpected libvirt error') + dom.hypervisor.handler.virt_connect_restart() + + return decorated + + def uuid(dom): """Unique identifier of the domain.""" return dom.uuid @@ -23,6 +49,7 @@ def htype(dom): return dom.hypervisor.type +@_vir_tag def arch(dom): """VM CPU architecture.""" try: @@ -37,6 +64,7 @@ def h(dom): return dom.name +@_vir_tag def cpu(dom): """Number of CPU of the VM.""" return dom.lv_dom.info()[3] @@ -48,16 +76,19 @@ def cpuuse(): pass +@_vir_tag def mem(dom): """Memory currently allocated.""" return dom.lv_dom.info()[2] * 1024 +@_vir_tag def memmax(dom): """Maximum memory allocation.""" return dom.lv_dom.info()[1] * 1024 +@_vir_tag def vncport(dom): """VNC port for the VM console access.""" try: @@ -70,6 +101,7 @@ def vncport(dom): @ttl(10) @refresh(10) +@_vir_tag def disk(dom): """Get backend disks.""" return u' '.join(map(str, xrange(len(dom.disks)))) or None @@ -77,6 +109,7 @@ def disk(dom): @ttl(10) @refresh(10) +@_vir_tag def nic(dom): """VM network interfaces.""" return u' '.join(map(str, xrange(len(dom.nics)))) or None diff --git a/ccnode/hypervisor/lib.py b/ccnode/hypervisor/lib.py index cbb2e34..86c0f0b 100644 --- a/ccnode/hypervisor/lib.py +++ b/ccnode/hypervisor/lib.py @@ -2,6 +2,7 @@ import logging from itertools import chain +from functools import wraps import pyev import libvirt @@ -10,11 +11,6 @@ import libvirt logger = logging.getLogger(__name__) -# FIXME bad global variable -#: connection to the libvirt -connection = None # TODO create a watcher thread for this - - #: corresponding name for enum state of libvirt domains # see http://libvirt.org/html/libvirt-libvirt.html#virDomainState DOMAIN_STATES = ( @@ -52,6 +48,25 @@ EVENTS = ( ) +def vir_tag(func): + """Catches libvirt related exception. + + Decorator used for tag declarations that interacts with libvirt. + + """ + @wraps(func) + def decorated(handl): + if not handl.virt_connected: + return + try: + func(handl) + except libvirt.libvirtError: + logger.exception('Unexpected libvirt error') + handl.vir_con_restart() + + return decorated + + # following event loop implementation was inspired by libvirt python example # but updated to work with libev class LoopHandler(object): diff --git a/ccnode/hypervisor/tags.py b/ccnode/hypervisor/tags.py index a3990f9..09e7d4f 100644 --- a/ccnode/hypervisor/tags.py +++ b/ccnode/hypervisor/tags.py @@ -1,4 +1,5 @@ from ccnode.utils import and_ +from ccnode.hypervisor.lib import vir_tag # hypervisor related tags @@ -38,11 +39,13 @@ def hvm(): return None +@vir_tag def hvver(handl): """Hypervisor version.""" return handl.hypervisor.vir_con.getVersion() +@vir_tag def libvirtver(handl): """Version of running libvirt.""" return handl.hypervisor.vir_con.getLibVersion() -- GitLab