Skip to content
Snippets Groups Projects
Commit 8b1a63b1 authored by Anael Beutot's avatar Anael Beutot
Browse files

Refactored node.py

Moved some parts to cc-common
Added logging handler for cc-common
Modified binary
parent 54951231
No related branches found
No related tags found
No related merge requests found
......@@ -9,7 +9,7 @@ from os.path import isfile, abspath
from daemon import DaemonContext
from cloudcontrol.node import __version__
from cloudcontrol.node.node import MainLoop
from cloudcontrol.node.node import NodeLoop
DEFAULT_CONFIG_FILE = '/etc/cc-node.conf'
......@@ -58,4 +58,4 @@ with DaemonContext(detach_process=options.daemonize,
pidfile.truncate()
pidfile.flush()
MainLoop(options.config).start()
NodeLoop(options.config).start()
......@@ -47,7 +47,7 @@ def configure_logging(level):
# create config parser for logging configuration
logging.config.fileConfig(StringIO("""
[loggers]
keys=root,ccnode,sjrpc
keys=root,ccnode,cccommon,sjrpc
[handlers]
# keys=syslog
......@@ -62,10 +62,15 @@ level=ERROR
handlers=consoleHandler
[logger_ccnode]
level=%s
level=%(level)s
handlers=
qualname=cloudcontrol.node
[logger_cccommon]
level=%(level)s
handlers=
qualname=cloudcontrol.common
[logger_sjrpc]
level=ERROR
handlers=
......@@ -83,4 +88,4 @@ args=(sys.stderr,)
[formatter_simpleFormatter]
format=cc-node - %%(asctime)s - %%(name)s - %%(levelname)s - %%(message)s
""" % level))
""" % dict(level=level)))
import os
import time
import signal
import logging
import logging.config
from threading import Thread
from collections import defaultdict
from functools import partial
import pyev
from sjrpc.core import RpcConnection, RpcError
from sjrpc.utils import ConnectionProxy, RpcHandler, threadless
from cloudcontrol.common.client.loop import RPCStartHandler, MainLoop
from cloudcontrol.common.client.tags import Tag
from cloudcontrol.node import __version__
from cloudcontrol.node.config import NodeConfigParser, configure_logging
from cloudcontrol.node.tags import Tag, get_tags, RootTagDB
from cloudcontrol.node.jobs import JobManager
from cloudcontrol.node.exc import PluginError
logger = logging.getLogger(__name__)
DEFAULT_TAGS = (Tag(u'version', __version__),)
class RPCStartHandler(Thread):
"""Handles rpc connection and authentication to the remote cc-server.
This class inherits from :class:`Thread` but only the connection part is run
in a background thread.
Note:
As :class:`Thread`, it can be started only one time.
"""
def __init__(self, loop):
"""
:param loop: MainLoop instance
"""
Thread.__init__(self)
self.daemon = True
self.run = self.rpc_connect
self.loop = loop
# signal to the main thread when connection is done
self.async = loop.evloop.async(self.async_cb)
self.timer = loop.evloop.timer(5., 5., self.in_progress_cb)
self.rpc_con = None
# id for async rpc call
self.auth_id = None
# Method run in the thread
def rpc_connect(self):
while True:
try:
self.rpc_con = RpcConnection.from_addr_ssl(
addr=self.loop.config.server_host,
port=self.loop.config.server_port,
handler=self.loop.rpc_handler,
loop=self.loop.evloop,
on_disconnect=self.loop.restart_rpc_connection,
)
except IOError:
logger.exception('Error while connecting to the cc-server')
time.sleep(5)
else:
break
self.async.send() # success
def start(self):
self.timer.start()
self.async.start()
Thread.start(self)
def stop(self):
self.timer.stop()
self.async.stop()
def in_progress_cb(self, *args):
logger.info('Connection to the cc-server still in progress')
def async_cb(self, *args):
logger.debug('Async callback')
self.timer.stop()
# connect is done
self.loop.rpc_con = self.rpc_con
# start authentication
self.timer = self.loop.evloop.timer(.5, 5., self.auth_cb)
self.timer.start()
def auth_cb(self, *args):
# check is fallback mode on sjrpc is set otherwise our call would block
# the loop
if not self.rpc_con._event_fallback.is_set():
logger.debug('Will try authentication again latter')
return
if self.auth_id is not None:
logger.error('Authentication is taking longer than expected')
return
# try to authenticate
try:
self.auth_id = self.rpc_con.rpc.async_call_cb(
self.auth_done_cb,
'authentify',
self.loop.config.server_user,
self.loop.config.server_passwd,
)
except RpcError as exc:
if exc.exception == 'RpcConnectionError':
logger.error('Authentication failed: connection lost')
else:
logger.exception('Unexpected exception while authenticating')
self.stop()
self.rpc_con.shutdown()
self.loop.restart_rpc_connection()
class NodeRPCStartHandler(RPCStartHandler):
def auth_done_cb(self, call_id, response=None, error=None):
assert call_id == self.auth_id
if error is not None:
# we got an error
logger.error('Error while authenticating with cc-server: %s("%s")',
error['exception'], error.get('message', ''))
self.set_reconnect()
return
RPCStartHandler.auth_done_cb(self, call_id, response, error)
# set handler according to which role was returned by the cc-server
if response == self.loop.role and response is not None:
......@@ -177,197 +62,21 @@ class RPCStartHandler(Thread):
logger.info('Successfully authenticated with role %s', str(response))
self.auth_id = None
def set_reconnect(self):
self.loop.role = None
self.auth_id = None
# we also close the previously opened plugins
self.loop.close_plugins()
self.timer.stop()
# we don't directly shutdown the rpc connection as the current
# callback is not fully completed, thus it will be called again
self.timer = self.loop.evloop.timer(.5, .0, self.handle_reconnect)
self.timer.start()
def handle_reconnect(self, *args):
self.stop()
self.rpc_con.shutdown()
self.loop.restart_rpc_connection()
class NodeLoop(MainLoop):
CONFIG_CLASS = NodeConfigParser
CONNECT_CLASS = NodeRPCStartHandler
DEFAULT_TAGS = (Tag(u'version', __version__),)
class MainLoop(object):
def __init__(self, config_path):
self.config_path = config_path
# load config variables
self.config = NodeConfigParser(self.config_path)
# configure logging
configure_logging(self.config.logging_level)
self.evloop = pyev.Loop(debug=self.config.debug)
# set signal watchers
self.signals = {
signal.SIGINT: self.stop,
signal.SIGTERM: self.stop,
signal.SIGUSR1: self.reload,
}
# turn into real watchers
self.signals = dict((
signal_,
self.evloop.signal(signal_, cb),
) for signal_, cb in self.signals.iteritems())
# rpc connection
self.rpc_con = None
self.connect = RPCStartHandler(self)
self.reconnect = None
# role
self.role = None
self.main_plugin = None
# tag database
self.tag_db = RootTagDB(self, tags=DEFAULT_TAGS)
# job manages
MainLoop.__init__(self, config_path)
self.job_manager = JobManager(self)
# handlers
self.rpc_handler = dict(
get_tags=partial(threadless(get_tags), self.tag_db['__main__']),
sub_tags=self.sub_tags,
)
# plugins
self.registered_plugins = set()
@property
def rpc_connected(self):
return self.rpc_con is not None
@property
def rpc_authenticated(self):
return self.rpc_connected and self.role is not None
# RPC handlers definitions
@threadless
def sub_tags(self, sub_id, tags=None):
if sub_id == '__main__':
# FIXME should we raise ?
logger.debug('Invalid request for sub object')
return {}
sub_db = self.tag_db.get(sub_id)
if sub_db is None:
# FIXME should we also raise here ?
logger.debug('Failed to find sub_id %s', sub_id)
return {}
return get_tags(sub_db, tags)
@threadless
def job_list(self):
pass
# End RPC handlers definitions
def reset_handler(self, name, handl):
self.rpc_handler[name] = handl
def remove_handler(self, name):
self.rpc_handler.pop(name, None)
def register_plugin(self, plugin):
# keep track of registered plugins
if plugin in self.registered_plugins:
raise PluginError('Plugin was already registered')
self.registered_plugins.add(plugin)
# register tags
plugin.tag_db.set_parent(self.tag_db)
# register handler
self.rpc_handler.update(plugin.rpc_handler)
plugin.start()
def unregister_plugin(self, plugin):
try:
self.registered_plugins.remove(plugin)
except KeyError:
raise PluginError('Plugin was not registered, cannot remove')
# remove tags
plugin.tag_db.set_parent(None)
# remove handlers
for handler_name in plugin.rpc_handler:
del self.rpc_handler[handler_name]
plugin.stop()
def close_plugins(self):
"""Unregister all plugins from the loop."""
for plugin in self.registered_plugins.copy():
self.unregister_plugin(plugin)
def restart_rpc_connection(self, *args):
if not self.rpc_connected:
return
# clear connection
self.rpc_con = None
logger.error('Lost connection to the cc-server, will attempt'
' reconnection')
# reconnection atempt in one second
self.reconnect = self.evloop.timer(
1.,0., self.restart_rpc_connection_cb)
self.reconnect.start()
def restart_rpc_connection_cb(self, *args):
# attempt to connect to the cc-server again
self.connect = RPCStartHandler(self)
self.connect.start()
self.reconnect.stop()
self.reconnect = None
def start(self):
logger.info('Starting node')
for signal_ in self.signals.itervalues():
signal_.start()
logger.debug('About to connect')
self.connect.start()
logger.debug('About to start ev_loop')
self.evloop.start()
def configure_logging(self):
configure_logging(self.config.logging_level)
def stop(self, watcher=None, revents=None):
logger.info('Exiting node...')
if self.connect is not None:
self.connect.stop()
if self.reconnect is not None:
self.reconnect.stop()
MainLoop.stop(self, watcher, revents)
# stop running jobs
self.job_manager.stop()
# close rpc
if self.rpc_con is not None:
# disable callback to prevent trampoline calls
self.rpc_con._on_disconnect = None # FIXME doesn't work
self.rpc_con.shutdown()
# close all plugins
for plugin in self.registered_plugins:
plugin.stop()
self.registered_plugins = set()
# FIXME check for closing of main tags that were not in plugin
self.role = None
self.main_plugin = None
self.evloop.stop()
def reload(self, watcher=None, revents=None):
logger.info('Reloading logging configuration...')
try:
configure_logging(NodeConfigParser(self.config_path).logging_level)
except:
logger.exception('Invalid config file')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment