diff --git a/ccnode/hypervisor/__init__.py b/ccnode/hypervisor/__init__.py index 347fcbdf25ad2c2ba847f4b6f639fa9b55b5d719..8386298b559de033ef2a4cbca6edc449cd94bd75 100644 --- a/ccnode/hypervisor/__init__.py +++ b/ccnode/hypervisor/__init__.py @@ -163,7 +163,7 @@ class Hypervisor(object): self.type = u'kvm' # libvirt event loop abstraction - self.vir_event_loop = VirEventLoop(self.main.loop) + self.vir_event_loop = VirEventLoop(self.main.evloop) # This tells libvirt what event loop implementation it # should use libvirt.virEventRegisterImpl( diff --git a/ccnode/node.py b/ccnode/node.py index 211c83094f61da6dcccf6e5e54c5325da9a9254c..d46523ab360f72accb40689c76d9d426c2a0ad8d 100644 --- a/ccnode/node.py +++ b/ccnode/node.py @@ -2,6 +2,7 @@ import time import signal import logging import logging.config +from threading import Thread from collections import defaultdict from functools import partial @@ -21,55 +22,78 @@ logger = logging.getLogger(__name__) DEFAULT_TAGS = (Tag(u'version', __version__),) -class RPCHandler(object): - """Handles rpc connection to the remote cc-server.""" +class RPCStartHandler(Thread): + """Handles rpc connection and authentication to the remote cc-server. + + This class inherits from :class:`Thread` but only the connection part is run + in a background thread. + + """ def __init__(self, loop): + """ + :param loop: MainLoop instance + """ + Thread.__init__(self) + self.daemon = True + self.run = self.rpc_connect + self.loop = loop - self.watcher = loop.loop.io(self.cb) - self.timer = loop.loop.timer(0, 5, self.timeout) + # signal to the main thread when connection is done + self.async = loop.evloop.async(self.async_cb) + self.timer = loop.evloop.timer(5., 5., self.in_progress_cb) - self.connection = None - self.proxy = None + self.rpc_con = None + # id for async rpc call + self.auth_id = None - # configure socket - self.sock = None - self.sock.setblocking(0) + # Method run in the thread + def rpc_connect(self): + while True: + try: + logger.debug('About to create connection') + self.rpc_con = RpcConnection.from_addr_ssl( + addr=self.loop.config.server_host, + port=self.loop.config.server_port, + handler=self.loop.rpc_handler, + loop=self.loop.evloop, + ) + logger.debug('Connection created') + except IOError: + logger.exception('Error while connecting to the cc-server') + time.sleep(5) + else: + break + + logger.debug('Async send') + self.async.send() # success - def cb(self, watcher, revents): - if revents & pyev.EV_WRITE: - self.loop.connection = RpcConnection( - sock=ssl.wrap_socket(self.sock, ssl_version=ssl.PROTOCOL_TLSv1), - loop=self.loop.loop, - ) - self.loop.proxy = ConnectionProxy(self.loop.connection) - - def timeout(self, watcher, revents): - self.sock.close() - - def run(self): - self.watcher.start() - self.timeout.start() - self.connection = RpcConnection.from_addr_ssl( - addr=self.loop.config.server_host, - port=self.loop.config.server_port, - handler=self.loop.handlers, - ) - self.proxy = ConnectionProxy(self.connection) - #self.loop. + def start(self): + self.timer.start() + self.async.start() + Thread.start(self) -class AuthHandler(object): - """Handles rpc authentication to the remote cc-server.""" - def __init__(self, loop): - self.loop = loop - self.watcher = loop.loop.timer(.5, 5, self.cb) - self.auth_id = None - - def cb(self, watcher, revents): + def stop(self): + self.timer.stop() + self.async.stop() + + def in_progress_cb(self, *args): + logger.info('Connection to the cc-server still in progress') + + def async_cb(self, *args): + logger.debug('Async callback') + self.timer.stop() + # connect is done + self.loop.rpc_con = self.rpc_con + # start authentication + self.timer = self.loop.evloop.timer(.5, 5., self.auth_cb) + self.timer.start() + + def auth_cb(self, *args): logger.debug('Callback auth') # check is fallback mode on sjrpc is set otherwise our call would block # the loop - if not self.loop.rpc_con._event_fallback.is_set(): + if not self.rpc_con._event_fallback.is_set(): logger.debug('Will try authentication again latter') return @@ -78,14 +102,14 @@ class AuthHandler(object): return # try to authenticate - self.auth_id = self.loop.rpc_con.rpc.async_call_cb( - self.cb_auth, + self.auth_id = self.rpc_con.rpc.async_call_cb( + self.auth_done_cb, 'authentify', self.loop.config.server_user, self.loop.config.server_passwd, ) - def cb_auth(self, call_id, response=None, error=None): + def auth_done_cb(self, call_id, response=None, error=None): assert call_id == self.auth_id if error is not None: @@ -115,21 +139,15 @@ class AuthHandler(object): return if self.loop.role is not None: - self.watcher.stop() + self.stop() logger.info('Successfully authenticated with role %s', response) self.loop.register_plugin(self.loop.role) self.auth_id = None - def start(self): - self.watcher.start() - - def stop(self): - self.watcher.stop() - class MainLoop(object): def __init__(self, config_path): - self.loop = pyev.default_loop(debug=True) + self.evloop = pyev.default_loop(debug=True) self.config_path = config_path # set signal watchers @@ -142,7 +160,7 @@ class MainLoop(object): # turn into real watchers self.signals = dict(( signal, - self.loop.signal(signal, cb), + self.evloop.signal(signal, cb), ) for signal, cb in self.signals.iteritems()) # load config variables @@ -154,7 +172,7 @@ class MainLoop(object): # rpc connection self.rpc_con = None - self.auth = AuthHandler(self) + self.connect = RPCStartHandler(self) # role self.role = None @@ -197,7 +215,7 @@ class MainLoop(object): :param tag: :class:`Tag` to add/replace """ # TODO tag register - tag.start(self.loop) + tag.start(self.evloop) self.tag_db['__main__'][tag.name] = tag def remove_tag(self, tag_name): @@ -206,7 +224,7 @@ class MainLoop(object): def reset_sub_tag(self, sub_id, tag): # TODO tag register - tag.start(self.loop) + tag.start(self.evloop) self.tag_db[sub_id][tag.name] = tag def remove_sub_tag(self, sub_id, tag_name): @@ -252,29 +270,19 @@ class MainLoop(object): plugin.stop() - def rpc_connect(self): - # TODO async and error handling - self.rpc_con = RpcConnection.from_addr_ssl( - addr=self.config.server_host, - port=self.config.server_port, - handler=self.rpc_handler, - loop=self.loop, - ) - self.proxy = ConnectionProxy(self.rpc_con) - def start(self): logger.info('Starting node') for signal in self.signals.itervalues(): signal.start() logger.debug('About to connect') - self.rpc_connect() - self.auth.start() + self.connect.start() logger.debug('About to start ev_loop') - self.loop.start() + self.evloop.start() def stop(self, watcher=None, revents=None): logger.info('Exiting node...') - self.auth.stop() + if self.connect is not None: + self.connect.stop() # close rpc if self.rpc_con is not None: self.rpc_con.shutdown() @@ -284,7 +292,7 @@ class MainLoop(object): self.registered_plugins = set() self.role = None - self.loop.stop() + self.evloop.stop() def reload(self, watcher=None, revents=None): logger.info(u'Reloading logging configuration...') diff --git a/ccnode/plugins.py b/ccnode/plugins.py index 9d689c40c9f9d86f8bd48fd8adeb46ddd7ea0fe7..3a1a8011e3c7aa10a8781749f49a6438d157d61d 100644 --- a/ccnode/plugins.py +++ b/ccnode/plugins.py @@ -14,6 +14,7 @@ class Base(object): """ :param loop: MainLoop instance """ + #: MainLoop instance self.main = kwargs.pop('loop') # plugins may define tags (see :mod:`ccnode.tags`) @@ -44,7 +45,7 @@ class Base(object): for tag in chain.from_iterable( imap(lambda d: d.itervalues(), self.tag_db.itervalues()), ): - tag.start(self.main.loop) + tag.start(self.main.evloop) def stop(self): """Cleanup for plugins, can be used to clean pyev watchers."""