import time import logging from threading import Thread, Lock from sjrpc.client import SimpleRpcClient from sjrpc.utils import ConnectionProxy, RpcHandler, pure from ccnode import __version__ from ccnode.tags import Tag, get_tags logger = logging.getLogger(__name__) DEFAULT_TAGS = (Tag(u'version', __version__, -1),) class DefaultHandler(RpcHandler): """Base handler for :class:`Node` objects. Containing only a ``version`` tag that returns current ``cc-node`` version. See `sjRpc documentation <http://google.fr>`_ for more information. """ def __init__(self, *args, **kwargs): RpcHandler.__init__(self) self.tags = dict((t.name, t) for t in DEFAULT_TAGS) @pure def get_tags(self, tags=None, noresolve_tags=None): """Method used from the ``cc-server`` to get tags. :param iterable tags: list of tags to return :param iterable noresolve_tags: list of tags to not return """ return get_tags(self, tags, noresolve_tags) class Node(Thread): """Main class for ccnode.""" def __init__(self, server_host, server_port, user_name, user_passwd): """ :param string server_host: hostname for cc-server :param int server_port: port for cc-server :param string user_name: account name for authentication to cc-server :param string user_passwd: password for cc-server authentication """ Thread.__init__(self) # settings used as read only self.server_host = server_host self.server_port = int(server_port) self.user_name = user_name self.user_passwd = user_passwd self.daemon = True #: ``sjRpc`` proxy self.proxy = None #: ``sjRpc`` connection manager self.manager = None #: role returned by cc-server (set to None unless the authentication #: has succeed) self.role = None self._manager_lock = Lock() def init_rpc(self): """Init a new connection to ``cc-server``, create a ``sjRpc`` manager and proxy. """ self.manager = SimpleRpcClient.from_addr( addr=self.server_host, port=self.server_port, enable_ssl=True, default_handler=DefaultHandler(), ) self.proxy = ConnectionProxy(self.manager) def authentify(self): """Try to authenticate to the server. If successfull, import and then set :class:`Handler` corresponding to the role returned by the ``cc-server``. :raise: exception raised by :func:`proxy.authentify` """ try: role = self.proxy.authentify(self.user_name, self.user_passwd) except Exception: logger.exception(u'Unknow exception while authentifying.') raise # set handler according to which role was returned by the cc-server if role == u'host': logger.debug(u'Role host affected.') from ccnode.host import Handler as HostHandler # FIXME bad API self.manager._connection.set_handler(HostHandler()) self.role = u'host' elif role == u'hv': logger.debug(u'Role hypervisor affected.') from ccnode.hypervisor import Handler as HypervisorHandler # FIXME bad API self.manager._connection.set_handler(HypervisorHandler( proxy=self.proxy)) self.role = u'hv' else: logger.debug(u'Wrong role returned: %s' % role) role = None time.sleep(2) self.role = role def rpc(self): """Runs ``sjRpc`` main loop. Catches exceptions. :func:`shutdown` the manager before returning.""" try: self.manager.run() except Exception: logger.exception(u'Unknown exception:') finally: self.shutdown() def run(self): """Node main loop.""" while True: # init rpc connection while True: try: self.init_rpc() except Exception as e: logger.exception(u'Error in init.') else: break time.sleep(2) # launch main rpc thread rpc_thread = Thread(target=self.rpc) rpc_thread.daemon = True rpc_thread.start() # launch auth thread, make sure rpc is still running while rpc_thread.is_alive() and self.role is None: auth_thread = Thread(target=self.authentify) auth_thread.daemon = True auth_thread.start() auth_thread.join() # FIXME for debug time.sleep(4) # wait for rpc thread to terminates (it means error) rpc_thread.join() logger.error('Reconnecting to server.') # reset settings self.role = None def shutdown(self): """Shutdown the ``sjRpc`` manager and reset object state.""" with self._manager_lock: if self.manager is not None: self.manager.shutdown() self.manager = None self.role = None