import time import signal import logging import logging.config import pyev from sjrpc.core import RpcConnection from sjrpc.utils import ConnectionProxy, RpcHandler from ccnode import __version__ from ccnode.config import NodeConfigParser from ccnode.tags import Tag, get_tags logger = logging.getLogger(__name__) DEFAULT_TAGS = (Tag(u'version', __version__, -1),) class DefaultHandler(RpcHandler): """Base handler for :class:`Node` objects. Containing only a ``version`` tag that returns current ``cc-node`` version. See `sjRpc documentation <http://google.fr>`_ for more information. """ def __init__(self, *args, **kwargs): RpcHandler.__init__(self) self.tags = dict((t.name, t) for t in DEFAULT_TAGS) def get_tags(self, tags=None, noresolve_tags=None): """Method used from the ``cc-server`` to get tags. :param iterable tags: list of tags to return :param iterable noresolve_tags: list of tags to not return """ return get_tags(self, tags, noresolve_tags) def set_tag(self, tag): self.tags[tag.name] = tag def remove_tag(self, tag_name): self.tags.pop(tag_name, None) class AuthHandler(object): """Handles rpc authentication to the remote cc-server.""" def __init__(self, loop): self.loop = loop self.watcher = loop.loop.timer(.5, 5, self.cb) self.auth_id = None def cb(self, watcher, revents): logger.debug('Callback auth') # check is fallback mode on sjrpc is set otherwise our call would block # the loop if not self.loop.rpc_con._event_fallback.is_set(): logger.debug('Will try authentication again latter') return if self.auth_id is not None: logger.error('Authentication is taking longer than expected') return # try to authenticate self.auth_id = self.loop.rpc_con.rpc.async_call_cb( self.cb_auth, 'authentify', self.loop.config.server_user, self.loop.config.server_passwd, ) def cb_auth(self, call_id, response=None, error=None): assert call_id == self.auth_id if error is not None: # we got an error logger.error('Error while authenticating with cc-server: %s("%s")', error['exception'], error.get('message', '')) # try to recconnect in 5 seconds return # set handler according to which role was returned by the cc-server if response == u'host': logger.debug('Role host affected') from ccnode.host import Handler as HostHandler self.loop.rpc_con.rpc.set_handler(HostHandler()) self.loop.role = u'host' elif response == u'hv': logger.debug('Role hypervisor affected') from ccnode.hypervisor import Handler as HypervisorHandler self.loop.rpc_con.rpc.set_handler(HypervisorHandler( proxy=self.loop.proxy, hypervisor_name=self.loop.config.server_user)) self.loop.role = u'hv' else: logger.info('Failed authentication, role returned: %s', response) self.loop.role = None self.auth_id = None return if self.loop.role is not None: self.watcher.stop() logger.info('Successfully authenticated with role %s', response) self.auth_id = None def start(self): self.watcher.start() class MainLoop(object): def __init__(self, config_path): self.loop = pyev.default_loop() self.config_path = config_path # set signal watchers self.signals = { signal.SIGINT: self.stop, signal.SIGTERM: self.stop, signal.SIGUSR1: self.reload, } # turn into real watchers self.signals = dict(( signal, self.loop.signal(signal, cb), ) for signal, cb in self.signals.iteritems()) # load config variables self.config = NodeConfigParser(self.config_path) # configure logging logging.config.fileConfig(self.config_path) # rpc connection self.rpc_con = None self.auth = AuthHandler(self) # role self.role = None def rpc_connect(self): # TODO async and error handling self.rpc_con = RpcConnection.from_addr_ssl( addr=self.config.server_host, port=self.config.server_port, handler=DefaultHandler(), loop=self.loop, ) self.proxy = ConnectionProxy(self.rpc_con) def start(self): for signal in self.signals.itervalues(): signal.start() logger.debug('About to connect') self.rpc_connect() self.auth.start() logger.debug('About to start ev_loop') self.loop.start() def stop(self, watcher=None, revents=None): logger.info('Exiting node...') if self.rpc_con is not None: self.rpc_con.shutdown() self.loop.stop() def reload(self, watcher=None, revents=None): logger.info(u'Reloading logging configuration...') logging.config.fileConfig(self.config_path)