diff --git a/ccnode/host/__init__.py b/ccnode/host/__init__.py index 8a6004679a7ebb84eb840d5df22562e0d0b0bdf8..ff36df9806dc68b9eb9716aad2384dea9dfe5cef 100644 --- a/ccnode/host/__init__.py +++ b/ccnode/host/__init__.py @@ -32,15 +32,13 @@ class Handler(BasePlugin): BasePlugin.__init__(self, *args, **kwargs) # add plugin tags - self.tag_db['__main__'].update(dict( - (t.name, t) for t in tag_inspector(tags), - )) + self.tag_db.add_tags(tag_inspector(tags)) # disk related tags - self.tag_db['__main__'].update(dict((t.name, t) for t in imap( + self.tag_db.add_tags(imap( lambda d: Tag('disk%s_size' % d, disk_tag_value(d), 60), self.tag_db['__main__']['disk']._calculate_value().split(), - ))) + )) # rpc handler self.rpc_handler.update(dict( diff --git a/ccnode/hypervisor/__init__.py b/ccnode/hypervisor/__init__.py index 4678b7742ddb3e4b9e247e81c58b95499b14eb92..653b55e254012d565fda093b8012d34790d91c36 100644 --- a/ccnode/hypervisor/__init__.py +++ b/ccnode/hypervisor/__init__.py @@ -32,7 +32,21 @@ class Handler(HostHandler): self.timer = self.main.evloop.timer(.0, 5., self.virt_connect_cb) self.hypervisor = None - self.virt_connected = False + self._virt_connected = False + + # register tags + self.tag_db.add_tags(tag_inspector(tags, self)) + + @property + def virt_connected(self): + return self._virt_connected + + @virt_connected.setter + def virt_connected(self, value): + self._virt_connected = value + # update tags + for tag in ('vir_status', 'hvver', 'libvirtver'): + self.tag_db['__main__'][tag].calculate_value() def start(self): self.timer.start() @@ -59,33 +73,17 @@ class Handler(HostHandler): # register hypervisor storage tags for name, storage in self.hypervisor.storage.storages.iteritems(): - for t in ( - Tag('sto%s_state' % name, lambda: storage.state, 5), - Tag('sto%s_size' % name, lambda: storage.capacity, 5), - Tag('sto%s_free' % name, lambda: storage.available, 5), + self.tag_db.add_tags(( + Tag('sto%s_state' % name, lambda: storage.state, 5, 5), + Tag('sto%s_size' % name, lambda: storage.capacity, 5, 5), + Tag('sto%s_free' % name, lambda: storage.available, 5, 5), Tag('sto%s_used' % name, - lambda: storage.capacity - storage.available, 5), - ): - self.tag_db['__main__'][t.name] = t - self.main.reset_tag(t) + lambda: storage.capacity - storage.available, 5, 5), + )) # register domains for dom in self.hypervisor.domains.itervalues(): - name = dom.name - # proxy.register(name, 'vm') - self.async_calls[self.main.rpc_con.rpc.async_call_cb( - self.register_domain_cb, - 'register', - name, - 'vm', - )] = name - - hv_tags = dict( - (t.name, t) for t in tag_inspector(tags, self), - ) - self.tag_db['__main__'].update(hv_tags) - for tag in hv_tags.itervalues(): - self.main.reset_tag(tag) + self.tag_db.add_sub_object(dom.name, dom.tags.itervalues()) self.rpc_handler.update(dict( vm_define=self.vm_define, @@ -118,42 +116,34 @@ class Handler(HostHandler): logger.error('Connection to libvirt lost, trying to restart') # update connection state self.virt_connected = False - # unregister tags that will be re register later + # unregister tags that will be re registered later for storage in self.hypervisor.storage.storages: - self.main.remove_tag('sto%s_state' % storage) - self.main.remove_tag('sto%s_size' % storage) - self.main.remove_tag('sto%s_free' % storage) - self.main.remove_tag('sto%s_used' % storage) - self.tag_db['__main__'].pop('sto%s_state' % storage) - self.tag_db['__main__'].pop('sto%s_size' % storage) - self.tag_db['__main__'].pop('sto%s_free' % storage) - self.tag_db['__main__'].pop('sto%s_used' % storage) + self.tag_db.remove_tags(( + 'sto%s_state' % storage, + 'sto%s_size' % storage, + 'sto%s_free' % storage, + 'sto%s_used' % storage, + )) # unregister sub objects (for the same reason) for sub_id in self.tag_db.keys(): if sub_id == '__main__': continue - self.main.remove_sub_object(sub_id) - self.tag_db.pop(sub_id) + self.tag_db.remove_sub_object(sub_id) # stop and delete hypervisor instance self.hypervisor.stop() self.hypervisor = None + + # remove handlers related to libvirt + self.main.remove_handler('vm_define') + self.main.remove_handler('vm_undefine') + self.main.remove_handler('vm_export') + self.main.remove_handler('vm_stop') + self.main.remove_handler('vm_start') + self.main.remove_handler('vm_suspend') + self.main.remove_handler('vm_resume') # launch connection timer self.timer.start() - # FIXME duplicate code - def register_domain_cb(self, call_id, response=None, error=None): - """RPC callback used when registering a domain on the cc-server.""" - name = self.async_calls.pop(call_id) - if error is not None: - logger.error('Error while registering domain, %s("%s")', - error['exception'], error['message']) - return - - logger.debug('Registered domain %s', name) - domain = self.hypervisor.domains[name] - for tag in domain.tags.itervalues(): - self.main.reset_sub_tag(domain.name, tag) - def iter_vms(self, vm_names): """Utility function to iterate over VM objects using their names.""" if vm_names is None: @@ -224,8 +214,6 @@ class Hypervisor(object): :param Handler handler: hypervisor handler """ self.handler = weakref.proxy(handler) - self.rpc_con = handler.main.rpc_con - self.async_calls = dict() #: hv attributes self.name = name @@ -280,23 +268,13 @@ class Hypervisor(object): if event == 'Added': vm = VirtualMachine(dom, self) + logger.info('Created domain %s', vm.name) self.domains[vm.name] = vm - # self.sjproxy.register(vm.name, 'vm') - self.async_calls[self.rpc_con.rpc.async_call_cb( - self.register_cb, - 'register', - vm.name, - 'vm', - )] = vm.name + self.handler.tag_db.add_sub_object(vm.name, vm.tags.itervalues()) elif event == 'Removed': - logger.debug('About to remove domain') vm = self.domains.pop(dom.name()) - # self.sjproxy.unregister(vm.name) - self.async_calls[self.rpc_con.rpc.async_call_cb( - self.unregister_cb, - 'unregister', - vm.name, - )] = vm.name + logger.info('Removed domain %s', vm.name) + self.handler.tag_db.remove_sub_object(vm.name) elif event in ('Started', 'Suspended', 'Resumed', 'Stopped', 'Saved', 'Restored'): vm = self.domains.get(dom.name()) @@ -308,26 +286,6 @@ class Hypervisor(object): state) vm.state = state - def register_cb(self, call_id, response=None, error=None): - """RPC callback for registering domain to cc-server.""" - vm = self.domains[self.async_calls.pop(call_id)] - if error is not None: - logger.error('Error while registering domain to server, %s("%s")', - error['exception'], error['message']) - logger.info('Add domain: %s (%s)', vm.name, vm.uuid) - # add tags - for tag in vm.tags.itervalues(): - self.handler.main.reset_sub_tag(vm.name, tag) - - def unregister_cb(self, call_id, response=None, error=None): - """RPC callback for unregistering domain to cc-server.""" - vm_name = self.async_calls.pop(call_id) - if error is not None: - logger.error('Error while unregistering domain to server, %s("%s")', - error['exception'], error['message']) - logger.info('Delete domain: %s', vm_name) - self.handler.main.remove_sub_object(vm_name) - def vm_define(self, xml_desc): """Create a VM on the Hypervisor :param str xml_desc: XML description in libvirt format diff --git a/ccnode/hypervisor/domains/__init__.py b/ccnode/hypervisor/domains/__init__.py index 3a7c00d9752e169be4ca3468f76dbb6ccaedb395..6f120fc5c89d5d3242c1e53864dd783f867b44c0 100644 --- a/ccnode/hypervisor/domains/__init__.py +++ b/ccnode/hypervisor/domains/__init__.py @@ -31,6 +31,7 @@ class VirtualMachine(object): #: state of VM: started, stoped, paused self._state = STATE[dom.info()[0]] #: tags for this VM + # FIXME use a tag db instance self.tags = dict((t.name, t) for t in tag_inspector(vm_tags, self)) # define dynamic tags i = 0 diff --git a/ccnode/hypervisor/lib.py b/ccnode/hypervisor/lib.py index 86c0f0bd28b59af9b651cfb1629b5f976442525d..d0dfa4cb6c1db5166254f51f1b6682503eab1e54 100644 --- a/ccnode/hypervisor/lib.py +++ b/ccnode/hypervisor/lib.py @@ -2,7 +2,6 @@ import logging from itertools import chain -from functools import wraps import pyev import libvirt @@ -48,25 +47,6 @@ EVENTS = ( ) -def vir_tag(func): - """Catches libvirt related exception. - - Decorator used for tag declarations that interacts with libvirt. - - """ - @wraps(func) - def decorated(handl): - if not handl.virt_connected: - return - try: - func(handl) - except libvirt.libvirtError: - logger.exception('Unexpected libvirt error') - handl.vir_con_restart() - - return decorated - - # following event loop implementation was inspired by libvirt python example # but updated to work with libev class LoopHandler(object): diff --git a/ccnode/hypervisor/tags.py b/ccnode/hypervisor/tags.py index 95cbad8e60b9ce75e5845b7401313d2f3ff958a7..4552744c3cfd43533e62a49b73bd0e94d97929e6 100644 --- a/ccnode/hypervisor/tags.py +++ b/ccnode/hypervisor/tags.py @@ -1,5 +1,40 @@ +import logging +from functools import wraps + +import libvirt + from ccnode.utils import and_ -from ccnode.hypervisor.lib import vir_tag + + +logger = logging.getLogger(__name__) + + +def _virt_tag(func): + """Catches libvirt related exception. + + Decorator used for tag declarations that interacts with libvirt. + + """ + @wraps(func) + def decorated(handl): + if not handl.virt_connected: + return + try: + return func(handl) + except libvirt.libvirtError: + logger.exception('Unexpected libvirt error') + handl.vir_con_restart() + + return decorated + + +def _check_virt_connected(func): + """Check is libvirt is connected before caculating tag.""" + @wraps(func) + def decorated(handl): + if not handl.virt_connected: + return + return func(handl) def vir_status(handl): @@ -14,6 +49,7 @@ def htype(): return u'kvm' +@_check_virt_connected def hv(handl): """Hypervisor name.""" # What is the point of this tag ? if the information not already in a and id @@ -44,13 +80,13 @@ def hvm(): return None -@vir_tag +@_virt_tag def hvver(handl): """Hypervisor version.""" return handl.hypervisor.vir_con.getVersion() -@vir_tag +@_virt_tag def libvirtver(handl): """Version of running libvirt.""" return handl.hypervisor.vir_con.getLibVersion() @@ -62,27 +98,32 @@ def rjobs(): # storage pools +@_check_virt_connected def sto(handl): """Storage pool names.""" return u' '.join(handl.hypervisor.storage.storages.iterkeys()) # Vm related tags +@_check_virt_connected def nvm(handl): """Number of VMS in the current hypervisor.""" return handl.hypervisor.vm_total +@_check_virt_connected def vmpaused(handl): """Count of VMs paused.""" return handl.hypervisor.vm_paused +@_check_virt_connected def vmstarted(handl): """Count of VMs started.""" return handl.hypervisor.vm_started +@_check_virt_connected def vmstopped(handl): """Count of VMs Stopped.""" return handl.hypervisor.vm_stopped diff --git a/ccnode/node.py b/ccnode/node.py index 25a0454006c36a4ba99780196cf79e555c11a3ad..6869f033450c88f5273b9465f494eb46690710fd 100644 --- a/ccnode/node.py +++ b/ccnode/node.py @@ -12,7 +12,7 @@ from sjrpc.utils import ConnectionProxy, RpcHandler, threadless from ccnode import __version__ from ccnode.config import NodeConfigParser -from ccnode.tags import Tag, get_tags +from ccnode.tags import Tag, get_tags, TagDB from ccnode.exc import PluginError @@ -178,12 +178,7 @@ class MainLoop(object): self.role = None # tag database - self.tag_db = defaultdict( - dict, - __main__=dict((t.name, t) - for t in DEFAULT_TAGS), # db for main objects - # other sub-objects db can go here (for example VMs) - ) + self.tag_db = TagDB(self, tags=DEFAULT_TAGS) # handlers self.rpc_handler = dict( get_tags=partial(threadless(get_tags), self.tag_db['__main__']), @@ -218,31 +213,6 @@ class MainLoop(object): return get_tags(sub_db, tags, noresolve_tags) # End RPC handlers definitions - def reset_tag(self, tag): - """ - :param tag: :class:`Tag` to add/replace - """ - # TODO tag register - tag.start(self.evloop) - self.tag_db['__main__'][tag.name] = tag - - def remove_tag(self, tag_name): - # TODO tag unregister - self.tag_db['__main__'].pop(tag_name).stop() - - def reset_sub_tag(self, sub_id, tag): - # TODO tag register - tag.start(self.evloop) - self.tag_db[sub_id][tag.name] = tag - - def remove_sub_tag(self, sub_id, tag_name): - # TODO tag unregister - self.tag_db[sub_id].pop(tag_name).stop() - - def remove_sub_object(self, sub_id): - for tag in self.tag_db.pop(sub_id, {}).itervalues(): - tag.stop() - def reset_handler(self, name, handl): self.rpc_handler[name] = handl @@ -256,8 +226,7 @@ class MainLoop(object): self.registered_plugins.add(plugin) # register tags - for name, sub_db in plugin.tag_db.iteritems(): - self.tag_db[name].update(sub_db) + self.tag_db.update_from_db(plugin.tag_db) # register handler self.rpc_handler.update(plugin.rpc_handler) @@ -278,6 +247,8 @@ class MainLoop(object): if not self.tag_db[db_name]: # if there's no more tag in the db del self.tag_db[db_name] + self.tag_db.delete_from_db(plugin.tag_db) + # remove handlers for handler_name in plugin.rpc_handler: del self.rpc_handler[handler_name] diff --git a/ccnode/plugins.py b/ccnode/plugins.py index 3a1a8011e3c7aa10a8781749f49a6438d157d61d..1e9999ebae5ee3ca95c49e77eaca0927fccea11a 100644 --- a/ccnode/plugins.py +++ b/ccnode/plugins.py @@ -2,6 +2,8 @@ from itertools import chain, imap +from ccnode.tags import TagDB + class Base(object): """Example skeleton plugin for cc-node. @@ -18,10 +20,7 @@ class Base(object): self.main = kwargs.pop('loop') # plugins may define tags (see :mod:`ccnode.tags`) - self.tag_db = dict( - __main__=dict(), - # subobjects tags go here - ) + self.tag_db = TagDB(self.main, parent_db=self.main.tag_db) # plugins may define handler functions that would be called by the # server @@ -41,11 +40,7 @@ class Base(object): def start(self): """Used to start pyev watchers.""" - # start tags - for tag in chain.from_iterable( - imap(lambda d: d.itervalues(), self.tag_db.itervalues()), - ): - tag.start(self.main.evloop) + pass def stop(self): """Cleanup for plugins, can be used to clean pyev watchers.""" diff --git a/ccnode/tags.py b/ccnode/tags.py index 9eedba6507beca9644b325793a05e3d3521dbd2b..3ed75477660be4c48f673c22c0d47cac252016ac 100644 --- a/ccnode/tags.py +++ b/ccnode/tags.py @@ -3,6 +3,7 @@ import logging import time import weakref from functools import partial +from collections import defaultdict logger = logging.getLogger(__name__) @@ -48,6 +49,10 @@ class Tag(object): self.watcher = None + # FIXME special arguments for tag db + self.db = None + self.sub_id = None + def calculate_value(self): self.value = self._calculate_value() # logger.debug('Calculate Tag(%s) = %s', self.name, self.value) @@ -155,3 +160,225 @@ def get_tags(tags_dict, tags=None, noresolve_tags=None): logger.debug('Profiling: %f seconds', time.time() - profile) logger.debug('Returning: %s', unicode(result)) return result + + +class TagDB(object): + """Tag database. FIXME comment + + Handles common operations such as registering tag on the cc-server, updating + its values, etc. + + TagDB can have a parent TagDB, in this case, the latter will handle tag + registration on the cc-server. + + """ + def __init__(self, main, parent_db=None, tags=None, sub_tags=None): + """ + :param main: MainLoop instance + :param TagDB parent_db: TagDB parent object + :param iterable tags: initial tags + :param dict sub_tags: initial subtags + """ + self.main = weakref.proxy(main) + self.parent = parent_db + + if tags is None: + tags = tuple() + if sub_tags is None: + sub_tags = {} + + self.db = defaultdict( + dict, + __main__=dict(), # tags for main object + # others objects + ) + + for tag in tags: + self.add_tag(tag) + + for sub_id, tags in sub_tags.iteritems(): + for tag in tags: + self.add_sub_tag(sub_id, tag) + + #: dict for async call storage, keep a part of log message + self.async_calls = dict() + + # RPC part + def rpc_register(self): + """Register all objects and tags on the cc-server. + + This is used just after a (re)connection to the cc-server is made. + """ + for sub_id in self.db: + if sub_id == '__main__': + continue + self.rpc_register_sub_object(sub_id) + + # TODO register tags + + def rpc_register_sub_object(self, sub_id): + # register object on the cc-server + if self.main.rpc_authenticated: + self.async_calls[self.main.rpc_con.rpc.async_call_cb( + self.rpc_object_register_cb, + 'register', + sub_id, + 'vm', # FIXME generalization + )] = sub_id + + def rpc_unregister_sub_object(self, sub_id): + if self.main.rpc_authenticated: + self.async_calls[self.main.rpc_con.rpc.async_call_cb( + self.rpc_object_unregister_cb, + 'unregister', + sub_id, + )] = sub_id + + def rpc_object_register_cb(self, call_id, response, error): + name = self.async_calls.pop(call_id) + if error is not None: + logger.error('Error while trying to register the object %s,' + ' %s("%s")', name, error['exception'], + error['message']) + return + + logger.info('Successfully registered object %s', name) + + def rpc_object_unregister_cb(self, call_id, response, error): + name = self.async_calls.pop(call_id) + if error is not None: + logger.error('Error while trying to unregister the object %s,' + ' %s("%s")', name, error['exception'], + error['message']) + return + + logger.info('Successfully unregistered object %s', name) + + def rpc_tag_register_cb(self, call_id, response, error): + pass + + def rpc_tag_unregister_cb(self, call_id, response, error): + pass + + def rpc_sub_tag_register_cb(self, call_id, response, error): + pass + + def rpc_sub_tag_unregister_cb(self, call_id, response, error): + pass + # end RPC part + + # tag handling part, used by plugins + def add_tags(self, tags): + """ + :param iterable tags: list of tags to add + """ + for tag in tags: + self.add_tag(tag) + + def add_sub_tags(self, sub_id, tags): + for tag in tags: + self.add_sub_tag(sub_id, tag) + + def remove_tags(self, tag_names): + """ + :param iterable tag_names: list of tag names to remove + """ + for name in tag_names: + self.remove_tag(name) + + def add_tag(self, tag): + # set special attributes on tag instance + if self.parent is None: + tag.db = self + tag.sub_id = '__main__' + tag.start(self.main.evloop) + # TODO register tag + else: + self.parent.add_tag(tag) + self.db['__main__'][tag.name] = tag + + def remove_tag(self, tag_name): + tag = self.db['__main__'].pop(tag_name) + if self.parent is None: + tag.db = None + tag.sub_id = None + tag.stop() + # TODO unregister tag + else: + self.parent.remove_tag(tag_name) + + def add_sub_tag(self, sub_id, tag): + if self.parent is None: + tag.db = self + tag.sub_id = sub_id + tag.start(self.main.evloop) + # TODO register tag + else: + self.parent.add_sub_tag(sub_id, tag) + self.db[sub_id][tag.name] = tag + + def remove_sub_tag(self, sub_id, tag_name): + tag = self.db[sub_id].pop(tag_name) + if self.parent is None: + tag.db = None + tag.sub_id = None + tag.stop() + # TODO unregister tag + else: + self.parent.remove_sub_tag(sub_id, tag_name) + + def add_sub_object(self, sub_id, tags): + if self.parent is None: + self.rpc_register_sub_object(sub_id) + else: + # tags will be registered after + self.parent.add_sub_object(sub_id, tuple()) + # register tags + for t in tags: + self.add_sub_tag(sub_id, t) + + def remove_sub_object(self, sub_id): + if self.parent is None: + for name in self.db[sub_id].keys(): + self.remove_sub_tag(sub_id, name) + del self.db[sub_id] + self.rpc_unregister_sub_object(sub_id) + else: + sub_tags = self.db.pop(sub_id) + self.parent.remove_sub_object(sub_id) + + def update_tag_value(self, tag): + """Update tag value on cc-server.""" + # TODO + + # dict like + def get(self, key, default=None): + return self.db.get(key, default) + + def __getitem__(self, key): + return self.db[key] + + def keys(self): + return self.db.keys() + + def iteritems(self): + return self.db.iteritems() + + def itervalues(self): + return self.db.itervalues() + + def update_from_db(self, tag_db): + """Update self tag db from other instance.""" + self.add_tags(tag_db['__main__'].itervalues()) + + for sub_id, db in tag_db.iteritems(): + if sub_id == '__main__': + continue + self.add_sub_tags(sub_id, db.itervalues()) + + def delete_from_db(self, tag_db): + self.remove_tags(tag_db['__main__']) + for sub_id in tag_db: + if sub_id == '__main__': + continue + self.remove_suobject(sub_id)