# node.py (authored by Anael Beutot)
import logging
import logging.config
import signal
import socket
import ssl
import time
from collections import defaultdict
from functools import partial

import pyev
from sjrpc.core import RpcConnection
from sjrpc.utils import ConnectionProxy, RpcHandler

from ccnode import __version__
from ccnode.config import NodeConfigParser
from ccnode.exc import PluginError
from ccnode.tags import Tag, get_tags
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Tags placed in the ``__main__`` tag database of every node (see MainLoop);
# currently only the package version.
# NOTE(review): the meaning of the third ``Tag`` argument (-1) is not visible
# from this file -- confirm against ccnode.tags.
DEFAULT_TAGS = (
    Tag(u'version', __version__, -1),
)
class RPCHandler(object):
    """Handles rpc connection to the remote cc-server.

    NOTE(review): this class looks like an unfinished draft. ``cb`` wraps
    ``self.sock`` in SSL once it is writable, but nothing visible here ever
    calls ``connect`` on that socket, and ``run`` additionally opens a
    connection through ``RpcConnection.from_addr_ssl``. Only the defects
    that made the class crash outright are fixed; the overall design still
    needs to be settled.

    :param loop: owning main loop object; its ``loop`` (pyev loop),
        ``config`` and ``rpc_handler`` attributes are read, and its
        ``connection`` and ``proxy`` attributes are written by :meth:`cb`
    """

    def __init__(self, loop):
        self.loop = loop
        self.connection = None
        self.proxy = None
        # configure socket (it must exist before the io watcher is created,
        # since the watcher is bound to it)
        self.sock = self._make_socket()
        # pyev io watchers take (fd, events, callback)
        self.watcher = loop.loop.io(self.sock, pyev.EV_WRITE, self.cb)
        # NOTE(review): created with after=0 and repeat=5; per the pyev timer
        # signature this fires as soon as the timer is started -- confirm
        # that this is the intended connect timeout.
        self.timer = loop.loop.timer(0, 5, self.timeout)

    def _make_socket(self):
        """Return a new non-blocking TCP socket.

        NOTE(review): the address family is fixed to IPv4 here; revisit if
        the server may be reached over IPv6.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(0)
        return sock

    def cb(self, watcher, revents):
        """IO watcher callback: wrap the socket in SSL once it is writable.

        Stores the resulting :class:`RpcConnection` and its
        :class:`ConnectionProxy` on the owning main loop object.
        """
        if revents & pyev.EV_WRITE:
            # NOTE(review): ``ssl.wrap_socket`` and ``PROTOCOL_TLSv1`` are
            # kept as in the original; both are dated and should be
            # reviewed together with the server side.
            self.loop.connection = RpcConnection(
                sock=ssl.wrap_socket(
                    self.sock,
                    ssl_version=ssl.PROTOCOL_TLSv1,
                ),
                loop=self.loop.loop,
            )
            self.loop.proxy = ConnectionProxy(self.loop.connection)

    def timeout(self, watcher, revents):
        """Timer watcher callback: close the socket."""
        self.sock.close()

    def run(self):
        """Start both watchers and open the RPC connection."""
        self.watcher.start()
        # start the timer watcher (``self.timeout`` is the callback method,
        # it has no ``start``)
        self.timer.start()
        self.connection = RpcConnection.from_addr_ssl(
            addr=self.loop.config.server_host,
            port=self.loop.config.server_port,
            # the main loop exposes its RPC handlers as ``rpc_handler``
            handler=self.loop.rpc_handler,
        )
        self.proxy = ConnectionProxy(self.connection)
class AuthHandler(object):
    """Handles rpc authentication to the remote cc-server.

    A repeating timer watcher calls :meth:`cb`, which issues an asynchronous
    ``authentify`` RPC call; :meth:`cb_auth` receives the answer, installs
    the matching role plugin on the main loop and stops the timer. While no
    role has been obtained the timer keeps firing, so a failed attempt is
    retried on a later tick.

    :param loop: owning main loop object; its ``loop`` (pyev loop),
        ``rpc_con``, ``config`` and ``proxy`` attributes are read, its
        ``role`` attribute is written and its ``register_plugin`` method is
        called
    """

    def __init__(self, loop):
        self.loop = loop
        # timer created with after=.5 and repeat=5
        self.watcher = loop.loop.timer(.5, 5, self.cb)
        # id of the ``authentify`` call currently in flight, None otherwise
        self.auth_id = None

    def cb(self, watcher, revents):
        """Timer callback: start an authentication call if none is pending."""
        logger.debug('Callback auth')
        # check if fallback mode on sjrpc is set, otherwise our call would
        # block the loop
        if not self.loop.rpc_con._event_fallback.is_set():
            logger.debug('Will try authentication again latter')
            return

        if self.auth_id is not None:
            logger.error('Authentication is taking longer than expected')
            return

        # try to authenticate
        self.auth_id = self.loop.rpc_con.rpc.async_call_cb(
            self.cb_auth,
            'authentify',
            self.loop.config.server_user,
            self.loop.config.server_passwd,
        )

    def cb_auth(self, call_id, response=None, error=None):
        """Handle the answer to the ``authentify`` call.

        :param call_id: id of the completed call, expected to equal the id
            stored by :meth:`cb`
        :param response: role name returned by the cc-server (``u'host'``
            or ``u'hv'`` are recognised)
        :param error: error description (a mapping with an ``exception``
            key and an optional ``message`` key) or None on success
        """
        assert call_id == self.auth_id
        # The call is over whatever its outcome. Clear the marker first so
        # that a failed attempt does not leave ``cb`` believing a call is
        # still pending, which would prevent every further retry.
        self.auth_id = None

        if error is not None:
            # we got an error
            logger.error('Error while authenticating with cc-server: %s("%s")',
                         error['exception'], error.get('message', ''))
            # the timer watcher is still running: ``cb`` will try again
            return

        # set handler according to which role was returned by the cc-server
        if response == u'host':
            logger.debug('Role host affected')
            from ccnode.host import Handler as HostHandler
            self.loop.role = HostHandler(loop=self.loop.loop)
        elif response == u'hv':
            logger.debug('Role hypervisor affected')
            from ccnode.hypervisor import Handler as HypervisorHandler
            self.loop.role = HypervisorHandler(
                proxy=self.loop.proxy,
                hypervisor_name=self.loop.config.server_user,
                loop=self.loop.loop,
            )
        else:
            logger.error('Failed authentication, role returned: %s', response)
            self.loop.role = None
            return

        # authenticated: no need to poll any more
        self.watcher.stop()
        logger.info('Successfully authenticated with role %s', response)
        self.loop.register_plugin(self.loop.role)

    def start(self):
        """Start the authentication timer."""
        self.watcher.start()

    def stop(self):
        """Stop the authentication timer."""
        self.watcher.stop()
class MainLoop(object):
    """Central object of the node.

    Owns the pyev loop, the signal watchers, the parsed configuration, the
    RPC connection and its handlers, the tag databases and the set of
    registered plugins.

    :param config_path: path of the configuration file; it is given both to
        :class:`NodeConfigParser` and to ``logging.config.fileConfig``
    """

    def __init__(self, config_path):
        self.loop = pyev.default_loop()
        self.config_path = config_path

        # set signal watchers
        callbacks = {
            signal.SIGINT: self.stop,
            signal.SIGTERM: self.stop,
            signal.SIGUSR1: self.reload,
        }
        # turn into real watchers (``signum`` rather than ``signal`` so the
        # ``signal`` module is not shadowed)
        self.signals = dict(
            (signum, self.loop.signal(signum, callback))
            for signum, callback in callbacks.items()
        )

        # load config variables
        self.config = NodeConfigParser(self.config_path)

        # configure logging
        logging.config.fileConfig(self.config_path)

        # rpc connection (both set by rpc_connect)
        self.rpc_con = None
        self.proxy = None
        self.auth = AuthHandler(self)

        # role
        self.role = None

        # tag database
        self.tag_db = defaultdict(
            dict,
            __main__=dict((t.name, t)
                          for t in DEFAULT_TAGS),  # db for main objects
            # other sub-objects db can go here (for example VMs)
        )

        # handlers
        self.rpc_handler = dict(
            get_tags=partial(get_tags, self.tag_db['__main__']),
            sub_tags=self.sub_tags,
        )

        # plugins
        self.registered_plugins = set()

    # RPC handlers definitions
    def sub_tags(self, sub_id, tags=None, noresolve_tags=None):
        """RPC handler: return the tags of a sub-object.

        :param sub_id: name of the sub-object tag database
        :param tags: forwarded as is to ``get_tags``
        :param noresolve_tags: forwarded as is to ``get_tags``
        :return: result of ``get_tags`` on the sub-object database, or None
            when ``sub_id`` is ``'__main__'`` or is unknown
        """
        if sub_id == '__main__':
            # FIXME should we raise ?
            logger.debug('Invalid request for sub object')
            return

        # ``get`` does not create the entry, unlike indexing a defaultdict
        sub_db = self.tag_db.get(sub_id)
        if sub_db is None:
            # FIXME should we also raise here ?
            logger.debug('Failed to find sub_id %s', sub_id)
            return

        return get_tags(sub_db, tags, noresolve_tags)
    # End RPC handlers definitions

    def reset_tag(self, tag):
        """Start a tag and store it in the main tag database.

        :param tag: :class:`Tag` to add/replace
        """
        # TODO tag register
        tag.start(self.loop)
        self.tag_db['__main__'][tag.name] = tag

    def remove_tag(self, tag_name):
        """Remove a tag from the main tag database and stop it.

        :param tag_name: name of the tag to remove
        :raises KeyError: if no such tag is stored
        """
        # TODO tag unregister
        self.tag_db['__main__'].pop(tag_name).stop()

    def reset_sub_tag(self, sub_id, tag):
        """Start a tag and store it in a sub-object tag database.

        :param sub_id: name of the sub-object tag database
        :param tag: :class:`Tag` to add/replace
        """
        # TODO tag register
        tag.start(self.loop)
        self.tag_db[sub_id][tag.name] = tag

    def remove_sub_tag(self, sub_id, tag_name):
        """Remove a tag from a sub-object tag database and stop it.

        :param sub_id: name of the sub-object tag database
        :param tag_name: name of the tag to remove
        :raises KeyError: if no such tag is stored
        """
        # TODO tag unregister
        self.tag_db[sub_id].pop(tag_name).stop()

    def register_plugin(self, plugin):
        """Merge a plugin's tags and RPC handlers in, then start it.

        :param plugin: object exposing ``tag_db`` (mapping of database name
            to mapping of tags), ``rpc_handler`` (mapping of handlers),
            ``start`` and ``stop``
        :raises PluginError: if the plugin is already registered
        """
        # keep track of registered plugins
        if plugin in self.registered_plugins:
            raise PluginError('Plugin was already registered')
        self.registered_plugins.add(plugin)

        # register tags
        for db_name, sub_db in plugin.tag_db.items():
            self.tag_db[db_name].update(sub_db)

        # register handler
        self.rpc_handler.update(plugin.rpc_handler)

        plugin.start()

    def unregister_plugin(self, plugin):
        """Remove a plugin's tags and RPC handlers, then stop it.

        :param plugin: a plugin previously given to :meth:`register_plugin`
        :raises PluginError: if the plugin is not registered
        """
        try:
            self.registered_plugins.remove(plugin)
        except KeyError:
            raise PluginError('Plugin was not registered, cannot remove')

        # remove tags; iterate over (name, db) pairs, mirroring
        # register_plugin
        for db_name, plugin_db in plugin.tag_db.items():
            node_db = self.tag_db.get(db_name)
            if node_db is None:
                continue
            for tag_name in plugin_db:
                # tolerate a tag that was already removed so the plugin is
                # still stopped below
                node_db.pop(tag_name, None)
            if not node_db:  # if there's no more tag in the db
                del self.tag_db[db_name]

        # remove handlers
        for handler_name in plugin.rpc_handler:
            self.rpc_handler.pop(handler_name, None)

        plugin.stop()

    def rpc_connect(self):
        """Open the RPC connection to the cc-server and build its proxy."""
        # TODO async and error handling
        self.rpc_con = RpcConnection.from_addr_ssl(
            addr=self.config.server_host,
            port=self.config.server_port,
            handler=self.rpc_handler,
            loop=self.loop,
        )
        self.proxy = ConnectionProxy(self.rpc_con)

    def start(self):
        """Start signal watchers, connect, start authentication, run the loop."""
        logger.info('Starting node')
        for sig_watcher in self.signals.values():
            sig_watcher.start()
        logger.debug('About to connect')
        self.rpc_connect()
        self.auth.start()
        logger.debug('About to start ev_loop')
        self.loop.start()

    def stop(self, watcher=None, revents=None):
        """Shut the node down; also used as a signal watcher callback.

        :param watcher: watcher that triggered the call, unused
        :param revents: events reported by the watcher, unused
        """
        logger.info('Exiting node...')
        self.auth.stop()

        # close rpc
        if self.rpc_con is not None:
            self.rpc_con.shutdown()

        # close all plugins
        for plugin in self.registered_plugins:
            plugin.stop()
        self.registered_plugins = set()

        self.role = None
        self.loop.stop()

    def reload(self, watcher=None, revents=None):
        """Reload the logging configuration; used as a signal watcher callback.

        :param watcher: watcher that triggered the call, unused
        :param revents: events reported by the watcher, unused
        """
        logger.info(u'Reloading logging configuration...')
        logging.config.fileConfig(self.config_path)