import time
import signal
import socket
import ssl
import logging
import logging.config
from collections import defaultdict
from functools import partial

import pyev
from sjrpc.core import RpcConnection
from sjrpc.utils import ConnectionProxy, RpcHandler

from ccnode import __version__
from ccnode.config import NodeConfigParser
from ccnode.tags import Tag, get_tags
from ccnode.exc import PluginError


logger = logging.getLogger(__name__)


# Tags that every node exposes, whatever its role.
DEFAULT_TAGS = (Tag(u'version', __version__, -1),)


class RPCHandler(object):
    """Handles rpc connection to the remote cc-server.

    :param loop: owning main loop object; it must expose the pyev loop as
        ``loop.loop``, the parsed configuration as ``loop.config`` and the
        RPC handler table as ``loop.rpc_handler``
    """

    def __init__(self, loop):
        self.loop = loop
        self.connection = None
        self.proxy = None

        # configure socket: it has to exist before the io watcher is built,
        # because the watcher is bound to its file descriptor
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setblocking(0)

        # NOTE(review): nothing in this class calls connect() on the socket;
        # confirm where the non-blocking connect is meant to be issued.
        self.watcher = loop.loop.io(
            self.sock.fileno(), pyev.EV_WRITE, self.cb)
        # NOTE(review): a timer created with after=0 fires as soon as it is
        # started, which closes the socket right away; confirm the delay.
        self.timer = loop.loop.timer(0, 5, self.timeout)

    def cb(self, watcher, revents):
        """Wrap the socket in TLS once it is reported writable.

        The resulting connection and its proxy are stored on the owning
        loop object (``loop.connection`` and ``loop.proxy``).

        :param watcher: pyev watcher that triggered the callback
        :param revents: bitmask of received events
        """
        if revents & pyev.EV_WRITE:
            self.loop.connection = RpcConnection(
                sock=ssl.wrap_socket(self.sock,
                                     ssl_version=ssl.PROTOCOL_TLSv1),
                loop=self.loop.loop,
            )
            self.loop.proxy = ConnectionProxy(self.loop.connection)

    def timeout(self, watcher, revents):
        """Timer callback: close the socket.

        :param watcher: pyev watcher that triggered the callback
        :param revents: bitmask of received events
        """
        self.sock.close()

    def run(self):
        """Start the watchers and open the SSL RPC connection."""
        self.watcher.start()
        # ``self.timeout`` is the callback method; the watcher to start is
        # ``self.timer``
        self.timer.start()

        self.connection = RpcConnection.from_addr_ssl(
            addr=self.loop.config.server_host,
            port=self.loop.config.server_port,
            # the main loop keeps its handler table in ``rpc_handler``
            handler=self.loop.rpc_handler,
        )
        self.proxy = ConnectionProxy(self.connection)
class AuthHandler(object):
    """Handles rpc authentication to the remote cc-server.

    :param loop: owning :class:`MainLoop`; its pyev loop, configuration and
        RPC connection are used to run the authentication
    """

    def __init__(self, loop):
        self.loop = loop
        # periodic timer: first shot after .5 second, then every 5 seconds
        self.watcher = loop.loop.timer(.5, 5, self.cb)
        # id of the pending 'authentify' call, None when no call is pending
        self.auth_id = None

    def cb(self, watcher, revents):
        """Timer callback: send an authentication request if possible.

        :param watcher: pyev watcher that triggered the callback
        :param revents: bitmask of received events
        """
        logger.debug('Callback auth')

        # check is fallback mode on sjrpc is set otherwise our call would
        # block the loop
        if not self.loop.rpc_con._event_fallback.is_set():
            logger.debug('Will try authentication again latter')
            return

        if self.auth_id is not None:
            logger.error('Authentication is taking longer than expected')
            return

        # try to authenticate
        self.auth_id = self.loop.rpc_con.rpc.async_call_cb(
            self.cb_auth,
            'authentify',
            self.loop.config.server_user,
            self.loop.config.server_passwd,
        )

    def cb_auth(self, call_id, response=None, error=None):
        """Handle the answer to the 'authentify' call.

        On success the role handler matching the returned role is created,
        the timer is stopped and the handler is registered as a plugin. On
        error or unknown role the timer is left running so that the next
        tick tries again.

        :param call_id: id of the answered call
        :param response: role returned by the cc-server
        :param error: error description (a mapping), or None on success
        """
        assert call_id == self.auth_id
        # The call is over whatever its outcome: clear the pending id so
        # the next timer tick is allowed to send a new request.
        self.auth_id = None

        if error is not None:
            # we got an error
            logger.error('Error while authenticating with cc-server: %s("%s")',
                         error['exception'], error.get('message', ''))
            # the timer is still running, it will try again in 5 seconds
            return

        # set handler according to which role was returned by the cc-server
        if response == u'host':
            logger.debug('Role host affected')
            from ccnode.host import Handler as HostHandler
            self.loop.role = HostHandler(loop=self.loop.loop)
        elif response == u'hv':
            logger.debug('Role hypervisor affected')
            from ccnode.hypervisor import Handler as HypervisorHandler
            self.loop.role = HypervisorHandler(
                proxy=self.loop.proxy,
                hypervisor_name=self.loop.config.server_user,
                loop=self.loop.loop,
            )
        else:
            logger.error('Failed authentication, role returned: %s', response)
            self.loop.role = None
            return

        self.watcher.stop()
        logger.info('Successfully authenticated with role %s', response)
        self.loop.register_plugin(self.loop.role)

    def start(self):
        """Start the periodic authentication timer."""
        self.watcher.start()


class MainLoop(object):
    """Node main object: owns the event loop, the RPC link and the plugins.

    :param config_path: path of the configuration file, used both for the
        node settings and for the logging configuration
    """

    def __init__(self, config_path):
        self.loop = pyev.default_loop()
        self.config_path = config_path

        # set signal watchers
        callbacks = {
            signal.SIGINT: self.stop,
            signal.SIGTERM: self.stop,
            signal.SIGUSR1: self.reload,
        }
        # turn into real watchers (the loop variable must not be named
        # ``signal``, that would hide the module)
        self.signals = dict(
            (signum, self.loop.signal(signum, callback))
            for signum, callback in callbacks.items()
        )

        # load config variables
        self.config = NodeConfigParser(self.config_path)

        # configure logging
        logging.config.fileConfig(self.config_path)

        # rpc connection, both are set by rpc_connect()
        self.rpc_con = None
        self.proxy = None
        self.auth = AuthHandler(self)

        # role
        self.role = None

        # tag database
        self.tag_db = defaultdict(
            dict,
            # db for main objects
            __main__=dict((t.name, t) for t in DEFAULT_TAGS),
            # other sub-objects db can go here (for example VMs)
        )

        # handlers
        self.rpc_handler = dict(
            get_tags=partial(get_tags, self.tag_db['__main__']),
            sub_tags=self.sub_tags,
        )

        # plugins
        self.registered_plugins = set()

    # RPC handlers definitions
    def sub_tags(self, sub_id, tags=None, noresolve_tags=None):
        """Return the tags of a sub-object.

        :param sub_id: name of the sub-object database
        :param tags: forwarded to :func:`get_tags`
        :param noresolve_tags: forwarded to :func:`get_tags`
        :return: result of :func:`get_tags`, or None when `sub_id` is
            ``'__main__'`` or unknown
        """
        if sub_id == '__main__':
            # FIXME should we raise ?
            logger.debug('Invalid request for sub object')
            return

        # .get() so that an unknown id does not create an empty db
        sub_db = self.tag_db.get(sub_id)
        if sub_db is None:
            # FIXME should we also raise here ?
            logger.debug('Failed to find sub_id %s', sub_id)
            return

        return get_tags(sub_db, tags, noresolve_tags)
    # End RPC handlers definitions

    def reset_tag(self, tag):
        """
        :param tag: :class:`Tag` to add/replace
        """
        # TODO tag register
        tag.start(self.loop)
        self.tag_db['__main__'][tag.name] = tag

    def remove_tag(self, tag_name):
        """
        :param tag_name: name of the main tag to stop and remove
        :raises KeyError: if there is no such tag
        """
        # TODO tag unregister
        self.tag_db['__main__'].pop(tag_name).stop()

    def reset_sub_tag(self, sub_id, tag):
        """
        :param sub_id: name of the sub-object database
        :param tag: :class:`Tag` to add/replace
        """
        # TODO tag register
        tag.start(self.loop)
        self.tag_db[sub_id][tag.name] = tag

    def remove_sub_tag(self, sub_id, tag_name):
        """
        :param sub_id: name of the sub-object database
        :param tag_name: name of the tag to stop and remove
        :raises KeyError: if there is no such tag
        """
        # TODO tag unregister
        self.tag_db[sub_id].pop(tag_name).stop()

    def register_plugin(self, plugin):
        """Merge the tags and RPC handlers of a plugin, then start it.

        :param plugin: object exposing `tag_db`, `rpc_handler`, `start()`
        :raises PluginError: if the plugin is already registered
        """
        # keep track of registered plugins
        if plugin in self.registered_plugins:
            raise PluginError('Plugin was already registered')
        self.registered_plugins.add(plugin)

        # register tags
        for db_name, plugin_tags in plugin.tag_db.items():
            self.tag_db[db_name].update(plugin_tags)

        # register handler
        self.rpc_handler.update(plugin.rpc_handler)

        plugin.start()

    def unregister_plugin(self, plugin):
        """Remove the tags and RPC handlers of a plugin, then stop it.

        :param plugin: object exposing `tag_db`, `rpc_handler`, `stop()`
        :raises PluginError: if the plugin is not registered
        """
        try:
            self.registered_plugins.remove(plugin)
        except KeyError:
            raise PluginError('Plugin was not registered, cannot remove')

        # remove tags; iterate over the items, iterating over the mapping
        # itself would only give the db names
        for db_name, plugin_tags in plugin.tag_db.items():
            sub_db = self.tag_db[db_name]
            for tag_name in plugin_tags:
                del sub_db[tag_name]
            if not sub_db:
                # if there's no more tag in the db
                del self.tag_db[db_name]

        # remove handlers
        for handler_name in plugin.rpc_handler:
            del self.rpc_handler[handler_name]

        plugin.stop()

    def rpc_connect(self):
        """Open the SSL RPC connection to the cc-server and build its proxy."""
        # TODO async and error handling
        self.rpc_con = RpcConnection.from_addr_ssl(
            addr=self.config.server_host,
            port=self.config.server_port,
            handler=self.rpc_handler,
            loop=self.loop,
        )
        self.proxy = ConnectionProxy(self.rpc_con)

    def start(self):
        """Start signal watchers, connect, start authentication, run the loop."""
        for watcher in self.signals.values():
            watcher.start()
        logger.debug('About to connect')
        self.rpc_connect()
        self.auth.start()
        logger.debug('About to start ev_loop')
        self.loop.start()

    def stop(self, watcher=None, revents=None):
        """Shut the node down; also usable as a signal watcher callback.

        :param watcher: pyev watcher that triggered the callback, if any
        :param revents: bitmask of received events, if any
        """
        logger.info('Exiting node...')
        # close rpc
        if self.rpc_con is not None:
            self.rpc_con.shutdown()
        # close all plugins
        for plugin in self.registered_plugins:
            plugin.stop()
        self.registered_plugins = set()
        self.role = None
        self.loop.stop()

    def reload(self, watcher=None, revents=None):
        """Reload the logging configuration; usable as a signal callback.

        :param watcher: pyev watcher that triggered the callback, if any
        :param revents: bitmask of received events, if any
        """
        logger.info(u'Reloading logging configuration...')
        logging.config.fileConfig(self.config_path)