# node.py
import logging
import logging.config
import signal
import socket
import ssl
import time
from collections import defaultdict
from functools import partial

import pyev
from sjrpc.core import RpcConnection
from sjrpc.utils import ConnectionProxy, RpcHandler

from ccnode import __version__
from ccnode.config import NodeConfigParser
from ccnode.exc import PluginError
from ccnode.tags import Tag, get_tags


logger = logging.getLogger(__name__)


# Tags seeded into the '__main__' tag database of every MainLoop.
DEFAULT_TAGS = (Tag(u'version', __version__),)


class RPCHandler(object):
    """Handles rpc connection to the remote cc-server.

    :param loop: object exposing the pyev loop as ``loop.loop`` and the node
        configuration as ``loop.config`` (presumably a :class:`MainLoop`;
        this class is not instantiated anywhere in this module)
    """

    def __init__(self, loop):
        self.loop = loop
        # NOTE(review): pyev documents Loop.io() as io(fd, events, callback);
        # only the callback is passed here -- confirm and wire ``self.sock``
        # in once the asynchronous connect path is finished.
        self.watcher = loop.loop.io(self.cb)
        self.timer = loop.loop.timer(0, 5, self.timeout)

        self.connection = None
        self.proxy = None

        # configure socket: a real socket object is required because
        # setblocking() below, wrap_socket() in cb() and close() in timeout()
        # all operate on it
        self.sock = socket.socket()
        self.sock.setblocking(0)

    def cb(self, watcher, revents):
        """IO watcher callback: wrap the socket in SSL once it is writable.

        :param watcher: the pyev watcher that fired (unused)
        :param revents: bitmask of received events
        """
        if revents & pyev.EV_WRITE:
            # NOTE(review): TLSv1 is pinned here; confirm the server still
            # requires it before changing the protocol version.
            self.loop.connection = RpcConnection(
                sock=ssl.wrap_socket(self.sock, ssl_version=ssl.PROTOCOL_TLSv1),
                loop=self.loop.loop,
            )
            self.loop.proxy = ConnectionProxy(self.loop.connection)

    def timeout(self, watcher, revents):
        """Timer watcher callback: close the socket.

        :param watcher: the pyev watcher that fired (unused)
        :param revents: bitmask of received events (unused)
        """
        self.sock.close()

    def run(self):
        """Start both watchers, then connect to the cc-server."""
        self.watcher.start()
        # ``self.timeout`` is the callback method; the watcher to start is
        # ``self.timer``
        self.timer.start()
        # NOTE(review): MainLoop exposes its handler table as ``rpc_handler``;
        # confirm ``handlers`` exists on the object passed as ``loop``.
        self.connection = RpcConnection.from_addr_ssl(
            addr=self.loop.config.server_host,
            port=self.loop.config.server_port,
            handler=self.loop.handlers,
        )
        self.proxy = ConnectionProxy(self.connection)


class AuthHandler(object):
    """Handles rpc authentication to the remote cc-server.

    :param loop: the :class:`MainLoop` owning the rpc connection; its pyev
        loop is read from ``loop.loop``
    """

    def __init__(self, loop):
        self.loop = loop
        # periodic timer driving the authentication attempts
        self.watcher = loop.loop.timer(.5, 5, self.cb)
        # id of the pending ``authentify`` call, None when no call is pending
        self.auth_id = None

    def cb(self, watcher, revents):
        """Timer callback: send an ``authentify`` request if none is pending.

        :param watcher: the pyev watcher that fired (unused)
        :param revents: bitmask of received events (unused)
        """
        logger.debug('Callback auth')
        # check that fallback mode on sjrpc is set, otherwise our call would
        # block the loop
        if not self.loop.rpc_con._event_fallback.is_set():
            logger.debug('Will try authentication again latter')
            return

        if self.auth_id is not None:
            logger.error('Authentication is taking longer than expected')
            return

        # try to authenticate
        self.auth_id = self.loop.rpc_con.rpc.async_call_cb(
            self.cb_auth,
            'authentify',
            self.loop.config.server_user,
            self.loop.config.server_passwd,
        )

    def cb_auth(self, call_id, response=None, error=None):
        """Handle the answer to the ``authentify`` call.

        On success the role handler matching ``response`` is created, stored
        in ``self.loop.role`` and registered as a plugin, and the timer is
        stopped. On any failure the pending call id is cleared so that the
        next timer tick sends a new request.

        :param call_id: id of the call being answered
        :param response: role returned by the cc-server
        :param error: mapping with an ``exception`` key and an optional
            ``message`` key, or None when the call succeeded
        """
        assert call_id == self.auth_id

        if error is not None:
            # we got an error
            logger.error('Error while authenticating with cc-server: %s("%s")',
                         error['exception'], error.get('message', ''))

            # forget the pending call so the timer retries on its next tick;
            # otherwise cb() would only ever report a pending authentication
            self.auth_id = None
            return

        # set handler according to which role was returned by the cc-server
        if response == u'host':
            logger.debug('Role host affected')
            from ccnode.host import Handler as HostHandler
            self.loop.role = HostHandler(loop=self.loop.loop)
        elif response == u'hv':
            logger.debug('Role hypervisor affected')
            from ccnode.hypervisor import Handler as HypervisorHandler
            self.loop.role = HypervisorHandler(
                proxy=self.loop.proxy,
                hypervisor_name=self.loop.config.server_user,
                loop=self.loop.loop,
            )
        else:
            logger.error('Failed authentication, role returned: %s', response)
            self.loop.role = None
            self.auth_id = None
            return

        if self.loop.role is not None:
            self.watcher.stop()
            logger.info('Successfully authenticated with role %s', response)
            self.loop.register_plugin(self.loop.role)

        self.auth_id = None

    def start(self):
        """Start the periodic authentication timer."""
        self.watcher.start()

    def stop(self):
        """Stop the periodic authentication timer."""
        self.watcher.stop()


class MainLoop(object):
    """Main object of the node: owns the pyev loop, the configuration, the
    rpc connection, the tag database and the registered plugins.

    :param config_path: path of the configuration file; it is read both by
        :class:`NodeConfigParser` and by :func:`logging.config.fileConfig`
    """

    def __init__(self, config_path):
        self.loop = pyev.default_loop()
        self.config_path = config_path

        # set signal watchers: signal number -> pyev watcher, started in
        # start()
        callbacks = {
            signal.SIGINT: self.stop,
            signal.SIGTERM: self.stop,
            signal.SIGUSR1: self.reload,
        }
        self.signals = dict(
            (signum, self.loop.signal(signum, callback))
            for signum, callback in callbacks.items()
        )

        # load config variables
        self.config = NodeConfigParser(self.config_path)

        # configure logging
        logging.config.fileConfig(self.config_path)

        # rpc connection and its proxy, both set by rpc_connect()
        self.rpc_con = None
        self.proxy = None

        self.auth = AuthHandler(self)

        # role
        self.role = None

        # tag database
        self.tag_db = defaultdict(
            dict,
            __main__=dict((t.name, t)
                          for t in DEFAULT_TAGS),  # db for main objects
            # other sub-objects db can go here (for example VMs)
        )

        # handlers
        self.rpc_handler = dict(
            get_tags=partial(get_tags, self.tag_db['__main__']),
            sub_tags=self.sub_tags,
        )

        # plugins
        self.registered_plugins = set()

    # RPC handlers definitions
    def sub_tags(self, sub_id, tags=None, noresolve_tags=None):
        """RPC handler returning the tags of a sub-object.

        :param sub_id: name of the sub-object database to query
        :param tags: forwarded to :func:`get_tags`
        :param noresolve_tags: forwarded to :func:`get_tags`
        :returns: the result of :func:`get_tags`, or None when ``sub_id`` is
            ``'__main__'`` or is not a known database
        """
        if sub_id == '__main__':
            # FIXME should we raise ?
            logger.debug('Invalid request for sub object')
            return

        sub_db = self.tag_db.get(sub_id)
        if sub_db is None:
            # FIXME should we also raise here ?
            logger.debug('Failed to find sub_id %s', sub_id)
            return

        return get_tags(sub_db, tags, noresolve_tags)
    # End RPC handlers definitions

    def reset_tag(self, tag):
        """Start a tag and add it to (or replace it in) the main database.

        :param tag: :class:`Tag` to add/replace
        """
        # TODO tag register
        tag.start(self.loop)
        self.tag_db['__main__'][tag.name] = tag

    def remove_tag(self, tag_name):
        """Remove a tag from the main database and stop it.

        :param tag_name: name of the tag to remove
        :raises KeyError: if no tag with that name is in the main database
        """
        # TODO tag unregister
        self.tag_db['__main__'].pop(tag_name).stop()

    def reset_sub_tag(self, sub_id, tag):
        """Start a tag and add it to (or replace it in) a sub-object database.

        :param sub_id: name of the sub-object database
        :param tag: :class:`Tag` to add/replace
        """
        # TODO tag register
        tag.start(self.loop)
        self.tag_db[sub_id][tag.name] = tag

    def remove_sub_tag(self, sub_id, tag_name):
        """Remove a tag from a sub-object database and stop it.

        :param sub_id: name of the sub-object database
        :param tag_name: name of the tag to remove
        :raises KeyError: if no tag with that name is in that database
        """
        # TODO tag unregister
        self.tag_db[sub_id].pop(tag_name).stop()

    def register_plugin(self, plugin):
        """Merge a plugin's tags and rpc handlers into the node and start it.

        :param plugin: object exposing ``tag_db`` (mapping of database name
            to mapping of tags), ``rpc_handler`` (mapping of handlers),
            ``start()`` and ``stop()``
        :raises PluginError: if the plugin is already registered
        """
        # keep track of registered plugins
        if plugin in self.registered_plugins:
            raise PluginError('Plugin was already registered')
        self.registered_plugins.add(plugin)

        # register tags
        for name, sub_db in plugin.tag_db.items():
            self.tag_db[name].update(sub_db)

        # register handler
        self.rpc_handler.update(plugin.rpc_handler)

        plugin.start()

    def unregister_plugin(self, plugin):
        """Remove a plugin's tags and rpc handlers from the node and stop it.

        :param plugin: a plugin previously given to :meth:`register_plugin`
        :raises PluginError: if the plugin is not registered
        """
        try:
            self.registered_plugins.remove(plugin)
        except KeyError:
            raise PluginError('Plugin was not registered, cannot remove')

        # remove tags; the (name, db) pairs are needed here, iterating the
        # mapping itself would only yield the database names
        for db_name, tag_db in plugin.tag_db.items():
            for tag_name in tag_db:
                del self.tag_db[db_name][tag_name]

            if not self.tag_db[db_name]:  # if there's no more tag in the db
                del self.tag_db[db_name]

        # remove handlers
        for handler_name in plugin.rpc_handler:
            del self.rpc_handler[handler_name]

        plugin.stop()

    def rpc_connect(self):
        """Open the SSL rpc connection to the cc-server and build its proxy."""
        # TODO async and error handling
        self.rpc_con = RpcConnection.from_addr_ssl(
            addr=self.config.server_host,
            port=self.config.server_port,
            handler=self.rpc_handler,
            loop=self.loop,
        )
        self.proxy = ConnectionProxy(self.rpc_con)

    def start(self):
        """Start the signal watchers, connect, start authentication and run
        the event loop.
        """
        logger.info('Starting node')
        for signal_watcher in self.signals.values():
            signal_watcher.start()
        logger.debug('About to connect')
        self.rpc_connect()
        self.auth.start()
        logger.debug('About to start ev_loop')
        self.loop.start()

    def stop(self, watcher=None, revents=None):
        """Shut the node down; also used as the SIGINT/SIGTERM callback.

        :param watcher: the pyev watcher that fired, if any (unused)
        :param revents: bitmask of received events, if any (unused)
        """
        logger.info('Exiting node...')
        self.auth.stop()

        # close rpc
        if self.rpc_con is not None:
            self.rpc_con.shutdown()

        # close all plugins
        for plugin in self.registered_plugins:
            plugin.stop()
        self.registered_plugins = set()

        self.role = None
        self.loop.stop()

    def reload(self, watcher=None, revents=None):
        """Reload the logging configuration; also the SIGUSR1 callback.

        :param watcher: the pyev watcher that fired, if any (unused)
        :param revents: bitmask of received events, if any (unused)
        """
        logger.info(u'Reloading logging configuration...')
        logging.config.fileConfig(self.config_path)