Newer
Older
from sjrpc.core import RpcConnection
from sjrpc.utils import ConnectionProxy, RpcHandler
from ccnode.tags import Tag, get_tags
logger = logging.getLogger(__name__)
class RPCHandler(object):
"""Handles rpc connection to the remote cc-server."""
def __init__(self, loop):
self.loop = loop
self.watcher = loop.loop.io(self.cb)
self.timer = loop.loop.timer(0, 5, self.timeout)
# configure socket
self.sock = None
self.sock.setblocking(0)
def cb(self, watcher, revents):
if revents & pyev.EV_WRITE:
self.loop.connection = RpcConnection(
sock=ssl.wrap_socket(self.sock, ssl_version=ssl.PROTOCOL_TLSv1),
loop=self.loop.loop,
)
self.loop.proxy = ConnectionProxy(self.loop.connection)
def timeout(self, watcher, revents):
self.sock.close()
def run(self):
self.watcher.start()
self.timeout.start()
self.connection = RpcConnection.from_addr_ssl(
addr=self.loop.config.server_host,
port=self.loop.config.server_port,
handler=self.loop.handlers,
)
self.proxy = ConnectionProxy(self.connection)
#self.loop.
class AuthHandler(object):
"""Handles rpc authentication to the remote cc-server."""
def __init__(self, loop):
self.loop = loop
self.watcher = loop.loop.timer(.5, 5, self.cb)
self.auth_id = None
def cb(self, watcher, revents):
logger.debug('Callback auth')
# check is fallback mode on sjrpc is set otherwise our call would block
# the loop
if not self.loop.rpc_con._event_fallback.is_set():
logger.debug('Will try authentication again latter')
return
if self.auth_id is not None:
logger.error('Authentication is taking longer than expected')
return
# try to authenticate
self.auth_id = self.loop.rpc_con.rpc.async_call_cb(
self.cb_auth,
'authentify',
self.loop.config.server_user,
self.loop.config.server_passwd,
def cb_auth(self, call_id, response=None, error=None):
assert call_id == self.auth_id
if error is not None:
# we got an error
logger.error('Error while authenticating with cc-server: %s("%s")',
error['exception'], error.get('message', ''))
# try to recconnect in 5 seconds
return
# set handler according to which role was returned by the cc-server
from ccnode.host import Handler as HostHandler
self.loop.role = HostHandler(loop=self.loop.loop)
from ccnode.hypervisor import Handler as HypervisorHandler
logger.error('Failed authentication, role returned: %s', response)
self.loop.role = None
self.auth_id = None
return
if self.loop.role is not None:
self.watcher.stop()
logger.info('Successfully authenticated with role %s', response)
self.auth_id = None
def start(self):
self.watcher.start()
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
class MainLoop(object):
def __init__(self, config_path):
self.loop = pyev.default_loop()
self.config_path = config_path
# set signal watchers
self.signals = {
signal.SIGINT: self.stop,
signal.SIGTERM: self.stop,
signal.SIGUSR1: self.reload,
}
# turn into real watchers
self.signals = dict((
signal,
self.loop.signal(signal, cb),
) for signal, cb in self.signals.iteritems())
# load config variables
self.config = NodeConfigParser(self.config_path)
# configure logging
logging.config.fileConfig(self.config_path)
# rpc connection
self.rpc_con = None
self.auth = AuthHandler(self)
# role
self.role = None
__main__=dict((t.name, t)
for t in DEFAULT_TAGS), # db for main objects
# other sub-objects db can go here (for example VMs)
)
# handlers
self.rpc_handler = dict(
get_tags=partial(get_tags, self.tag_db['__main__']),
sub_tags=self.sub_tags,
)
# RPC handlers definitions
def sub_tags(self, sub_id, tags=None, noresolve_tags=None):
if sub_id == '__main__':
# FIXME should we raise ?
logger.debug('Invalid request for sub object')
return
sub_db = self.tag_db.get(sub_id)
if sub_db is None:
# FIXME should we also raise here ?
logger.debug('Failed to find sub_id %s', sub_id)
return
return get_tags(sub_db, tags, noresolve_tags)
# End RPC handlers definitions
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
def reset_tag(self, tag):
"""
:param tag: :class:`Tag` to add/replace
"""
# TODO tag register
tag.start(self.loop)
self.tag_db['__main__'][tag.name] = tag
def remove_tag(self, tag_name):
# TODO tag unregister
self.tag_db['__main__'].pop(tag_name).stop()
def reset_sub_tag(self, sub_id, tag):
# TODO tag register
tag.start(self.loop)
self.tag_db[sub_id][tag.name] = tag
def remove_sub_tag(self, sub_id, tag_name):
# TODO tag unregister
self.tag_db[sub_id].pop(tag_name).stop()
def register_plugin(self, plugin):
# keep track of registered plugins
if plugin in self.registered_plugins:
raise PluginError('Plugin was already registered')
self.registered_plugins.add(plugin)
# register tags
for name, sub_db in plugin.tag_db.iteritems():
self.tag_db[name].update(sub_db)
# register handler
self.rpc_handler.update(plugin.rpc_handler)
def unregister_plugin(self, plugin):
try:
self.registered_plugins.remove(plugin)
except KeyError:
raise PluginError('Plugin was not registered, cannot remove')
# remove tags
for db_name, tag_db in plugin.tag_db:
for tag_name in tag_db:
del self.tag_db[db_name][tag_name]
if not self.tag_db[db_name]: # if there's no more tag in the db
del self.tag_db[db_name]
# remove handlers
for handler_name in plugin.rpc_handler:
del self.rpc_handler[handler_name]
plugin.stop()
def rpc_connect(self):
# TODO async and error handling
self.rpc_con = RpcConnection.from_addr_ssl(
addr=self.config.server_host,
port=self.config.server_port,
loop=self.loop,
)
self.proxy = ConnectionProxy(self.rpc_con)
def start(self):
for signal in self.signals.itervalues():
signal.start()
logger.debug('About to connect')
self.rpc_connect()
self.auth.start()
logger.debug('About to start ev_loop')
self.loop.start()
def stop(self, watcher=None, revents=None):
logger.info('Exiting node...')
if self.rpc_con is not None:
self.rpc_con.shutdown()
# close all plugins
for plugin in self.registered_plugins:
plugin.stop()
self.registered_plugins = set()
self.role = None
self.loop.stop()
def reload(self, watcher=None, revents=None):
logger.info(u'Reloading logging configuration...')
logging.config.fileConfig(self.config_path)