Skip to content
Snippets Groups Projects
node.py 5.17 KiB
Newer Older
import time
import logging
from threading import Thread, Lock

from sjrpc.client import SimpleRpcClient
from sjrpc.utils import ConnectionProxy, RpcHandler, pure

from ccnode import __version__
from ccnode.tags import Tag, get_tags


# Module-level logger, named after this module; used by the classes below.
logger = logging.getLogger(__name__)


Anael Beutot's avatar
Anael Beutot committed
# Tags installed on every DefaultHandler instance: a single ``version`` tag
# whose value is the ``ccnode`` package version.
# NOTE(review): the meaning of the third Tag argument (-1) is defined in
# ccnode.tags and is not visible from this module -- confirm there.
DEFAULT_TAGS = (Tag(u'version', __version__, -1),)


class DefaultHandler(RpcHandler):
    """Minimal RPC handler used by :class:`Node` objects.

    It only carries the tags listed in ``DEFAULT_TAGS`` (the ``version`` tag
    holding the current ``cc-node`` version) and lets the ``cc-server``
    query them through :meth:`get_tags`.

    See the ``sjRpc`` documentation for more information about handlers.

    """
    def __init__(self, *args, **kwargs):
        # extra arguments are accepted but are not forwarded to the base class
        RpcHandler.__init__(self)

        # index the default tags by name
        tags_by_name = {}
        for default_tag in DEFAULT_TAGS:
            tags_by_name[default_tag.name] = default_tag
        self.tags = tags_by_name

    @pure
    def get_tags(self, tags=None, noresolve_tags=None):
        """Return the tags requested by the ``cc-server``.

        :param iterable tags: list of tags to return
        :param iterable noresolve_tags: list of tags to not return
        :return: whatever the module level ``get_tags`` helper returns for
            this handler
        """
        # inside the method body ``get_tags`` resolves to the helper imported
        # from ``ccnode.tags``, not to this method
        result = get_tags(self, tags, noresolve_tags)
        return result


class Node(Thread):
    """Main class for ccnode.

    Daemon thread that connects to the ``cc-server`` through ``sjRpc``,
    authenticates, installs the handler matching the role returned by the
    server and starts over whenever the RPC main loop stops.

    """
    def __init__(self, server_host, server_port, user_name, user_passwd):
        """
        :param string server_host: hostname for cc-server
        :param int server_port: port for cc-server (converted with ``int``)
        :param string user_name: account name for authentication to cc-server
        :param string user_passwd: password for cc-server authentication
        """
        Thread.__init__(self)

        # settings used as read only
        self.server_host = server_host
        self.server_port = int(server_port)
        self.user_name = user_name
        self.user_passwd = user_passwd

        self.daemon = True

        #: ``sjRpc`` proxy
        self.proxy = None
        #: ``sjRpc`` connection manager
        self.manager = None
        #: role returned by cc-server (set to None unless the authentication
        #: has succeeded)
        self.role = None

        # serializes :func:`shutdown`, which can be called from several threads
        self._manager_lock = Lock()

    def init_rpc(self):
        """Init a new connection to ``cc-server``, create a ``sjRpc`` manager
        and proxy.

        The manager is created with SSL enabled and a fresh
        :class:`DefaultHandler` as default handler. Exceptions raised while
        connecting are not caught here.

        """
        self.manager = SimpleRpcClient.from_addr(
            addr=self.server_host,
            port=self.server_port,
            enable_ssl=True,
            default_handler=DefaultHandler(),
        )
        self.proxy = ConnectionProxy(self.manager)

    def authentify(self):
        """Try to authenticate to the server. If successful, import and then
        set the :class:`Handler` corresponding to the role returned by the
        ``cc-server`` and store that role in :attr:`role`.

        An unknown role leaves :attr:`role` set to ``None`` after a 2 seconds
        pause. If the manager was shut down while the authentication was in
        progress, nothing is changed.

        :raise: exception raised by :func:`proxy.authentify`
        """
        try:
            role = self.proxy.authentify(self.user_name, self.user_passwd)
        except Exception:
            logger.exception(u'Unknow exception while authentifying.')
            raise

        # :func:`shutdown` may have run in the rpc thread while we were
        # waiting for the server: work on a snapshot so that we never
        # dereference ``None``
        manager = self.manager
        if manager is None:
            logger.debug(u'Connection closed while authentifying.')
            return

        # set handler according to which role was returned by the cc-server
        # (handler modules are imported lazily, only for the role in use)
        if role == u'host':
            logger.debug(u'Role host affected.')
            from ccnode.host import Handler as HostHandler
            # FIXME bad API
            manager._connection.set_handler(HostHandler())
        elif role == u'hv':
            logger.debug(u'Role hypervisor affected.')
            from ccnode.hypervisor import Handler as HypervisorHandler
            # FIXME bad API
            manager._connection.set_handler(HypervisorHandler(
                proxy=self.proxy))
        else:
            logger.debug(u'Wrong role returned: %s', role)
            role = None
            time.sleep(2)

        # single point where the role is published (None for unknown roles)
        self.role = role

    def rpc(self):
        """Runs ``sjRpc`` main loop. Catches and logs exceptions, and always
        calls :func:`shutdown` on the manager before returning."""
        try:
            self.manager.run()
        except Exception:
            logger.exception(u'Unknown exception:')
        finally:
            self.shutdown()

    def run(self):
        """Node main loop.

        Forever: connect (retrying every 2 seconds), run the RPC loop in a
        daemon thread, authenticate until a role is obtained or the RPC
        thread dies, then wait for the RPC thread to end and reconnect.

        """
        while True:
            # init rpc connection
            while True:
                try:
                    self.init_rpc()
                except Exception:
                    logger.exception(u'Error in init.')
                else:
                    break
                time.sleep(2)
            # launch main rpc thread
            rpc_thread = Thread(target=self.rpc)
            rpc_thread.daemon = True
            rpc_thread.start()
            # launch auth thread, make sure rpc is still running; running
            # the authentication in its own thread keeps an exception raised
            # by :func:`authentify` from ending this loop
            while rpc_thread.is_alive() and self.role is None:
                auth_thread = Thread(target=self.authentify)
                auth_thread.daemon = True
                auth_thread.start()
                auth_thread.join()
                # FIXME for debug
                time.sleep(4)

            # wait for rpc thread to terminate (it means error)
            rpc_thread.join()
            logger.error('Reconnecting to server.')
            # reset settings
            self.role = None

    def shutdown(self):
        """Shutdown the ``sjRpc`` manager and reset object state.

        Does nothing when there is no manager. The manager and role are reset
        even if the manager's own ``shutdown`` raises; the exception is then
        propagated to the caller.

        """
        with self._manager_lock:
            manager = self.manager
            if manager is None:
                return
            try:
                manager.shutdown()
            finally:
                self.manager = None
                self.role = None