Newer
Older
from sjrpc.core import RpcConnection, RpcError
from sjrpc.utils import ConnectionProxy, RpcHandler, threadless
from ccnode.tags import Tag, get_tags, RootTagDB
# Module-level logger, named after this module; used by every class below.
logger = logging.getLogger(__name__)
class RPCStartHandler(Thread):
    """Handles rpc connection and authentication to the remote cc-server.

    This class inherits from :class:`Thread` but only the connection part is
    run in a background thread. Once connected, the thread signals the event
    loop through an async watcher and authentication is then driven from
    timer callbacks.

    Note:
        As a :class:`Thread`, it can be started only one time.
    """
:param loop: MainLoop instance
"""
# NOTE(review): the ``def`` line of this initialiser and the opening quotes of
# the docstring above are not visible in this view, and no assignment to
# ``self.loop`` (read by the other methods of the class) is visible either --
# TODO confirm against the complete file.
Thread.__init__(self)
self.daemon = True
# the thread entry point is the connection routine
self.run = self.rpc_connect
# signal to the main thread when connection is done
self.async = loop.evloop.async(self.async_cb)
# timer whose callback logs that the connection is still in progress
# (5., 5. are presumably the initial delay and the repeat interval in
# seconds -- TODO confirm against the event loop documentation)
self.timer = loop.evloop.timer(5., 5., self.in_progress_cb)
# connection object, set by rpc_connect() once the connection succeeds
self.rpc_con = None
# id for async rpc call
self.auth_id = None
# Method run in the thread
def rpc_connect(self):
while True:
try:
self.rpc_con = RpcConnection.from_addr_ssl(
addr=self.loop.config.server_host,
port=self.loop.config.server_port,
handler=self.loop.rpc_handler,
loop=self.loop.evloop,
on_disconnect=self.loop.restart_rpc_connection,
)
except IOError:
logger.exception('Error while connecting to the cc-server')
time.sleep(5)
else:
break
self.async.send() # success
def start(self):
self.timer.start()
self.async.start()
def stop(self):
self.timer.stop()
self.async.stop()
def in_progress_cb(self, *args):
    """Timer callback: report that the connection is still pending.

    The positional arguments supplied by the caller are ignored.
    """
    logger.info('Connection to the cc-server still in progress')
def async_cb(self, *args):
    """Publish the new connection on the main loop and start authenticating.

    Callback of the async watcher, which the connection thread signals once
    ``rpc_connect`` has succeeded. The positional arguments are ignored.
    """
    logger.debug('Async callback')
    # the "still in progress" timer is not needed anymore
    self.timer.stop()

    main_loop = self.loop
    # connect is done
    main_loop.rpc_con = self.rpc_con

    # start authentication: the timer slot is reused for the auth callback
    auth_timer = main_loop.evloop.timer(.5, 5., self.auth_cb)
    self.timer = auth_timer
    auth_timer.start()
def auth_cb(self, *args):
    """Timer callback that launches the asynchronous authentication call.

    Returns early, leaving the timer running, while the rpc connection is not
    ready or while a previous call is still pending. On :class:`RpcError` the
    failure is logged, the watchers are stopped and the main loop is asked to
    restart the connection. The positional arguments are ignored.
    """
    # check is fallback mode on sjrpc is set otherwise our call would block
    # the loop
    if not self.rpc_con._event_fallback.is_set():
        logger.debug('Will try authentication again latter')
        return
    # an id is already stored: presumably a previous call has not completed
    if self.auth_id is not None:
        logger.error('Authentication is taking longer than expected')
        return
    # try to authenticate
    try:
        # the outcome is delivered later to auth_done_cb
        self.auth_id = self.rpc_con.rpc.async_call_cb(
            self.auth_done_cb,
            'authentify',
            self.loop.config.server_user,
            self.loop.config.server_passwd,
        )
    except RpcError as exc:
        if exc.exception == 'RpcConnectionError':
            logger.error('Authentication failed: connection lost')
        else:
            logger.exception('Unexpected exception while authenticating')
        # NOTE(review): indentation was lost in this view; the two statements
        # below are assumed to belong to the ``except`` block (run for both
        # kinds of failure) -- TODO confirm against the complete file.
        self.stop()
        self.loop.restart_rpc_connection()
# Callback handed to ``async_call_cb`` by auth_cb(): receives the outcome of
# the 'authentify' call and sets up the main loop for the returned role.
#
# NOTE(review): this method looks incomplete in this view -- indentation is
# lost and several lines (branch headers, a closing parenthesis) seem to be
# missing, so the nesting of the statements below cannot be recovered here.
# TODO confirm against the complete file before changing any code.
def auth_done_cb(self, call_id, response=None, error=None):
if error is not None:
# we got an error
logger.error('Error while authenticating with cc-server: %s("%s")',
error['exception'], error.get('message', ''))
# NOTE(review): no ``return`` is visible after the error report -- TODO
# confirm whether execution is meant to continue with the role checks.
# set handler according to which role was returned by the cc-server
if response == self.loop.role and response is not None:
# we don't need to reload the plugins
# but we need to register the objects and tags
self.loop.tag_db.rpc_register()
elif response == u'host':
# close previous plugins if needed
if self.loop.role is not None:
self.loop.close_plugins()
# imported here rather than at the top of the module
from ccnode.host import Handler as HostHandler
self.loop.main_plugin = HostHandler(loop=self.loop)
self.loop.role = u'host'
# (re)-register the tags of the main loop
self.loop.tag_db.rpc_register()
self.loop.register_plugin(self.loop.main_plugin)
# NOTE(review): a branch header (probably the hypervisor role) seems to be
# missing before this point -- TODO confirm.
# close previous plugins if needed
if self.loop.role is not None:
self.loop.close_plugins()
# we don't import those modules at the top because some dependencies
# may not be installed
from ccnode.hypervisor import Handler as HypervisorHandler
# NOTE(review): the call below is not closed in this view; its arguments and
# the role assignment that presumably follows are missing -- TODO confirm.
self.loop.main_plugin = HypervisorHandler(
# (re)-register the tags of the main loop
self.loop.tag_db.rpc_register()
self.loop.register_plugin(self.loop.main_plugin)
# NOTE(review): the header of the failure branch (probably an ``else``) seems
# to be missing before this point -- TODO confirm.
logger.error('Failed authentication, role returned: %s', response)
# we also close the previously opened plugins
self.loop.close_plugins()
return # we retry while it fails
logger.info('Successfully authenticated with role %s', str(response))
# clear the pending call id so that auth_cb may issue a new call
self.auth_id = None
# Main object of the node: owns the configuration, the signal watchers, the
# rpc connection state, the tag database, the job manager and the plugins.
#
# NOTE(review): indentation is lost and lines seem to be missing from the
# initialiser in this view (see the notes below) -- TODO confirm against the
# complete file before changing any code.
class MainLoop(object):
def __init__(self, config_path):
# path of the configuration file, also used for the logging configuration
self.config_path = config_path
# set signal watchers
self.signals = {
signal.SIGINT: self.stop,
signal.SIGTERM: self.stop,
signal.SIGUSR1: self.reload,
}
# turn into real watchers
# NOTE(review): ``self.evloop`` is used here but no assignment to it is
# visible above -- TODO confirm.
self.signals = dict((
signal_,
self.evloop.signal(signal_, cb),
) for signal_, cb in self.signals.iteritems())
# load config variables
self.config = NodeConfigParser(self.config_path)
# configure logging
logging.config.fileConfig(self.config_path)
# rpc connection
self.rpc_con = None
self.tag_db = RootTagDB(self, tags=DEFAULT_TAGS)
# job manager
self.job_manager = JobManager(self)
# NOTE(review): the line below looks like one keyword argument of a larger
# expression (possibly the construction of ``self.rpc_handler``) whose other
# lines are missing -- TODO confirm.
get_tags=partial(threadless(get_tags), self.tag_db['__main__']),
@property
def rpc_connected(self):
    """``True`` when a connection object is stored in ``self.rpc_con``."""
    return not (self.rpc_con is None)
@property
def rpc_authenticated(self):
    """Whether the node is connected and a role has been set.

    When not connected, the (falsy) value of ``self.rpc_connected`` itself is
    returned.
    """
    connected = self.rpc_connected
    if not connected:
        return connected
    return self.role is not None
def sub_tags(self, sub_id, tags=None):
    """Look up the tag database of a sub object.

    NOTE(review): as visible here nothing is returned and ``tags`` is never
    used; the end of each branch may be missing from this view -- TODO
    confirm against the complete file.

    :param sub_id: key of the sub object in ``self.tag_db``
    :param tags: not used by the visible code
    """
    # '__main__' is rejected as a sub object id (it is the key used for the
    # main tags elsewhere in this file)
    if sub_id == '__main__':
        # FIXME should we raise ?
        logger.debug('Invalid request for sub object')
    sub_db = self.tag_db.get(sub_id)
    if sub_db is None:
        # FIXME should we also raise here ?
        logger.debug('Failed to find sub_id %s', sub_id)
@threadless
def job_list(self):
    """Placeholder: the visible body does nothing and returns ``None``.

    NOTE(review): presumably meant to list jobs -- TODO confirm whether the
    implementation is missing from this view.
    """
    pass
def reset_handler(self, name, handl):
    """Bind ``handl`` to ``name`` in the rpc handler mapping.

    Any handler previously stored under ``name`` is replaced.

    :param name: key under which the handler is stored
    :param handl: handler object to store
    """
    handlers = self.rpc_handler
    handlers[name] = handl
def remove_handler(self, name):
    """Drop the rpc handler stored under ``name``, if there is one.

    :param name: key of the handler to remove; unknown names are ignored
    """
    try:
        del self.rpc_handler[name]
    except KeyError:
        # nothing was stored under this name
        pass
def register_plugin(self, plugin):
    """Record ``plugin`` as registered and expose its rpc handlers.

    :param plugin: object providing an ``rpc_handler`` mapping
    :raises PluginError: if the plugin is already registered
    """
    # keep track of registered plugins
    known_plugins = self.registered_plugins
    if plugin in known_plugins:
        raise PluginError('Plugin was already registered')
    known_plugins.add(plugin)

    # register tags -- NOTE(review): no tag registration is visible here,
    # TODO confirm against the complete file

    # register handler
    self.rpc_handler.update(plugin.rpc_handler)
def unregister_plugin(self, plugin):
    """Remove a registered plugin: drop its rpc handlers and stop it.

    :param plugin: plugin previously passed to ``register_plugin``
    :raises PluginError: if the plugin is not currently registered
    """
    try:
        self.registered_plugins.remove(plugin)
    except KeyError:
        raise PluginError('Plugin was not registered, cannot remove')

    # remove tags -- NOTE(review): no tag removal is visible here, TODO
    # confirm against the complete file

    # remove handlers; one of them may already be gone (remove_handler()
    # deletes single entries), which must neither abort the cleanup nor
    # prevent the plugin from being stopped
    for handler_name in plugin.rpc_handler:
        self.rpc_handler.pop(handler_name, None)

    plugin.stop()
def close_plugins(self):
    """Unregister every plugin currently registered on the loop."""
    # work on a snapshot: unregister_plugin() is called on each plugin while
    # we iterate
    snapshot = self.registered_plugins.copy()
    for registered in snapshot:
        self.unregister_plugin(registered)
def restart_rpc_connection(self, *args):
    """Schedule a reconnection to the cc-server when a connection is set.

    Does nothing when ``self.rpc_connected`` is false. The positional
    arguments are ignored.
    """
    if self.rpc_connected:
        logger.error('Lost connection to the cc-server, will attempt'
                     ' reconnection')
        # reconnection attempt in one second
        retry_timer = self.evloop.timer(
            1., 0., self.restart_rpc_connection_cb)
        self.reconnect = retry_timer
        retry_timer.start()
def restart_rpc_connection_cb(self, *args):
    """Timer callback: start a fresh connection handler and drop the timer.

    The positional arguments are ignored.
    """
    # attempt to connect to the cc-server again
    starter = RPCStartHandler(self)
    self.connect = starter
    starter.start()

    # the reconnection timer has done its job
    pending_timer = self.reconnect
    pending_timer.stop()
    self.reconnect = None
# NOTE(review): the ``def`` line of the method these statements belong to is
# not visible in this view and indentation is lost; statements may also be
# missing after the last one -- TODO confirm against the complete file.
# start every signal watcher (the loop variable shadows the ``signal`` module
# inside this method)
for signal in self.signals.itervalues():
signal.start()
logger.debug('About to connect')
def stop(self, watcher=None, revents=None):
    """Shut the node down: stop pending (re)connections and all plugins.

    Can be called directly or used as a signal watcher callback (it is
    registered for SIGINT and SIGTERM).

    :param watcher: ignored
    :param revents: ignored
    """
    logger.info('Exiting node...')
    if self.connect is not None:
        self.connect.stop()
    if self.reconnect is not None:
        self.reconnect.stop()
    # disable callback to prevent trampoline calls; there is no connection
    # object yet when the node is stopped before the first successful
    # connection
    if self.rpc_con is not None:
        self.rpc_con._on_disconnect = None  # FIXME doesn't work
    # close all plugins
    for plugin in self.registered_plugins:
        plugin.stop()
    self.registered_plugins = set()
    # FIXME check for closing of main tags that were not in plugin
def reload(self, watcher=None, revents=None):
    """Re-read the logging configuration from ``self.config_path``.

    Can be called directly or used as a signal watcher callback (it is
    registered for SIGUSR1).

    :param watcher: ignored
    :param revents: ignored
    """
    logger.info(u'Reloading logging configuration...')
    logging.config.fileConfig(self.config_path)