Newer
Older
import time
import logging
from threading import Thread, Lock
from sjrpc.client import SimpleRpcClient
from sjrpc.utils import ConnectionProxy, RpcHandler, pure
from ccnode import __version__
from ccnode.tags import Tag
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class DefaultHandler(RpcHandler):
    """Default RPC handler: exposes the node's tags through ``get_tags``."""

    def __init__(self, *args, **kwargs):
        """Initialise the base handler and index the default tags by name.

        All arguments are forwarded untouched to ``RpcHandler.__init__``.
        """
        RpcHandler.__init__(self, *args, **kwargs)
        # NOTE(review): DEFAULT_TAGS is expected at module level; it is not
        # defined in the part of the file visible here -- confirm.
        self.tags = dict((t.name, t) for t in DEFAULT_TAGS)

    @pure
    def get_tags(self, tags=None, noresolve_tags=None):
        """Return the value and ttl of the requested tags.

        :param iterable tags: list of tags to return
        :param iterable noresolve_tags: list of tags to not return
        :returns: dict mapping each selected tag name to a dict with the
            keys ``value`` and ``ttl``; an empty dict if reading the tags
            failed (the failure is logged)
        """
        logger.debug('Tags request: %s, %s' % (
            unicode(tags),
            unicode(noresolve_tags),
        ))

        if tags is None:
            # No explicit selection: every known tag is returned.
            # noresolve_tags is not applied in this case (unchanged
            # behaviour).
            names = list(self.tags)
        else:
            # noresolve_tags defaults to None, which has to mean "exclude
            # nothing" instead of blowing up in set(None).
            if noresolve_tags is None:
                excluded = set()
            else:
                excluded = set(noresolve_tags)
            # unknown tag names are silently dropped
            names = (set(tags) - excluded) & set(self.tags)

        profile = time.time()
        try:
            result = dict(
                (
                    name,  # tag name
                    dict(
                        value=self.tags[name].value,
                        ttl=self.tags[name].ttl,
                    ),
                )
                for name in names
            )
        except Exception:
            logger.exception(u'SHould not happend, result.')
            # keep `result` bound so the lines below cannot raise a
            # secondary UnboundLocalError that would hide the real failure
            result = {}
        logger.debug(u'Profiling: %f seconds.' % (time.time() - profile))

        logger.debug(u'Returning: %s' % unicode(result))
        return result
class Node(Thread):
    """Main class for ccnode."""

    #: seconds to wait before retrying a failed connection/authentication
    _RETRY_DELAY = 2

    def __init__(self, server_host, server_port, user_name, user_passwd):
        """
        :param string server_host: hostname for cc-server
        :param int server_port: port for cc-server
        :param string user_name: account name for authentication to cc-server
        :param string user_passwd: password for cc-server authentication
        """
        Thread.__init__(self)

        # settings used as read only
        self.server_host = server_host
        self.server_port = int(server_port)
        self.user_name = user_name
        self.user_passwd = user_passwd

        self.daemon = True

        #: proxy
        self.proxy = None
        #: rpc connection manager
        self.manager = None
        #: role returned by cc-server
        self.role = None

        # guards the teardown of `manager` done in shutdown()
        self._manager_lock = Lock()

    def init_rpc(self):
        """Create the SSL RPC client and the proxy used to call the server."""
        self.manager = SimpleRpcClient.from_addr(
            addr=self.server_host,
            port=self.server_port,
            enable_ssl=True,
            default_handler=DefaultHandler(),
        )
        self.proxy = ConnectionProxy(self.manager)

    def authentify(self):
        """Make one authentication attempt against cc-server.

        On success ``self.role`` is set to the role returned by the server
        (``u'host'`` or ``u'hv'``). Any other answer is logged and leaves
        ``self.role`` at ``None`` after a short pause. Retrying is up to
        the caller (see the loop in ``run``).

        :raises Exception: whatever the RPC call raised, once it has been
            logged
        """
        try:
            role = self.proxy.authentify(self.user_name, self.user_passwd)
        except Exception:
            logger.exception(u'Unknow exception while authentifying.')
            # run() retries as soon as this attempt ends: pause here as for
            # a wrong role, otherwise a persistent error becomes a busy loop
            time.sleep(self._RETRY_DELAY)
            raise

        # set handler according to which role was returned by the cc-server
        if role == u'host':
            logger.debug(u'Role host affected.')
            # imported only when the host role is actually granted
            from ccnode.host import Handler as HostHandler
            # FIXME bad API
            self.manager._connection.set_handler(HostHandler())
        elif role == u'hv':
            logger.debug(u'Role hypervisor affected.')
        else:
            logger.debug(u'Wrong role returned: %s' % role)
            role = None
            time.sleep(self._RETRY_DELAY)

        self.role = role

    def rpc(self):
        """Runs rpc main loop."""
        try:
            self.manager.run()
        except Exception:
            logger.exception(u'Unknown exception:')
        finally:
            # whatever ended the loop, release the connection
            self.shutdown()

    def run(self):
        """Node main loop.

        Forever: connect, run the RPC loop in a helper thread, authenticate
        until a role is obtained, then wait for the RPC thread to end and
        start over.
        """
        while True:
            # init rpc connection
            while True:
                try:
                    self.init_rpc()
                except Exception:
                    logger.exception(u'Error in init.')
                else:
                    break
                time.sleep(self._RETRY_DELAY)

            # launch main rpc thread
            rpc_thread = Thread(target=self.rpc)
            rpc_thread.daemon = True
            rpc_thread.start()

            # launch auth thread, make sure rpc is still running
            while rpc_thread.is_alive() and self.role is None:
                # a separate thread keeps an exception raised by
                # authentify() from terminating this main loop
                auth_thread = Thread(target=self.authentify)
                auth_thread.daemon = True
                auth_thread.start()
                auth_thread.join()

            # wait for rpc thread to terminates (it means error)
            rpc_thread.join()
            logger.error('Reconnecting to server.')

            # reset settings
            self.role = None

    def shutdown(self):
        """Shut the RPC manager down, if any, and forget the current role."""
        with self._manager_lock:
            if self.manager is not None:
                self.manager.shutdown()
                self.manager = None
            self.role = None