Commit c2720b2b authored by Anael Beutot's avatar Anael Beutot Committed by Antoine Millet
Browse files

Added base event loop for clients

parent 8825b2c5
Loading
Loading
Loading
Loading
+7 −0
Original line number Diff line number Diff line
class CloudControlError(Exception):
    """Base class for cloud control errors."""


class PluginError(CloudControlError):
    """Exception related to plugin execution."""
    pass
+327 −0
Original line number Diff line number Diff line
import time
import signal
import logging
import logging.config
from threading import Thread
from functools import partial

import pyev
from sjrpc.core import RpcConnection, RpcError
from sjrpc.utils import threadless

from cloudcontrol.common.client.tags import get_tags, RootTagDB
from cloudcontrol.common.client.exc import PluginError


logger = logging.getLogger(__name__)


class RPCStartHandler(Thread):
    """Handles rpc connection and authentication to the remote cc-server.

    This class inherits from :class:`Thread` but only the connection part is
    run in a background thread.

    Note:
        As :class:`Thread`, it can be started only one time.

    """
    def __init__(self, loop):
        """
        :param loop: MainLoop instance
        """
        Thread.__init__(self)
        self.daemon = True
        self.run = self.rpc_connect

        self.loop = loop
        # signal to the main thread when connection is done
        self.async = loop.evloop.async(self.async_cb)
        self.timer = loop.evloop.timer(5., 5., self.in_progress_cb)

        self.rpc_con = None
        # id for async rpc call
        self.auth_id = None

    # Method run in the thread
    def rpc_connect(self):
        while True:
            try:
                self.rpc_con = RpcConnection.from_addr_ssl(
                    addr=self.loop.config.server_host,
                    port=self.loop.config.server_port,
                    handler=self.loop.rpc_handler,
                    loop=self.loop.evloop,
                    on_disconnect=self.loop.restart_rpc_connection,
                )
            except IOError:
                logger.exception('Error while connecting to the cc-server')
                time.sleep(5)
            else:
                break

        self.async.send()  # success

    def start(self):
        self.timer.start()
        self.async.start()

        Thread.start(self)

    def stop(self):
        self.timer.stop()
        self.async.stop()

    def in_progress_cb(self, *args):
        logger.info('Connection to the cc-server still in progress')

    def async_cb(self, *args):
        logger.debug('Async callback')
        self.timer.stop()
        # connect is done
        self.loop.rpc_con = self.rpc_con
        # start authentication
        self.timer = self.loop.evloop.timer(.5, 5., self.auth_cb)
        self.timer.start()

    def auth_cb(self, *args):
        # check is fallback mode on sjrpc is set otherwise our call would block
        # the loop
        if not self.rpc_con._event_fallback.is_set():
            logger.debug('Will try authentication again latter')
            return

        if self.auth_id is not None:
            logger.error('Authentication is taking longer than expected')
            return

        # try to authenticate
        try:
            self.auth_id = self.rpc_con.rpc.async_call_cb(
                self.auth_done_cb,
                'authentify',
                self.loop.config.server_user,
                self.loop.config.server_passwd,
            )
        except RpcError as exc:
            if exc.exception == 'RpcConnectionError':
                logger.error('Authentication failed: connection lost')
            else:
                logger.exception('Unexpected exception while authenticating')

            self.stop()
            self.rpc_con.shutdown()
            self.loop.restart_rpc_connection()

    def auth_done_cb(self, call_id, response=None, error=None):
        assert call_id == self.auth_id

        if error is not None:
            # we got an error
            logger.error('Error while authenticating with cc-server: %s("%s")',
                         error['exception'], error.get('message', ''))

            self.set_reconnect()
            return

    def set_reconnect(self):
        self.loop.role = None
        self.auth_id = None
        # we also close the previously opened plugins
        self.loop.close_plugins()
        self.timer.stop()
        # we don't directly shutdown the rpc connection as the current
        # callback is not fully completed, thus it will be called again
        self.timer = self.loop.evloop.timer(.5, .0, self.handle_reconnect)
        self.timer.start()

    def handle_reconnect(self, *args):
        self.stop()
        self.rpc_con.shutdown()
        self.loop.restart_rpc_connection()


class MainLoop(object):

    # DEFAULTS_TAGS = tuple()
    # CONNECT_CLASS = RPCStartHandler
    # CONFIG_CLASS = None

    def __init__(self, config_path):
        self.config_path = config_path
        # load config variables
        self.load_config()
        # configure logging
        self.configure_logging()

        self.evloop = pyev.Loop(debug=self.config.debug)

        # set signal watchers
        self.signals = {
            signal.SIGINT: self.stop,
            signal.SIGTERM: self.stop,
            signal.SIGUSR1: self.reload,
        }

        # turn into real watchers
        self.signals = dict((
            signal_,
            self.evloop.signal(signal_, cb),
        ) for signal_, cb in self.signals.iteritems())

        # rpc connection
        self.rpc_con = None

        self.connect = self.CONNECT_CLASS(self)
        self.reconnect = None

        # role
        self.role = None
        self.main_plugin = None

        # tag database
        self.tag_db = RootTagDB(self, tags=self.DEFAULT_TAGS)

        # handlers
        self.rpc_handler = dict(
            get_tags=partial(threadless(get_tags), self.tag_db['__main__']),
            sub_tags=self.sub_tags,
        )

        # plugins
        self.registered_plugins = set()

    @property
    def rpc_connected(self):
        return self.rpc_con is not None

    @property
    def rpc_authenticated(self):
        return self.rpc_connected and self.role is not None

    def load_config(self):
        self.config = self.CONFIG_CLASS(self.config_path)

    def configure_logging(self):
        raise NotImplementedError

    # RPC handlers definitions
    @threadless
    def sub_tags(self, sub_id, tags=None):
        if sub_id == '__main__':
            # FIXME should we raise ?
            logger.debug('Invalid request for sub object')
            return {}

        sub_db = self.tag_db.get(sub_id)
        if sub_db is None:
            # FIXME should we also raise here ?
            logger.debug('Failed to find sub_id %s', sub_id)
            return {}

        return get_tags(sub_db, tags)

    @threadless
    def job_list(self):
        pass
    # End RPC handlers definitions

    def reset_handler(self, name, handl):
        self.rpc_handler[name] = handl

    def remove_handler(self, name):
        self.rpc_handler.pop(name, None)

    def register_plugin(self, plugin):
        # keep track of registered plugins
        if plugin in self.registered_plugins:
            raise PluginError('Plugin was already registered')
        self.registered_plugins.add(plugin)

        # register tags
        plugin.tag_db.set_parent(self.tag_db)

        # register handler
        self.rpc_handler.update(plugin.rpc_handler)

        plugin.start()

    def unregister_plugin(self, plugin):
        try:
            self.registered_plugins.remove(plugin)
        except KeyError:
            raise PluginError('Plugin was not registered, cannot remove')

        # remove tags
        plugin.tag_db.set_parent(None)

        # remove handlers
        for handler_name in plugin.rpc_handler:
            del self.rpc_handler[handler_name]

        plugin.stop()

    def close_plugins(self):
        """Unregister all plugins from the loop."""
        for plugin in self.registered_plugins.copy():
            self.unregister_plugin(plugin)

    def restart_rpc_connection(self, *args):
        if not self.rpc_connected:
            return

        # clear connection
        self.rpc_con = None

        logger.error('Lost connection to the cc-server, will attempt'
                     ' reconnection')

        # reconnection atempt in one second
        self.reconnect = self.evloop.timer(
            1., 0., self.restart_rpc_connection_cb)
        self.reconnect.start()

    def restart_rpc_connection_cb(self, *args):
        # attempt to connect to the cc-server again
        self.connect = self.CONNECT_CLASS(self)
        self.connect.start()
        self.reconnect.stop()
        self.reconnect = None

    def start(self):
        logger.info('Starting node')
        for signal_ in self.signals.itervalues():
            signal_.start()
        logger.debug('About to connect')
        self.connect.start()
        logger.debug('About to start ev_loop')
        self.evloop.start()

    def stop(self, watcher=None, revents=None):
        logger.info('Exiting node...')
        if self.connect is not None:
            self.connect.stop()
        if self.reconnect is not None:
            self.reconnect.stop()
        # close rpc
        if self.rpc_con is not None:
            # disable callback to prevent trampoline calls
            self.rpc_con._on_disconnect = None   # FIXME doesn't work
            self.rpc_con.shutdown()
        # close all plugins
        for plugin in self.registered_plugins:
            plugin.stop()
        self.registered_plugins = set()

        # FIXME check for closing of main tags that were not in plugin
        self.role = None
        self.main_plugin = None
        self.evloop.stop()

    def reload(self, watcher=None, revents=None):
        logger.info('Reloading logging configuration...')
        try:
            self.load_config()
            self.configure_logging()
        except Exception:
            logger.exception('Invalid config file')