Skip to content
Snippets Groups Projects
Commit cffa3cba authored by Anael Beutot's avatar Anael Beutot
Browse files

Basics working with libev.

Authentication and host handler.
parent fc74c9dc
No related branches found
No related tags found
No related merge requests found
import time import time
import signal
import logging import logging
from threading import Thread, Lock import logging.config
import pyev
from sjrpc.core import RpcConnection from sjrpc.core import RpcConnection
from sjrpc.utils import ConnectionProxy, RpcHandler from sjrpc.utils import ConnectionProxy, RpcHandler
from ccnode import __version__ from ccnode import __version__
from ccnode.config import NodeConfigParser
from ccnode.tags import Tag, get_tags from ccnode.tags import Tag, get_tags
...@@ -42,124 +45,130 @@ class DefaultHandler(RpcHandler): ...@@ -42,124 +45,130 @@ class DefaultHandler(RpcHandler):
self.tags.pop(tag_name, None) self.tags.pop(tag_name, None)
class Node(Thread): class AuthHandler(object):
"""Main class for ccnode.""" """Handles rpc authentication to the remote cc-server."""
def __init__(self, server_host, server_port, user_name, user_passwd): def __init__(self, loop):
""" self.loop = loop
:param string server_host: hostname for cc-server self.watcher = loop.loop.timer(.5, 5, self.cb)
:param int server_port: port for cc-server self.auth_id = None
:param string user_name: account name for authentication to cc-server
:param string user_passwd: password for cc-server authentication def cb(self, watcher, revents):
""" logger.debug('Callback auth')
Thread.__init__(self) # check is fallback mode on sjrpc is set otherwise our call would block
# the loop
# run thread as daemon if not self.loop.rpc_con._event_fallback.is_set():
self.daemon = True logger.debug('Will try authentication again latter')
return
# settings used as read only
self.server_host = server_host if self.auth_id is not None:
self.server_port = int(server_port) logger.error('Authentication is taking longer than expected')
self.user_name = user_name return
self.user_passwd = user_passwd
# try to authenticate
#: ``sjRpc`` proxy self.auth_id = self.loop.rpc_con.rpc.async_call_cb(
self.proxy = None self.cb_auth,
#: ``sjRpc`` connection 'authentify',
self.connection = None self.loop.config.server_user,
#: role returned by cc-server (set to None unless the authentication self.loop.config.server_passwd,
#: has succeed)
self.role = None
self._connection_lock = Lock()
def init_rpc(self):
"""Init a new connection to ``cc-server``, create a ``sjRpc`` proxy.
"""
self.connection = RpcConnection.from_addr_ssl(
addr=self.server_host,
port=self.server_port,
handler=DefaultHandler(),
) )
self.proxy = ConnectionProxy(self.connection)
def authentify(self): def cb_auth(self, call_id, response=None, error=None):
"""Try to authenticate to the server. If successfull, import and then assert call_id == self.auth_id
set :class:`Handler` corresponding to the role returned by the
``cc-server``.
:raise: exception raised by :func:`proxy.authentify` if error is not None:
""" # we got an error
try: logger.error('Error while authenticating with cc-server: %s("%s")',
role = self.proxy.authentify(self.user_name, self.user_passwd) error['exception'], error.get('message', ''))
except Exception:
logger.exception('Unknow exception while authentifying') # try to recconnect in 5 seconds
raise return
# set handler according to which role was returned by the cc-server # set handler according to which role was returned by the cc-server
if role == u'host': if response == u'host':
logger.debug('Role host affected') logger.debug('Role host affected')
from ccnode.host import Handler as HostHandler from ccnode.host import Handler as HostHandler
self.connection.rpc.set_handler(HostHandler()) self.loop.rpc_con.rpc.set_handler(HostHandler())
self.role = u'host' self.loop.role = u'host'
elif role == u'hv': elif response == u'hv':
logger.debug('Role hypervisor affected') logger.debug('Role hypervisor affected')
from ccnode.hypervisor import Handler as HypervisorHandler from ccnode.hypervisor import Handler as HypervisorHandler
self.connection.rpc.set_handler(HypervisorHandler( self.loop.rpc_con.rpc.set_handler(HypervisorHandler(
proxy=self.proxy, hypervisor_name=self.user_name)) proxy=self.loop.proxy,
self.role = u'hv' hypervisor_name=self.loop.config.server_user))
self.loop.role = u'hv'
else: else:
logger.debug('Wrong role returned: %s', role) logger.info('Failed authentication, role returned: %s', response)
role = None self.loop.role = None
time.sleep(2) self.auth_id = None
return
self.role = role
if self.loop.role is not None:
def rpc(self): self.watcher.stop()
"""Runs ``sjRpc`` main loop. Catches exceptions. :func:`shutdown` the logger.info('Successfully authenticated with role %s', response)
connection before returning.""" self.auth_id = None
try:
self.connection.run() def start(self):
except Exception: self.watcher.start()
logger.exception('Unknown exception:')
finally:
self.shutdown() class MainLoop(object):
def __init__(self, config_path):
def run(self): self.loop = pyev.default_loop()
"""Node main loop.""" self.config_path = config_path
while True:
# init rpc connection # set signal watchers
while True: self.signals = {
try: signal.SIGINT: self.stop,
self.init_rpc() signal.SIGTERM: self.stop,
except Exception as e: signal.SIGUSR1: self.reload,
logger.exception('Error in init.') }
else:
break # turn into real watchers
time.sleep(2) self.signals = dict((
# launch main rpc thread signal,
rpc_thread = Thread(target=self.rpc) self.loop.signal(signal, cb),
rpc_thread.daemon = True ) for signal, cb in self.signals.iteritems())
rpc_thread.start()
# launch auth thread, make sure rpc is still running # load config variables
while rpc_thread.is_alive() and self.role is None: self.config = NodeConfigParser(self.config_path)
auth_thread = Thread(target=self.authentify)
auth_thread.daemon = True # configure logging
auth_thread.start() logging.config.fileConfig(self.config_path)
auth_thread.join()
# FIXME for debug # rpc connection
time.sleep(4) self.rpc_con = None
# wait for rpc thread to terminates (it means error) self.auth = AuthHandler(self)
rpc_thread.join()
logger.error('Reconnecting to server.') # role
# reset settings self.role = None
self.role = None
def rpc_connect(self):
def shutdown(self): # TODO async and error handling
"""Shutdown the ``sjRpc`` connection and reset object state.""" self.rpc_con = RpcConnection.from_addr_ssl(
with self._connection_lock: addr=self.config.server_host,
if self.connection is not None: port=self.config.server_port,
self.connection.shutdown() handler=DefaultHandler(),
self.connection = None loop=self.loop,
self.role = None )
self.proxy = ConnectionProxy(self.rpc_con)
def start(self):
for signal in self.signals.itervalues():
signal.start()
logger.debug('About to connect')
self.rpc_connect()
self.auth.start()
logger.debug('About to start ev_loop')
self.loop.start()
def stop(self, watcher=None, revents=None):
logger.info('Exiting node...')
if self.rpc_con is not None:
self.rpc_con.shutdown()
self.loop.stop()
def reload(self, watcher=None, revents=None):
logger.info(u'Reloading logging configuration...')
logging.config.fileConfig(self.config_path)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment