import time
import signal
import logging
import logging.config
from threading import Thread
from collections import defaultdict
from functools import partial
import pyev
from sjrpc.core import RpcConnection, RpcError
from sjrpc.utils import ConnectionProxy, RpcHandler, threadless

from ccnode import __version__
from ccnode.config import NodeConfigParser
from ccnode.tags import Tag, get_tags, RootTagDB
from ccnode.jobs import JobManager
from ccnode.exc import PluginError


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Tags handed to the root tag database of every MainLoop; for now only the
# ccnode version.
DEFAULT_TAGS = (
    Tag(u'version', __version__),
)


class RPCStartHandler(Thread):
    """Handles rpc connection and authentication to the remote cc-server.

    This class inherits from :class:`Thread` but only the connection part is
    run in a background thread.

    Note:
        As with any :class:`Thread`, it can be started only once.
    """

    def __init__(self, loop):
        """
        :param loop: MainLoop instance
        """
        Thread.__init__(self)
        self.daemon = True
        self.run = self.rpc_connect

Anael Beutot's avatar
Anael Beutot committed
        self.loop = loop
        # signal to the main thread when connection is done
        self.async = loop.evloop.async(self.async_cb)
        self.timer = loop.evloop.timer(5., 5., self.in_progress_cb)
        self.rpc_con = None
        # id for async rpc call
        self.auth_id = None
    # Method run in the thread
    def rpc_connect(self):
        while True:
            try:
                self.rpc_con = RpcConnection.from_addr_ssl(
                    addr=self.loop.config.server_host,
                    port=self.loop.config.server_port,
                    handler=self.loop.rpc_handler,
                    loop=self.loop.evloop,
                    on_disconnect=self.loop.restart_rpc_connection,
                )
            except IOError:
                logger.exception('Error while connecting to the cc-server')
                time.sleep(5)
            else:
                break

        self.async.send()  # success
    def start(self):
        self.timer.start()
        self.async.start()
        Thread.start(self)
    def stop(self):
        self.timer.stop()
        self.async.stop()

    def in_progress_cb(self, *args):
        """Timer callback: report that the connection is still pending.

        :param args: ignored
        """
        message = 'Connection to the cc-server still in progress'
        logger.info(message)
    def async_cb(self, *args):
        """Async watcher callback: the connection is made, start to authenticate.

        The "in progress" timer is stopped and replaced by a timer calling
        :meth:`auth_cb`.

        :param args: ignored
        """
        logger.debug('Async callback')
        self.timer.stop()

        main_loop = self.loop
        # connect is done: publish the connection on the main loop
        main_loop.rpc_con = self.rpc_con

        # start authentication
        auth_timer = main_loop.evloop.timer(.5, 5., self.auth_cb)
        self.timer = auth_timer
        auth_timer.start()

    def auth_cb(self, *args):
        """Timer callback: send the ``authentify`` request to the cc-server.

        Nothing is sent while sjrpc is not in fallback mode, nor while a
        previous request is still pending. When the request cannot be sent,
        this handler is stopped and the main loop is asked to restart the
        connection.

        :param args: ignored
        """
        # check is fallback mode on sjrpc is set otherwise our call would block
        # the loop
        fallback_ready = self.rpc_con._event_fallback.is_set()
        if not fallback_ready:
            logger.debug('Will try authentication again latter')
            return

        # a request is already in flight
        if self.auth_id is not None:
            logger.error('Authentication is taking longer than expected')
            return

        # try to authenticate
        config = self.loop.config
        try:
            self.auth_id = self.rpc_con.rpc.async_call_cb(
                self.auth_done_cb,
                'authentify',
                config.server_user,
                config.server_passwd,
            )
        except RpcError as exc:
            connection_lost = exc.exception == 'RpcConnectionError'
            if not connection_lost:
                logger.exception('Unexpected exception while authenticating')
            else:
                logger.error('Authentication failed: connection lost')

            self.stop()
            self.loop.restart_rpc_connection()
    def auth_done_cb(self, call_id, response=None, error=None):
        """Handle the answer of the cc-server to the ``authentify`` call.

        The role returned by the server decides which main plugin is loaded
        (``host`` or ``hv``). When the role is the one already held by the
        loop, the plugins are kept and only the tags are registered again.
        Any other answer is a failed authentication: the plugins are closed
        and the authentication timer is left running so that a new attempt
        is made.

        :param call_id: id of the async call being answered
        :param response: role returned by the cc-server
        :param error: description of the failure when the call failed (a
            dict with an ``exception`` key and an optional ``message`` key)
        """
        assert call_id == self.auth_id

        if error is not None:
            # we got an error
            logger.error('Error while authenticating with cc-server: %s("%s")',
                         error['exception'], error.get('message', ''))

            # Forget the failed call: as long as auth_id is set, auth_cb
            # refuses to send a new request, so without this the handler
            # would never retry. The next attempt happens on the next tick
            # of the authentication timer.
            self.auth_id = None
            return

        # set handler according to which role was returned by the cc-server
        if response == self.loop.role and response is not None:
            # we don't need to reload the plugins
            # but we need to register the objects and tags
            self.loop.tag_db.rpc_register()
        elif response == u'host':
            # close previous plugins if needed
            if self.loop.role is not None:
                self.loop.close_plugins()

            logger.debug('Role host affected')
            from ccnode.host import Handler as HostHandler
            self.loop.main_plugin = HostHandler(loop=self.loop)
            self.loop.role = u'host'
            # (re)-register the tags of the main loop
            self.loop.tag_db.rpc_register()
            self.loop.register_plugin(self.loop.main_plugin)
        elif response == u'hv':
            # close previous plugins if needed
            if self.loop.role is not None:
                self.loop.close_plugins()

            logger.debug('Role hypervisor affected')
            # we don't import those modules at the top because some
            # dependencies may not be installed
            from ccnode.hypervisor import Handler as HypervisorHandler
            self.loop.main_plugin = HypervisorHandler(
                hypervisor_name=self.loop.config.server_user,
                loop=self.loop,
            )
            self.loop.role = u'hv'
            # (re)-register the tags of the main loop
            self.loop.tag_db.rpc_register()
            self.loop.register_plugin(self.loop.main_plugin)
        else:
            # unknown role: this branch must not run for a valid 'hv' answer
            logger.error('Failed authentication, role returned: %s', response)
            self.loop.role = None
            self.auth_id = None
            # we also close the previously opened plugins
            self.loop.close_plugins()
            return  # we retry while it fails

        # authenticated: the watchers of this handler are not needed anymore
        self.stop()
        logger.info('Successfully authenticated with role %s', str(response))
        self.auth_id = None


class MainLoop(object):
    def __init__(self, config_path):
        """Build the event loop and every component attached to it.

        Nothing is started here, see :meth:`start`.

        :param config_path: path of the file read both as node configuration
            and as logging configuration
        """
        self.config_path = config_path
        self.evloop = pyev.Loop(debug=True)

        # callbacks to run for each handled signal
        callbacks = {
            signal.SIGINT: self.stop,
            signal.SIGTERM: self.stop,
            signal.SIGUSR1: self.reload,
        }

        # turn into real watchers
        self.signals = {}
        for signum, callback in callbacks.iteritems():
            self.signals[signum] = self.evloop.signal(signum, callback)

        # load config variables
        self.config = NodeConfigParser(self.config_path)

        # configure logging
        logging.config.fileConfig(self.config_path)

        # rpc connection
        self.rpc_con = None

        # handler of the (re)connection to the cc-server and its timer
        self.connect = RPCStartHandler(self)
        self.reconnect = None

        # role
        self.role = None
        self.main_plugin = None

        # tag database
        self.tag_db = RootTagDB(self, tags=DEFAULT_TAGS)

        # job manager
        self.job_manager = JobManager(self)

        # handlers
        main_tag_db = self.tag_db['__main__']
        self.rpc_handler = {
            'get_tags': partial(threadless(get_tags), main_tag_db),
            'sub_tags': self.sub_tags,
        }

        # plugins
        self.registered_plugins = set()

    @property
    def rpc_connected(self):
        return self.rpc_con is not None

    @property
    def rpc_authenticated(self):
        return self.rpc_connected and self.role is not None
    # RPC handlers definitions
    def sub_tags(self, sub_id, tags=None):
        """RPC handler: return the tags of the sub object ``sub_id``.

        Invalid requests (``'__main__'`` or an unknown id) are only logged,
        the lookup result is handed to :func:`get_tags` in every case.

        :param sub_id: key looked up in the tag database
        :param tags: passed through to :func:`get_tags`
        """
        asked_for_main = sub_id == '__main__'
        if asked_for_main:
            # FIXME should we raise ?
            logger.debug('Invalid request for sub object')

        database = self.tag_db.get(sub_id)
        if database is None:
            # FIXME should we also raise here ?
            logger.debug('Failed to find sub_id %s', sub_id)

        result = get_tags(database, tags)
        return result

    @threadless
    def job_list(self):
        """RPC handler placeholder: does nothing and returns None."""
        return None
    # End RPC handlers definitions

    def reset_handler(self, name, handl):
        self.rpc_handler[name] = handl

    def remove_handler(self, name):
        self.rpc_handler.pop(name, None)

    def register_plugin(self, plugin):
        # keep track of registered plugins
        if plugin in self.registered_plugins:
            raise PluginError('Plugin was already registered')
        self.registered_plugins.add(plugin)

        # register tags
        plugin.tag_db.set_parent(self.tag_db)
Anael Beutot's avatar
Anael Beutot committed

        # register handler
        self.rpc_handler.update(plugin.rpc_handler)

Anael Beutot's avatar
Anael Beutot committed
        plugin.start()

    def unregister_plugin(self, plugin):
        try:
            self.registered_plugins.remove(plugin)
        except KeyError:
            raise PluginError('Plugin was not registered, cannot remove')

        # remove tags
        plugin.tag_db.set_parent(None)
Anael Beutot's avatar
Anael Beutot committed
        # remove handlers
        for handler_name in plugin.rpc_handler:
            del self.rpc_handler[handler_name]

        plugin.stop()

    def close_plugins(self):
        """Unregister all plugins from the loop."""
        for plugin in self.registered_plugins.copy():
            self.unregister_plugin(plugin)

    def restart_rpc_connection(self, *args):
        """Drop the current connection and schedule a reconnection.

        Does nothing when no connection is set on the loop.

        :param args: ignored
        """
        if self.rpc_connected:
            # clear connection
            self.rpc_con = None

            logger.error('Lost connection to the cc-server, will attempt'
                         ' reconnection')

            # reconnection attempt in one second
            reconnect_timer = self.evloop.timer(
                1., 0., self.restart_rpc_connection_cb)
            self.reconnect = reconnect_timer
            reconnect_timer.start()

    def restart_rpc_connection_cb(self, *args):
        """Timer callback: start a fresh connection handler.

        :param args: ignored
        """
        # attempt to connect to the cc-server again
        handler = RPCStartHandler(self)
        self.connect = handler
        handler.start()

        # the reconnection timer has done its job
        timer = self.reconnect
        timer.stop()
        self.reconnect = None
    def start(self):
        """Start the signal watchers and the connection handler, then run
        the event loop.
        """
        logger.info('Starting node')

        # named so as not to hide the signal module
        for signal_watcher in self.signals.itervalues():
            signal_watcher.start()

        logger.debug('About to connect')
        self.connect.start()

        logger.debug('About to start ev_loop')
        self.evloop.start()

    def stop(self, watcher=None, revents=None):
        """Shut the node down.

        Stops the connection handler and the reconnection timer, closes the
        rpc connection, stops every registered plugin and finally stops the
        event loop. It is also installed as callback of the SIGINT and
        SIGTERM watchers, hence the two unused parameters.

        :param watcher: unused
        :param revents: unused
        """
        logger.info('Exiting node...')
        if self.connect is not None:
            self.connect.stop()
        if self.reconnect is not None:
            self.reconnect.stop()

        # close rpc
        rpc_con = self.rpc_con
        if rpc_con is not None:
            # Forget the connection *before* shutting it down: if the
            # shutdown still triggers restart_rpc_connection, that method
            # returns at once when no connection is set, so no reconnection
            # timer is armed while the node is exiting.
            self.rpc_con = None
            # disable callback to prevent trampoline calls
            rpc_con._on_disconnect = None   # FIXME doesn't work
            rpc_con.shutdown()

        # close all plugins
        for plugin in self.registered_plugins:
            plugin.stop()
        self.registered_plugins = set()

        # FIXME check for closing of main tags that were not in plugin

        self.role = None
        self.main_plugin = None
        self.evloop.stop()

    def reload(self, watcher=None, revents=None):
        """Read the logging configuration again from ``self.config_path``.

        Installed as callback of the SIGUSR1 watcher.

        :param watcher: unused
        :param revents: unused
        """
        logger.info(u'Reloading logging configuration...')
        config_file = self.config_path
        logging.config.fileConfig(config_file)