import time import signal import logging import logging.config from threading import Thread from collections import defaultdict from functools import partial import pyev from sjrpc.core import RpcConnection from sjrpc.utils import ConnectionProxy, RpcHandler, threadless from ccnode import __version__ from ccnode.config import NodeConfigParser from ccnode.tags import Tag, get_tags, TagDB from ccnode.exc import PluginError logger = logging.getLogger(__name__) DEFAULT_TAGS = (Tag(u'version', __version__),) class RPCStartHandler(Thread): """Handles rpc connection and authentication to the remote cc-server. This class inherits from :class:`Thread` but only the connection part is run in a background thread. """ def __init__(self, loop): """ :param loop: MainLoop instance """ Thread.__init__(self) self.daemon = True self.run = self.rpc_connect self.loop = loop # signal to the main thread when connection is done self.async = loop.evloop.async(self.async_cb) self.timer = loop.evloop.timer(5., 5., self.in_progress_cb) self.rpc_con = None # id for async rpc call self.auth_id = None # Method run in the thread def rpc_connect(self): while True: try: logger.debug('About to create connection') self.rpc_con = RpcConnection.from_addr_ssl( addr=self.loop.config.server_host, port=self.loop.config.server_port, handler=self.loop.rpc_handler, loop=self.loop.evloop, ) logger.debug('Connection created') except IOError: logger.exception('Error while connecting to the cc-server') time.sleep(5) else: break logger.debug('Async send') self.async.send() # success def start(self): self.timer.start() self.async.start() Thread.start(self) def stop(self): self.timer.stop() self.async.stop() def in_progress_cb(self, *args): logger.info('Connection to the cc-server still in progress') def async_cb(self, *args): logger.debug('Async callback') self.timer.stop() # connect is done self.loop.rpc_con = self.rpc_con # start authentication self.timer = self.loop.evloop.timer(.5, 5., self.auth_cb) self.timer.start() def auth_cb(self, *args): logger.debug('Callback auth') # check is fallback mode on sjrpc is set otherwise our call would block # the loop if not self.rpc_con._event_fallback.is_set(): logger.debug('Will try authentication again latter') return if self.auth_id is not None: logger.error('Authentication is taking longer than expected') return # try to authenticate self.auth_id = self.rpc_con.rpc.async_call_cb( self.auth_done_cb, 'authentify', self.loop.config.server_user, self.loop.config.server_passwd, ) def auth_done_cb(self, call_id, response=None, error=None): assert call_id == self.auth_id if error is not None: # we got an error logger.error('Error while authenticating with cc-server: %s("%s")', error['exception'], error.get('message', '')) # try to reconnect in 5 seconds return # set handler according to which role was returned by the cc-server if response == u'host': logger.debug('Role host affected') from ccnode.host import Handler as HostHandler self.loop.role = HostHandler(loop=self.loop) elif response == u'hv': logger.debug('Role hypervisor affected') from ccnode.hypervisor import Handler as HypervisorHandler self.loop.role = HypervisorHandler( hypervisor_name=self.loop.config.server_user, loop=self.loop, ) else: logger.error('Failed authentication, role returned: %s', response) self.loop.role = None self.auth_id = None return if self.loop.role is not None: self.stop() logger.info('Successfully authenticated with role %s', response) self.loop.register_plugin(self.loop.role) self.auth_id = None class MainLoop(object): def __init__(self, config_path): self.evloop = pyev.default_loop(debug=True) self.config_path = config_path # set signal watchers self.signals = { signal.SIGINT: self.stop, signal.SIGTERM: self.stop, signal.SIGUSR1: self.reload, } # turn into real watchers self.signals = dict(( signal, self.evloop.signal(signal, cb), ) for signal, cb in self.signals.iteritems()) # load config variables self.config = NodeConfigParser(self.config_path) # configure logging logging.config.fileConfig(self.config_path) # rpc connection self.rpc_con = None self.connect = RPCStartHandler(self) # role self.role = None # tag database self.tag_db = TagDB(self, tags=DEFAULT_TAGS) # handlers self.rpc_handler = dict( get_tags=partial(threadless(get_tags), self.tag_db['__main__']), sub_tags=self.sub_tags, ) # plugins self.registered_plugins = set() @property def rpc_connected(self): return self.rpc_con is not None @property def rpc_authenticated(self): return self.rpc_connected and self.role is not None # RPC handlers definitions @threadless def sub_tags(self, sub_id, tags=None, noresolve_tags=None): if sub_id == '__main__': # FIXME should we raise ? logger.debug('Invalid request for sub object') return sub_db = self.tag_db.get(sub_id) if sub_db is None: # FIXME should we also raise here ? logger.debug('Failed to find sub_id %s', sub_id) return {} return get_tags(sub_db, tags, noresolve_tags) # End RPC handlers definitions def reset_handler(self, name, handl): self.rpc_handler[name] = handl def remove_handler(self, name): self.rpc_handler.pop(name, None) def register_plugin(self, plugin): # keep track of registered plugins if plugin in self.registered_plugins: raise PluginError('Plugin was already registered') self.registered_plugins.add(plugin) # register tags self.tag_db.update_from_db(plugin.tag_db) # register handler self.rpc_handler.update(plugin.rpc_handler) plugin.start() def unregister_plugin(self, plugin): try: self.registered_plugins.remove(plugin) except KeyError: raise PluginError('Plugin was not registered, cannot remove') # remove tags for db_name, tag_db in plugin.tag_db: for tag_name in tag_db: del self.tag_db[db_name][tag_name] if not self.tag_db[db_name]: # if there's no more tag in the db del self.tag_db[db_name] self.tag_db.delete_from_db(plugin.tag_db) # remove handlers for handler_name in plugin.rpc_handler: del self.rpc_handler[handler_name] plugin.stop() def start(self): logger.info('Starting node') for signal in self.signals.itervalues(): signal.start() logger.debug('About to connect') self.connect.start() logger.debug('About to start ev_loop') self.evloop.start() def stop(self, watcher=None, revents=None): logger.info('Exiting node...') if self.connect is not None: self.connect.stop() # close rpc if self.rpc_con is not None: self.rpc_con.shutdown() # close all plugins for plugin in self.registered_plugins: plugin.stop() self.registered_plugins = set() self.role = None self.evloop.stop() def reload(self, watcher=None, revents=None): logger.info(u'Reloading logging configuration...') logging.config.fileConfig(self.config_path)