Commit 8eff778e authored by Anael Beutot's avatar Anael Beutot
Browse files

Do rpc connection in background.

Plus error handling.
parent b5c666e3
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -163,7 +163,7 @@ class Hypervisor(object):
        self.type = u'kvm'

        # libvirt event loop abstraction
        self.vir_event_loop = VirEventLoop(self.main.loop)
        self.vir_event_loop = VirEventLoop(self.main.evloop)
        # This tells libvirt what event loop implementation it
        # should use
        libvirt.virEventRegisterImpl(
+77 −69
Original line number Diff line number Diff line
@@ -2,6 +2,7 @@ import time
import signal
import logging
import logging.config
from threading import Thread
from collections import defaultdict
from functools import partial

@@ -21,55 +22,78 @@ logger = logging.getLogger(__name__)
DEFAULT_TAGS = (Tag(u'version', __version__),)


class RPCHandler(object):
    """Handles rpc connection to the remote cc-server."""
    def __init__(self, loop):
        self.loop = loop
        self.watcher = loop.loop.io(self.cb)
        self.timer = loop.loop.timer(0, 5, self.timeout)
class RPCStartHandler(Thread):
    """Handles rpc connection and authentication to the remote cc-server.

        self.connection = None
        self.proxy = None
    This class inherits from :class:`Thread` but only the connection part is run
    in a background thread.

        # configure socket
        self.sock = None
        self.sock.setblocking(0)
    """
    def __init__(self, loop):
        """
        :param loop: MainLoop instance
        """
        Thread.__init__(self)
        self.daemon = True
        self.run = self.rpc_connect

    def cb(self, watcher, revents):
        if revents & pyev.EV_WRITE:
            self.loop.connection = RpcConnection(
                sock=ssl.wrap_socket(self.sock, ssl_version=ssl.PROTOCOL_TLSv1),
                loop=self.loop.loop,
            )
            self.loop.proxy = ConnectionProxy(self.loop.connection)
        self.loop = loop
        # signal to the main thread when connection is done
        self.async = loop.evloop.async(self.async_cb)
        self.timer = loop.evloop.timer(5., 5., self.in_progress_cb)

    def timeout(self, watcher, revents):
        self.sock.close()
        self.rpc_con = None
        # id for async rpc call
        self.auth_id = None

    def run(self):
        self.watcher.start()
        self.timeout.start()
        self.connection = RpcConnection.from_addr_ssl(
    # Method run in the thread
    def rpc_connect(self):
        while True:
            try:
                logger.debug('About to create connection')
                self.rpc_con = RpcConnection.from_addr_ssl(
                    addr=self.loop.config.server_host,
                    port=self.loop.config.server_port,
            handler=self.loop.handlers,
                    handler=self.loop.rpc_handler,
                    loop=self.loop.evloop,
                )
        self.proxy = ConnectionProxy(self.connection)
        #self.loop.
                logger.debug('Connection created')
            except IOError:
                logger.exception('Error while connecting to the cc-server')
                time.sleep(5)
            else:
                break

        logger.debug('Async send')
        self.async.send()  # success

class AuthHandler(object):
    """Handles rpc authentication to the remote cc-server."""
    def __init__(self, loop):
        self.loop = loop
        self.watcher = loop.loop.timer(.5, 5, self.cb)
        self.auth_id = None
    def start(self):
        self.timer.start()
        self.async.start()

        Thread.start(self)

    def cb(self, watcher, revents):
    def stop(self):
        self.timer.stop()
        self.async.stop()

    def in_progress_cb(self, *args):
        logger.info('Connection to the cc-server still in progress')

    def async_cb(self, *args):
        logger.debug('Async callback')
        self.timer.stop()
        # connect is done
        self.loop.rpc_con = self.rpc_con
        # start authentication
        self.timer = self.loop.evloop.timer(.5, 5., self.auth_cb)
        self.timer.start()

    def auth_cb(self, *args):
        logger.debug('Callback auth')
        # check is fallback mode on sjrpc is set otherwise our call would block
        # the loop
        if not self.loop.rpc_con._event_fallback.is_set():
        if not self.rpc_con._event_fallback.is_set():
            logger.debug('Will try authentication again latter')
            return

@@ -78,14 +102,14 @@ class AuthHandler(object):
            return

        # try to authenticate
        self.auth_id = self.loop.rpc_con.rpc.async_call_cb(
            self.cb_auth,
        self.auth_id = self.rpc_con.rpc.async_call_cb(
            self.auth_done_cb,
            'authentify',
            self.loop.config.server_user,
            self.loop.config.server_passwd,
        )

    def cb_auth(self, call_id, response=None, error=None):
    def auth_done_cb(self, call_id, response=None, error=None):
        assert call_id == self.auth_id

        if error is not None:
@@ -115,21 +139,15 @@ class AuthHandler(object):
            return

        if self.loop.role is not None:
            self.watcher.stop()
            self.stop()
            logger.info('Successfully authenticated with role %s', response)
            self.loop.register_plugin(self.loop.role)
        self.auth_id = None

    def start(self):
        self.watcher.start()

    def stop(self):
        self.watcher.stop()


class MainLoop(object):
    def __init__(self, config_path):
        self.loop = pyev.default_loop(debug=True)
        self.evloop = pyev.default_loop(debug=True)
        self.config_path = config_path

        # set signal watchers
@@ -142,7 +160,7 @@ class MainLoop(object):
        # turn into real watchers
        self.signals = dict((
            signal,
            self.loop.signal(signal, cb),
            self.evloop.signal(signal, cb),
        ) for signal, cb in self.signals.iteritems())

        # load config variables
@@ -154,7 +172,7 @@ class MainLoop(object):
        # rpc connection
        self.rpc_con = None

        self.auth = AuthHandler(self)
        self.connect = RPCStartHandler(self)

        # role
        self.role = None
@@ -197,7 +215,7 @@ class MainLoop(object):
        :param tag: :class:`Tag` to add/replace
        """
        # TODO tag register
        tag.start(self.loop)
        tag.start(self.evloop)
        self.tag_db['__main__'][tag.name] = tag

    def remove_tag(self, tag_name):
@@ -206,7 +224,7 @@ class MainLoop(object):

    def reset_sub_tag(self, sub_id, tag):
        # TODO tag register
        tag.start(self.loop)
        tag.start(self.evloop)
        self.tag_db[sub_id][tag.name] = tag

    def remove_sub_tag(self, sub_id, tag_name):
@@ -252,29 +270,19 @@ class MainLoop(object):

        plugin.stop()

    def rpc_connect(self):
        # TODO async and error handling
        self.rpc_con = RpcConnection.from_addr_ssl(
            addr=self.config.server_host,
            port=self.config.server_port,
            handler=self.rpc_handler,
            loop=self.loop,
        )
        self.proxy = ConnectionProxy(self.rpc_con)

    def start(self):
        logger.info('Starting node')
        for signal in self.signals.itervalues():
            signal.start()
        logger.debug('About to connect')
        self.rpc_connect()
        self.auth.start()
        self.connect.start()
        logger.debug('About to start ev_loop')
        self.loop.start()
        self.evloop.start()

    def stop(self, watcher=None, revents=None):
        logger.info('Exiting node...')
        self.auth.stop()
        if self.connect is not None:
            self.connect.stop()
        # close rpc
        if self.rpc_con is not None:
            self.rpc_con.shutdown()
@@ -284,7 +292,7 @@ class MainLoop(object):
        self.registered_plugins = set()

        self.role = None
        self.loop.stop()
        self.evloop.stop()

    def reload(self, watcher=None, revents=None):
        logger.info(u'Reloading logging configuration...')
+2 −1
Original line number Diff line number Diff line
@@ -14,6 +14,7 @@ class Base(object):
        """
        :param loop: MainLoop instance
        """
        #: MainLoop instance
        self.main = kwargs.pop('loop')

        # plugins may define tags (see :mod:`ccnode.tags`)
@@ -44,7 +45,7 @@ class Base(object):
        for tag in chain.from_iterable(
            imap(lambda d: d.itervalues(), self.tag_db.itervalues()),
        ):
            tag.start(self.main.loop)
            tag.start(self.main.evloop)

    def stop(self):
        """Cleanup for plugins, can be used to clean pyev watchers."""