Newer
Older
import time
import logging
from threading import Thread, Lock
from sjrpc.client import SimpleRpcClient
from sjrpc.utils import ConnectionProxy, RpcHandler, pure
from ccnode import __version__
from ccnode.tags import Tag, get_tags
logger = logging.getLogger(__name__)
"""Base handler for :class:`Node` objects. Containing only a ``version``
tag that returns the current ``cc-node`` version.
See `sjRpc documentation <http://google.fr>`_ for more information.
"""
self.tags = dict((t.name, t) for t in DEFAULT_TAGS)
@pure
def get_tags(self, tags=None, noresolve_tags=None):
    """Return the tags of this handler; called from the ``cc-server``.

    Thin wrapper that forwards to the module-level ``get_tags`` helper
    imported from ``ccnode.tags``, passing this handler as first argument.

    :param iterable tags: list of tags to return
    :param iterable noresolve_tags: list of tags to not return
    """
    # Inside the method body the bare name resolves to the imported helper,
    # not to this method (class attributes are not in function scope).
    resolved = get_tags(self, tags, noresolve_tags)
    return resolved
class Node(Thread):
    """Main class for ccnode.

    A daemon thread that connects to the ``cc-server``, runs the ``sjRpc``
    main loop in a helper thread, authenticates, and starts over whenever
    the RPC loop terminates.
    """

    def __init__(self, server_host, server_port, user_name, user_passwd):
        """
        :param string server_host: hostname for cc-server
        :param int server_port: port for cc-server
        :param string user_name: account name for authentication to cc-server
        :param string user_passwd: password for cc-server authentication
        """
        Thread.__init__(self)

        # settings used as read only
        self.server_host = server_host
        self.server_port = int(server_port)
        self.user_name = user_name
        self.user_passwd = user_passwd

        self.daemon = True

        #: ``sjRpc`` manager, ``None`` until :func:`init_rpc` succeeds and
        #: again after :func:`shutdown` (initialised here so that
        #: :func:`shutdown` is safe to call at any time)
        self.manager = None
        #: connection proxy built on top of the manager by :func:`init_rpc`
        self.proxy = None
        #: role returned by cc-server (set to None unless the authentication
        #: has succeed)
        self.role = None

        self._manager_lock = Lock()

    def init_rpc(self):
        """Init a new connection to ``cc-server``, create a ``sjRpc`` manager
        and proxy.

        Exceptions raised while creating the manager or the proxy are not
        caught here; :func:`run` handles them and retries.
        """
        self.manager = SimpleRpcClient.from_addr(
            addr=self.server_host,
            port=self.server_port,
            enable_ssl=True,
            default_handler=DefaultHandler(),
        )
        self.proxy = ConnectionProxy(self.manager)

    def authentify(self):
        """Try to authenticate to the server. If successfull, import and then
        set :class:`Handler` corresponding to the role returned by the
        ``cc-server``.

        On an unexpected role, :attr:`role` is left to ``None`` after a two
        seconds pause.

        :raise: exception raised by :func:`proxy.authentify`
        """
        try:
            role = self.proxy.authentify(self.user_name, self.user_passwd)
        except Exception:
            logger.exception(u'Unknow exception while authentifying.')
            raise

        # set handler according to which role was returned by the cc-server;
        # handler modules are imported lazily so only the one matching the
        # granted role gets loaded
        if role == u'host':
            logger.debug(u'Role host affected.')
            from ccnode.host import Handler as HostHandler
            # FIXME bad API
            self.manager._connection.set_handler(HostHandler())
            self.role = u'host'
        elif role == u'hv':
            logger.debug(u'Role hypervisor affected.')
            from ccnode.hypervisor import Handler as HypervisorHandler
            # FIXME bad API
            self.manager._connection.set_handler(
                HypervisorHandler(proxy=self.proxy))
            self.role = u'hv'
        else:
            # lazy logger arguments: formatting only happens if emitted
            logger.debug(u'Wrong role returned: %s', role)
            # pause before the caller retries the authentication
            time.sleep(2)
            self.role = None

    def rpc(self):
        """Runs ``sjRpc`` main loop. Catches exceptions. :func:`shutdown` the
        manager before returning."""
        try:
            self.manager.run()
        except Exception:
            logger.exception(u'Unknown exception:')
        finally:
            self.shutdown()

    def run(self):
        """Node main loop.

        Never returns: connect (retrying every two seconds), run the RPC
        loop in a daemon thread, authenticate until a role is granted or the
        RPC thread dies, then wait for the RPC thread to end and start over.
        """
        while True:
            # init rpc connection, retrying until it succeeds
            while True:
                try:
                    self.init_rpc()
                except Exception:
                    logger.exception(u'Error in init.')
                    time.sleep(2)
                else:
                    break

            # launch main rpc thread
            rpc_thread = Thread(target=self.rpc)
            rpc_thread.daemon = True
            rpc_thread.start()

            # launch auth thread, make sure rpc is still running
            while rpc_thread.is_alive() and self.role is None:
                auth_thread = Thread(target=self.authentify)
                auth_thread.daemon = True
                auth_thread.start()
                auth_thread.join()

                # FIXME for debug
                time.sleep(4)

            # wait for rpc thread to terminates (it means error)
            rpc_thread.join()
            logger.error('Reconnecting to server.')

            # reset settings
            self.role = None

    def shutdown(self):
        """Shutdown the ``sjRpc`` manager and reset object state.

        Safe to call when no manager exists (before :func:`init_rpc` or after
        a previous shutdown); in that case only :attr:`role` is reset.
        """
        with self._manager_lock:
            if self.manager is not None:
                self.manager.shutdown()
                self.manager = None
            self.role = None