-
Anael Beutot authoredAnael Beutot authored
node.py 10.92 KiB
import time
import signal
import logging
import logging.config
from threading import Thread
from collections import defaultdict
from functools import partial
import pyev
from sjrpc.core import RpcConnection, RpcError
from sjrpc.utils import ConnectionProxy, RpcHandler, threadless
from ccnode import __version__
from ccnode.config import NodeConfigParser
from ccnode.tags import Tag, get_tags, TagDB
from ccnode.exc import PluginError
logger = logging.getLogger(__name__)
DEFAULT_TAGS = (Tag(u'version', __version__),)
class RPCStartHandler(Thread):
"""Handles rpc connection and authentication to the remote cc-server.
This class inherits from :class:`Thread` but only the connection part is run
in a background thread.
Note:
As :class:`Thread`, it can be started only one time.
"""
def __init__(self, loop):
"""
:param loop: MainLoop instance
"""
Thread.__init__(self)
self.daemon = True
self.run = self.rpc_connect
self.loop = loop
# signal to the main thread when connection is done
self.async = loop.evloop.async(self.async_cb)
self.timer = loop.evloop.timer(5., 5., self.in_progress_cb)
self.rpc_con = None
# id for async rpc call
self.auth_id = None
# Method run in the thread
def rpc_connect(self):
while True:
try:
logger.debug('About to create connection')
self.rpc_con = RpcConnection.from_addr_ssl(
addr=self.loop.config.server_host,
port=self.loop.config.server_port,
handler=self.loop.rpc_handler,
loop=self.loop.evloop,
on_disconnect=self.loop.restart_rpc_connection,
)
logger.debug('Connection created')
except IOError:
logger.exception('Error while connecting to the cc-server')
time.sleep(5)
else:
break
logger.debug('Async send')
self.async.send() # success
def start(self):
self.timer.start()
self.async.start()
Thread.start(self)
def stop(self):
self.timer.stop()
self.async.stop()
def in_progress_cb(self, *args):
logger.info('Connection to the cc-server still in progress')
def async_cb(self, *args):
logger.debug('Async callback')
self.timer.stop()
# connect is done
self.loop.rpc_con = self.rpc_con
# start authentication
self.timer = self.loop.evloop.timer(.5, 5., self.auth_cb)
self.timer.start()
def auth_cb(self, *args):
logger.debug('Callback auth')
# check is fallback mode on sjrpc is set otherwise our call would block
# the loop
if not self.rpc_con._event_fallback.is_set():
logger.debug('Will try authentication again latter')
return
if self.auth_id is not None:
logger.error('Authentication is taking longer than expected')
return
# try to authenticate
try:
self.auth_id = self.rpc_con.rpc.async_call_cb(
self.auth_done_cb,
'authentify',
self.loop.config.server_user,
self.loop.config.server_passwd,
)
except RpcError as exc:
if exc.exception == 'RpcConnectionError':
logger.error('Authentication failed: connection lost')
else:
logger.exception('Unexpected exception while authenticating')
self.stop()
self.loop.restart_rpc_connection()
def auth_done_cb(self, call_id, response=None, error=None):
assert call_id == self.auth_id
if error is not None:
# we got an error
logger.error('Error while authenticating with cc-server: %s("%s")',
error['exception'], error.get('message', ''))
# try to reconnect in 5 seconds
return
# set handler according to which role was returned by the cc-server
if response == self.loop.role and response is not None:
# we don't need to reload the plugins
# but we need to register the objects and tags
self.loop.tag_db.rpc_register()
elif response == u'host':
# close previous plugins if needed
if self.loop.role is not None:
self.loop.close_plugins()
# re-register the tags of the main loop
self.loop.tag_db.register()
logger.debug('Role host affected')
from ccnode.host import Handler as HostHandler
self.loop.main_plugin = HostHandler(loop=self.loop)
self.loop.role = u'host'
self.loop.register_plugin(self.loop.main_plugin)
elif response == u'hv':
# close previous plugins if needed
if self.loop.role is not None:
self.loop.close_plugins()
# re-register the tags of the main loop
self.loop.tag_db.register()
logger.debug('Role hypervisor affected')
# we don't import those modules at the top because some dependancies
# may not be installed
from ccnode.hypervisor import Handler as HypervisorHandler
self.loop.main_plugin = HypervisorHandler(
hypervisor_name=self.loop.config.server_user,
loop=self.loop,
)
self.loop.role = u'hv'
self.loop.register_plugin(self.loop.main_plugin)
else:
logger.error('Failed authentication, role returned: %s', response)
self.loop.role = None
self.auth_id = None
# we also close the previously opened plugins
self.loop.close_plugins()
return # we retry while it fails
self.stop()
logger.info('Successfully authenticated with role %s', str(response))
self.auth_id = None
class MainLoop(object):
def __init__(self, config_path):
self.evloop = pyev.default_loop(debug=True)
self.config_path = config_path
# set signal watchers
self.signals = {
signal.SIGINT: self.stop,
signal.SIGTERM: self.stop,
signal.SIGUSR1: self.reload,
}
# turn into real watchers
self.signals = dict((
signal,
self.evloop.signal(signal, cb),
) for signal, cb in self.signals.iteritems())
# load config variables
self.config = NodeConfigParser(self.config_path)
# configure logging
logging.config.fileConfig(self.config_path)
# rpc connection
self.rpc_con = None
self.connect = RPCStartHandler(self)
self.reconnect = None
# role
self.role = None
self.main_plugin = None
# tag database
self.tag_db = TagDB(self, tags=DEFAULT_TAGS)
# handlers
self.rpc_handler = dict(
get_tags=partial(threadless(get_tags), self.tag_db['__main__']),
sub_tags=self.sub_tags,
)
# plugins
self.registered_plugins = set()
@property
def rpc_connected(self):
return self.rpc_con is not None
@property
def rpc_authenticated(self):
return self.rpc_connected and self.role is not None
# RPC handlers definitions
@threadless
def sub_tags(self, sub_id, tags=None, noresolve_tags=None):
if sub_id == '__main__':
# FIXME should we raise ?
logger.debug('Invalid request for sub object')
return
sub_db = self.tag_db.get(sub_id)
if sub_db is None:
# FIXME should we also raise here ?
logger.debug('Failed to find sub_id %s', sub_id)
return {}
return get_tags(sub_db, tags, noresolve_tags)
# End RPC handlers definitions
def reset_handler(self, name, handl):
self.rpc_handler[name] = handl
def remove_handler(self, name):
self.rpc_handler.pop(name, None)
def register_plugin(self, plugin):
# keep track of registered plugins
if plugin in self.registered_plugins:
raise PluginError('Plugin was already registered')
self.registered_plugins.add(plugin)
# register tags
self.tag_db.update_from_db(plugin.tag_db)
# register handler
self.rpc_handler.update(plugin.rpc_handler)
plugin.start()
def unregister_plugin(self, plugin):
try:
self.registered_plugins.remove(plugin)
except KeyError:
raise PluginError('Plugin was not registered, cannot remove')
# remove tags
for db_name, tag_db in plugin.tag_db:
for tag_name in tag_db:
del self.tag_db[db_name][tag_name]
if not self.tag_db[db_name]: # if there's no more tag in the db
del self.tag_db[db_name]
self.tag_db.delete_from_db(plugin.tag_db)
# remove handlers
for handler_name in plugin.rpc_handler:
del self.rpc_handler[handler_name]
plugin.stop()
def restart_rpc_connection(self, *args):
if not self.rpc_connected:
return
logger.error('Lost connection to the cc-server, will attempt'
' reconnection')
# reconnection atempt in one second
self.reconnect = self.evloop.timer(
1.,0., self.restart_rpc_connection_cb)
self.reconnect.start()
def restart_rpc_connection_cb(self, *args):
# clear connection
self.rpc_con = None
# attempt to connect to the cc-server again
self.connect = RPCStartHandler(self)
self.connect.start()
self.reconnect.stop()
self.reconnect = None
def start(self):
logger.info('Starting node')
for signal in self.signals.itervalues():
signal.start()
logger.debug('About to connect')
self.connect.start()
logger.debug('About to start ev_loop')
self.evloop.start()
def stop(self, watcher=None, revents=None):
logger.info('Exiting node...')
if self.connect is not None:
self.connect.stop()
if self.reconnect is not None:
self.reconnect.stop()
# close rpc
if self.rpc_con is not None:
# disable callback to prevent trampoline calls
self.rpc_con._on_disconnect = None # FIXME doesn't work
self.rpc_con.shutdown()
# close all plugins
for plugin in self.registered_plugins:
plugin.stop()
self.registered_plugins = set()
# FIXME check for closing of main tags that were not in plugin
self.role = None
self.main_plugin = None
self.evloop.stop()
def reload(self, watcher=None, revents=None):
logger.info(u'Reloading logging configuration...')
logging.config.fileConfig(self.config_path)