Commit b7a16187 authored by Thibault VINCENT's avatar Thibault VINCENT
Browse files

fix: restart/auth/polling more robust

parent 9baa80e7
Loading
Loading
Loading
Loading
+34 −54
Original line number Diff line number Diff line
@@ -30,9 +30,29 @@ DEFAULT_CONFIGURATION = {
    'force_xen' : 'no',
}

def authentication(node, suicide, login, password):
    '''
    Node authentication thread
    '''
    timeout = 1
    while not suicide.is_set() and node.get_manager().is_running():
        logging.debug('Sending authentication request')
        if node.authentify(login, password):
            logging.error('Authentication suscessfull')
            return
        else:
            logging.error('Authentication failure')
            timeout += 0.1
            if timeout >= MAX_AUTH_TIMEOUT:
                timeout = MAX_AUTH_TIMEOUT
            sleep(exp(timeout))

def run_node(options):
    '''
    '''
    # instance life signal
    suicide_event = threading.Event()
    
    # setup logging facility:
    level = logging.ERROR
    verb = int(options['verbosity'])
@@ -43,7 +63,6 @@ def run_node(options):
            level = logging.INFO
        else:
            level = logging.DEBUG

    logger = logging.getLogger()
    logger.setLevel(level)
    if options['stdout']:
@@ -63,76 +82,37 @@ def run_node(options):
    logger.handlers = []
    logger.addHandler(handler)
    
    # setup SIGINT/SIGTERM handler
    def shutdown_handler(signum, frame):
        '''
        Handler called when SIGINT/SIGTERM emited
        '''
        logging.error('Letal signal received, node shutdown in progress')
        try:
            if node:
                logging.debug('Closing server connection')
                node.manager.shutdown()
            pass
        except:
            pass
        finally:
            os._exit(1)
    
    # register SIGINT and SIGTERM handler
    signal.signal(signal.SIGINT, shutdown_handler)
    signal.signal(signal.SIGTERM, shutdown_handler)
    
    # re-authentication thread
    def authentication(node):
        timeout = 1
        while node:
            if node.get_manager().is_running():
                logging.info('Sending authentication request')
                if node.authentify(options['login'], options['password']):
                    logging.error('Authentication suscessfull')
                    return
                else:
                    logging.error('Authentication failure')
                    timeout += 0.1
                    if timeout >= MAX_AUTH_TIMEOUT:
                        timeout = MAX_AUTH_TIMEOUT
                    sleep(exp(timeout))
    
    # start node
    none = None
    auth_thread = None
    try:
        logging.error('Initializing node client')
        # create client
        logging.error('Initializing client')
        try:
            node = CCNode(options['address'], int(options['port']),
                          options['detect_hypervisor'] == 'yes',
                          options['command_execution'] == 'yes',
                          force_xen=(options['force_xen'] == 'yes'))
        except Exception as err:
            logging.error('Client initialization failure: `%s`:`%s`', repr(err),
                                                                            err)
            logging.error('Client initialization failure: `%s`:`%s`',
                                                            repr(err), err)
            raise err
        
        # start main loop and auth thread
        # auth thread
        logging.info('Starting authentication thread')
        auth_thread = threading.Thread(target=authentication, args=(node,),
                                                                    name='Auth')
        auth_thread = threading.Thread(target=authentication, name='Auth',
            args=(node, suicide_event, options['login'], options['password']))
        auth_thread.daemon = True
        auth_thread.start()
        # main loop
        logging.info('Starting main loop')
        node.run()
    
    except Exception as err:
        logging.error('run_node: `%s` -> `%s`', repr(err), err)
        if auth_thread:
            del auth_thread
            auth_thread = None
        if node:
            node.manager.shutdown()
            node = None
        try:
            node.get_manager().shutdown()
        except:
            pass
    finally:
        return
        # ensure everything is killed properly
        suicide_event.set()

if __name__ == '__main__':
    
+15 −8
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-

import logging
from threading import Timer, Lock
from time import sleep
from sjrpc.client import SimpleRpcClient
from sjrpc.utils import ConnectionProxy
from sjrpc.core import RpcError
import handlers
from threading import Timer

class CCNode(object):
    '''
@@ -16,6 +17,7 @@ class CCNode(object):
        '''
        '''
        self._scheduler_timer = None
        self._scheduler_mutex = Lock()
        self._handler = handlers.NodeHandler(self, hypervisor, exec_cmd,
                                                                    force_xen)
        self._manager = SimpleRpcClient.from_addr(server, port, enable_ssl=True,
@@ -33,7 +35,7 @@ class CCNode(object):
        logging.debug('Authenticating user %s' % login)
        try:
            role = self._server.authentify(login, password)
            self._scheduler_run()
            self._scheduler_rearm()
        except RpcError as err:
            if err.exception == 'AuthenticationError':
                logging.warning('Authentication error')
@@ -50,14 +52,19 @@ class CCNode(object):
            else:
                return True
    
    def _scheduler_rearm(self):
        '''
        '''
        self._scheduler_timer = Timer(5, self._scheduler_run)
        self._scheduler_timer.start()
    
    def _scheduler_run(self):
        '''
        '''
        with self._scheduler_mutex:
            self._handler.scheduler_run()
        # reset timer
        del self._scheduler_timer
        self._scheduler_timer = Timer(1, self._scheduler_run)
        self._scheduler_timer.start()
            sleep(0.1)
            self._scheduler_rearm()
    
    def get_server(self):
        '''