Newer
Older
import time
import logging
from threading import Thread, Lock
from sjrpc.client import SimpleRpcClient
from sjrpc.utils import ConnectionProxy, RpcHandler, pure
from ccnode import __version__
from ccnode.tags import Tag
logger = logging.getLogger(__name__)
"""Base handler for :class:`Node` objects. Containing only a ``version``
tag that returns current ``cc-node`` version.
See `sjRpc documentation <http://google.fr>`_ for more information.
"""
def __init__(self, *args, **kwargs):
    """Initialise the underlying ``RpcHandler`` and register default tags.

    All positional and keyword arguments are forwarded untouched to
    ``RpcHandler.__init__``. ``self.tags`` is then filled with one entry
    per item of ``DEFAULT_TAGS``, keyed by the item's ``name`` attribute.
    """
    RpcHandler.__init__(self, *args, **kwargs)
    registry = {}
    for default_tag in DEFAULT_TAGS:
        registry[default_tag.name] = default_tag
    self.tags = registry
@pure
def get_tags(self, tags=None, noresolve_tags=None):
    """Method used from the ``cc-server`` to get tags.

    Builds a mapping of tag name to a ``{'value': ..., 'ttl': ...}`` dict
    from the tags registered in ``self.tags``. Requested names that are
    not registered are silently dropped.

    NOTE(review): when ``tags`` is ``None`` every registered tag is
    returned and ``noresolve_tags`` is not applied; this is kept from the
    previous implementation -- confirm it is what ``cc-server`` expects.

    :param iterable tags: list of tags to return (``None`` means all)
    :param iterable noresolve_tags: list of tags to not return (``None``
        means exclude nothing); only honoured when ``tags`` is given
    :return: dict mapping each selected tag name to a dict holding its
        ``value`` and ``ttl``
    :raise: any exception raised while reading a tag (logged, then
        re-raised)
    """
    logger.debug('Tags request: %s, %s', tags, noresolve_tags)

    if tags is None:
        # snapshot of every registered tag name
        selected = list(self.tags)
    else:
        # ``noresolve_tags`` defaults to None: treat it as an empty
        # exclusion list instead of crashing on ``set(None)``.
        if noresolve_tags is None:
            excluded = set()
        else:
            excluded = set(noresolve_tags)
        selected = (set(tags) - excluded) & set(self.tags)

    profile = time.time()
    try:
        result = {}
        for name in selected:
            tag = self.tags[name]
            result[name] = dict(value=tag.value, ttl=tag.ttl)
    except Exception:
        # Re-raise the real error: falling through would reach an unbound
        # ``result`` and mask it behind an UnboundLocalError.
        logger.exception(u'SHould not happend, result.')
        raise
    finally:
        logger.debug(u'Profiling: %f seconds.', time.time() - profile)

    logger.debug(u'Returning: %s', result)
    return result
class Node(Thread):
    """Main class for ccnode.

    A daemon thread that connects to the ``cc-server``, runs the ``sjRpc``
    main loop in a helper thread, authenticates, and starts over whenever
    the RPC loop terminates.
    """

    def __init__(self, server_host, server_port, user_name, user_passwd):
        """
        :param string server_host: hostname for cc-server
        :param int server_port: port for cc-server
        :param string user_name: account name for authentication to cc-server
        :param string user_passwd: password for cc-server authentication
        """
        Thread.__init__(self)
        # settings used as read only
        self.server_host = server_host
        self.server_port = int(server_port)
        self.user_name = user_name
        self.user_passwd = user_passwd
        self.daemon = True
        #: role returned by cc-server (set to None unless the authentication
        #: has succeed)
        self.role = None
        #: ``sjRpc`` manager and proxy, created by :func:`init_rpc`; defined
        #: here so that :func:`shutdown` is safe before the first connection
        self.manager = None
        self.proxy = None
        self._manager_lock = Lock()

    def init_rpc(self):
        """Init a new connection to ``cc-server``, create a ``sjRpc`` manager
        and proxy.

        :raise: whatever ``SimpleRpcClient.from_addr`` or
            ``ConnectionProxy`` raises; nothing is caught here
        """
        self.manager = SimpleRpcClient.from_addr(
            addr=self.server_host,
            port=self.server_port,
            enable_ssl=True,
            default_handler=DefaultHandler(),
        )
        self.proxy = ConnectionProxy(self.manager)

    def authentify(self):
        """Try to authenticate to the server. If successful, import and then
        set the :class:`Handler` corresponding to the role returned by the
        ``cc-server``.

        ``self.role`` ends up as the returned role for ``host`` and ``hv``,
        or ``None`` (after a two second pause) for any other answer.

        :raise: exception raised by :func:`proxy.authentify`
        """
        try:
            role = self.proxy.authentify(self.user_name, self.user_passwd)
        except Exception:
            logger.exception(u'Unknow exception while authentifying.')
            raise

        # set handler according to which role was returned by the cc-server
        if role == u'host':
            logger.debug(u'Role host affected.')
            # imported here, only once the host role has been granted
            from ccnode.host import Handler as HostHandler
            # FIXME bad API
            self.manager._connection.set_handler(HostHandler())
        elif role == u'hv':
            # no dedicated handler is installed for this role
            logger.debug(u'Role hypervisor affected.')
        else:
            logger.debug(u'Wrong role returned: %s', role)
            role = None
            # pause before :func:`run` gets to retry the authentication
            time.sleep(2)
        self.role = role

    def rpc(self):
        """Runs ``sjRpc`` main loop. Catches exceptions. :func:`shutdown` the
        manager before returning."""
        try:
            self.manager.run()
        except Exception:
            logger.exception(u'Unknown exception:')
        finally:
            self.shutdown()

    def run(self):
        """Node main loop.

        Never returns: connect (retrying every two seconds), start the RPC
        thread, authenticate until a role is obtained or the RPC thread
        dies, wait for the RPC thread to end, then start again.
        """
        while True:
            # init rpc connection
            while True:
                try:
                    self.init_rpc()
                except Exception:
                    logger.exception(u'Error in init.')
                else:
                    break
                time.sleep(2)

            # launch main rpc thread
            rpc_thread = Thread(target=self.rpc)
            rpc_thread.daemon = True
            rpc_thread.start()

            # launch auth thread, make sure rpc is still running
            while rpc_thread.is_alive() and self.role is None:
                auth_thread = Thread(target=self.authentify)
                auth_thread.daemon = True
                auth_thread.start()
                auth_thread.join()

            # wait for rpc thread to terminates (it means error)
            rpc_thread.join()
            logger.error('Reconnecting to server.')

            # reset settings
            self.role = None

    def shutdown(self):
        """Shutdown the ``sjRpc`` manager and reset object state.

        Does nothing but reset state when no manager is set. ``manager``
        and ``role`` are reset even if the manager's own ``shutdown``
        raises; that exception still propagates to the caller.
        """
        with self._manager_lock:
            try:
                if self.manager is not None:
                    self.manager.shutdown()
            finally:
                self.manager = None
                self.role = None