Loading ccnode/node.py +76 −8 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ import time import signal import logging import logging.config from collections import defaultdict from functools import partial import pyev Loading @@ -11,6 +12,7 @@ from sjrpc.utils import ConnectionProxy, RpcHandler from ccnode import __version__ from ccnode.config import NodeConfigParser from ccnode.tags import Tag, get_tags from ccnode.exc import PluginError logger = logging.getLogger(__name__) Loading Loading @@ -98,17 +100,16 @@ class AuthHandler(object): if response == u'host': logger.debug('Role host affected') from ccnode.host import Handler as HostHandler self.loop.rpc_con.rpc.set_handler(HostHandler()) self.loop.role = u'host' self.loop.role = HostHandler() elif response == u'hv': logger.debug('Role hypervisor affected') from ccnode.hypervisor import Handler as HypervisorHandler self.loop.rpc_con.rpc.set_handler(HypervisorHandler( self.loop.role = HypervisorHandler( proxy=self.loop.proxy, hypervisor_name=self.loop.config.server_user)) self.loop.role = u'hv' hypervisor_name=self.loop.config.server_user, ) else: logger.info('Failed authentication, role returned: %s', response) logger.error('Failed authentication, role returned: %s', response) self.loop.role = None self.auth_id = None return Loading @@ -116,6 +117,7 @@ class AuthHandler(object): if self.loop.role is not None: self.watcher.stop() logger.info('Successfully authenticated with role %s', response) self.loop.register_plugin(self.loop.role) self.auth_id = None def start(self): Loading Loading @@ -155,7 +157,8 @@ class MainLoop(object): self.role = None # tag database self.tag_db = dict( self.tag_db = defaultdict( dict, __main__=dict((t.name, t) for t in DEFAULT_TAGS), # db for main objects # other sub-objects db can go here (for example VMs) Loading @@ -166,6 +169,9 @@ class MainLoop(object): sub_tags=self.sub_tags, ) # plugins self.registered_plugins = set() # RPC handlers definitions def sub_tags(self, sub_id, tags=None, noresolve_tags=None): if sub_id == '__main__': Loading @@ -182,6 +188,62 @@ class MainLoop(object): return get_tags(sub_db, tags, noresolve_tags) # End RPC handlers definitions def reset_tag(self, tag): """ :param tag: :class:`Tag` to add/replace """ # TODO tag register tag.start(self.loop) self.tag_db['__main__'][tag.name] = tag def remove_tag(self, tag_name): # TODO tag unregister self.tag_db['__main__'].pop(tag_name).stop() def reset_sub_tag(self, sub_id, tag): # TODO tag register tag.start(self.loop) self.tag_db[sub_id][tag.name] = tag def remove_sub_tag(self, sub_id, tag_name): # TODO tag unregister self.tag_db[sub_id].pop(tag_name).stop() def register_plugin(self, plugin): # keep track of registered plugins if plugin in self.registered_plugins: raise PluginError('Plugin was already registered') self.registered_plugins.add(plugin) # register tags for name, sub_db in plugin.tag_db.iteritems(): self.tag_db[name].update(sub_db) # register handler self.rpc_handler.update(plugin.rpc_handler) plugin.start(self.loop) def unregister_plugin(self, plugin): try: self.registered_plugins.remove(plugin) except KeyError: raise PluginError('Plugin was not registered, cannot remove') # remove tags for db_name, tag_db in plugin.tag_db: for tag_name in tag_db: del self.tag_db[db_name][tag_name] if not self.tag_db[db_name]: # if there's no more tag in the db del self.tag_db[db_name] # remove handlers for handler_name in plugin.rpc_handler: del self.rpc_handler[handler_name] plugin.stop() def rpc_connect(self): # TODO async and error handling self.rpc_con = RpcConnection.from_addr_ssl( Loading @@ -192,7 +254,6 @@ class MainLoop(object): ) self.proxy = ConnectionProxy(self.rpc_con) def start(self): for signal in self.signals.itervalues(): signal.start() Loading @@ -204,8 +265,15 @@ class MainLoop(object): def stop(self, watcher=None, revents=None): logger.info('Exiting node...') # close rpc if self.rpc_con is not None: self.rpc_con.shutdown() # close all plugins for plugin in self.registered_plugins: plugin.stop() self.registered_plugins = set() self.role = None self.loop.stop() def reload(self, watcher=None, revents=None): Loading Loading
ccnode/node.py +76 −8 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ import time import signal import logging import logging.config from collections import defaultdict from functools import partial import pyev Loading @@ -11,6 +12,7 @@ from sjrpc.utils import ConnectionProxy, RpcHandler from ccnode import __version__ from ccnode.config import NodeConfigParser from ccnode.tags import Tag, get_tags from ccnode.exc import PluginError logger = logging.getLogger(__name__) Loading Loading @@ -98,17 +100,16 @@ class AuthHandler(object): if response == u'host': logger.debug('Role host affected') from ccnode.host import Handler as HostHandler self.loop.rpc_con.rpc.set_handler(HostHandler()) self.loop.role = u'host' self.loop.role = HostHandler() elif response == u'hv': logger.debug('Role hypervisor affected') from ccnode.hypervisor import Handler as HypervisorHandler self.loop.rpc_con.rpc.set_handler(HypervisorHandler( self.loop.role = HypervisorHandler( proxy=self.loop.proxy, hypervisor_name=self.loop.config.server_user)) self.loop.role = u'hv' hypervisor_name=self.loop.config.server_user, ) else: logger.info('Failed authentication, role returned: %s', response) logger.error('Failed authentication, role returned: %s', response) self.loop.role = None self.auth_id = None return Loading @@ -116,6 +117,7 @@ class AuthHandler(object): if self.loop.role is not None: self.watcher.stop() logger.info('Successfully authenticated with role %s', response) self.loop.register_plugin(self.loop.role) self.auth_id = None def start(self): Loading Loading @@ -155,7 +157,8 @@ class MainLoop(object): self.role = None # tag database self.tag_db = dict( self.tag_db = defaultdict( dict, __main__=dict((t.name, t) for t in DEFAULT_TAGS), # db for main objects # other sub-objects db can go here (for example VMs) Loading @@ -166,6 +169,9 @@ class MainLoop(object): sub_tags=self.sub_tags, ) # plugins self.registered_plugins = set() # RPC handlers definitions def sub_tags(self, sub_id, tags=None, noresolve_tags=None): if sub_id == '__main__': Loading @@ -182,6 +188,62 @@ class MainLoop(object): return get_tags(sub_db, tags, noresolve_tags) # End RPC handlers definitions def reset_tag(self, tag): """ :param tag: :class:`Tag` to add/replace """ # TODO tag register tag.start(self.loop) self.tag_db['__main__'][tag.name] = tag def remove_tag(self, tag_name): # TODO tag unregister self.tag_db['__main__'].pop(tag_name).stop() def reset_sub_tag(self, sub_id, tag): # TODO tag register tag.start(self.loop) self.tag_db[sub_id][tag.name] = tag def remove_sub_tag(self, sub_id, tag_name): # TODO tag unregister self.tag_db[sub_id].pop(tag_name).stop() def register_plugin(self, plugin): # keep track of registered plugins if plugin in self.registered_plugins: raise PluginError('Plugin was already registered') self.registered_plugins.add(plugin) # register tags for name, sub_db in plugin.tag_db.iteritems(): self.tag_db[name].update(sub_db) # register handler self.rpc_handler.update(plugin.rpc_handler) plugin.start(self.loop) def unregister_plugin(self, plugin): try: self.registered_plugins.remove(plugin) except KeyError: raise PluginError('Plugin was not registered, cannot remove') # remove tags for db_name, tag_db in plugin.tag_db: for tag_name in tag_db: del self.tag_db[db_name][tag_name] if not self.tag_db[db_name]: # if there's no more tag in the db del self.tag_db[db_name] # remove handlers for handler_name in plugin.rpc_handler: del self.rpc_handler[handler_name] plugin.stop() def rpc_connect(self): # TODO async and error handling self.rpc_con = RpcConnection.from_addr_ssl( Loading @@ -192,7 +254,6 @@ class MainLoop(object): ) self.proxy = ConnectionProxy(self.rpc_con) def start(self): for signal in self.signals.itervalues(): signal.start() Loading @@ -204,8 +265,15 @@ class MainLoop(object): def stop(self, watcher=None, revents=None): logger.info('Exiting node...') # close rpc if self.rpc_con is not None: self.rpc_con.shutdown() # close all plugins for plugin in self.registered_plugins: plugin.stop() self.registered_plugins = set() self.role = None self.loop.stop() def reload(self, watcher=None, revents=None): Loading