Newer
Older
from sjrpc.utils import ConnectionProxy, RpcHandler, threadless
from ccnode.tags import Tag, get_tags
logger = logging.getLogger(__name__)
class RPCStartHandler(Thread):
"""Handles rpc connection and authentication to the remote cc-server.
This class inherits from :class:`Thread` but only the connection part is run
in a background thread.
"""
"""
:param loop: MainLoop instance
"""
Thread.__init__(self)
self.daemon = True
self.run = self.rpc_connect
# signal to the main thread when connection is done
self.async = loop.evloop.async(self.async_cb)
self.timer = loop.evloop.timer(5., 5., self.in_progress_cb)
self.rpc_con = None
# id for async rpc call
self.auth_id = None
# Method run in the thread
def rpc_connect(self):
while True:
try:
logger.debug('About to create connection')
self.rpc_con = RpcConnection.from_addr_ssl(
addr=self.loop.config.server_host,
port=self.loop.config.server_port,
handler=self.loop.rpc_handler,
loop=self.loop.evloop,
)
logger.debug('Connection created')
except IOError:
logger.exception('Error while connecting to the cc-server')
time.sleep(5)
else:
break
logger.debug('Async send')
self.async.send() # success
def start(self):
self.timer.start()
self.async.start()
def stop(self):
self.timer.stop()
self.async.stop()
def in_progress_cb(self, *args):
logger.info('Connection to the cc-server still in progress')
def async_cb(self, *args):
logger.debug('Async callback')
self.timer.stop()
# connect is done
self.loop.rpc_con = self.rpc_con
# start authentication
self.timer = self.loop.evloop.timer(.5, 5., self.auth_cb)
self.timer.start()
def auth_cb(self, *args):
logger.debug('Callback auth')
# check is fallback mode on sjrpc is set otherwise our call would block
# the loop
if not self.rpc_con._event_fallback.is_set():
logger.debug('Will try authentication again latter')
return
if self.auth_id is not None:
logger.error('Authentication is taking longer than expected')
return
# try to authenticate
self.auth_id = self.rpc_con.rpc.async_call_cb(
self.auth_done_cb,
'authentify',
self.loop.config.server_user,
self.loop.config.server_passwd,
def auth_done_cb(self, call_id, response=None, error=None):
if error is not None:
# we got an error
logger.error('Error while authenticating with cc-server: %s("%s")',
error['exception'], error.get('message', ''))
# set handler according to which role was returned by the cc-server
from ccnode.host import Handler as HostHandler
from ccnode.hypervisor import Handler as HypervisorHandler
self.loop.role = HypervisorHandler(
hypervisor_name=self.loop.config.server_user,
logger.error('Failed authentication, role returned: %s', response)
self.loop.role = None
self.auth_id = None
return
if self.loop.role is not None:
logger.info('Successfully authenticated with role %s', response)
self.auth_id = None
class MainLoop(object):
def __init__(self, config_path):
self.evloop = pyev.default_loop(debug=True)
self.config_path = config_path
# set signal watchers
self.signals = {
signal.SIGINT: self.stop,
signal.SIGTERM: self.stop,
signal.SIGUSR1: self.reload,
}
# turn into real watchers
self.signals = dict((
signal,
) for signal, cb in self.signals.iteritems())
# load config variables
self.config = NodeConfigParser(self.config_path)
# configure logging
logging.config.fileConfig(self.config_path)
# rpc connection
self.rpc_con = None
__main__=dict((t.name, t)
for t in DEFAULT_TAGS), # db for main objects
# other sub-objects db can go here (for example VMs)
)
# handlers
self.rpc_handler = dict(
get_tags=partial(threadless(get_tags), self.tag_db['__main__']),
def sub_tags(self, sub_id, tags=None, noresolve_tags=None):
if sub_id == '__main__':
# FIXME should we raise ?
logger.debug('Invalid request for sub object')
return
sub_db = self.tag_db.get(sub_id)
if sub_db is None:
# FIXME should we also raise here ?
logger.debug('Failed to find sub_id %s', sub_id)
return
return get_tags(sub_db, tags, noresolve_tags)
# End RPC handlers definitions
def reset_tag(self, tag):
"""
:param tag: :class:`Tag` to add/replace
"""
# TODO tag register
self.tag_db['__main__'][tag.name] = tag
def remove_tag(self, tag_name):
# TODO tag unregister
self.tag_db['__main__'].pop(tag_name).stop()
def reset_sub_tag(self, sub_id, tag):
# TODO tag register
self.tag_db[sub_id][tag.name] = tag
def remove_sub_tag(self, sub_id, tag_name):
# TODO tag unregister
self.tag_db[sub_id].pop(tag_name).stop()
def remove_sub_object(self, sub_id):
for tag in self.tag_db.pop(sub_id, {}).itervalues():
tag.stop()
def register_plugin(self, plugin):
# keep track of registered plugins
if plugin in self.registered_plugins:
raise PluginError('Plugin was already registered')
self.registered_plugins.add(plugin)
# register tags
for name, sub_db in plugin.tag_db.iteritems():
self.tag_db[name].update(sub_db)
# register handler
self.rpc_handler.update(plugin.rpc_handler)
def unregister_plugin(self, plugin):
try:
self.registered_plugins.remove(plugin)
except KeyError:
raise PluginError('Plugin was not registered, cannot remove')
# remove tags
for db_name, tag_db in plugin.tag_db:
for tag_name in tag_db:
del self.tag_db[db_name][tag_name]
if not self.tag_db[db_name]: # if there's no more tag in the db
del self.tag_db[db_name]
# remove handlers
for handler_name in plugin.rpc_handler:
del self.rpc_handler[handler_name]
plugin.stop()
for signal in self.signals.itervalues():
signal.start()
logger.debug('About to connect')
def stop(self, watcher=None, revents=None):
logger.info('Exiting node...')
if self.connect is not None:
self.connect.stop()
if self.rpc_con is not None:
self.rpc_con.shutdown()
# close all plugins
for plugin in self.registered_plugins:
plugin.stop()
self.registered_plugins = set()
self.role = None
def reload(self, watcher=None, revents=None):
logger.info(u'Reloading logging configuration...')
logging.config.fileConfig(self.config_path)