Commit 3fc8ede8 authored by Anael Beutot's avatar Anael Beutot Committed by Antoine Millet
Browse files

Rewritten RPCStartHandler to be more robust

parent 1321cdb5
Loading
Loading
Loading
Loading
+284 −67
Original line number Diff line number Diff line
import sys
import ssl
import time
import Queue
import errno
import signal
import socket
import threading
import logging
import logging.config
from threading import Thread
from functools import partial

import pyev
@@ -18,89 +21,275 @@ from cloudcontrol.common.client.exc import PluginError
logger = logging.getLogger(__name__)


class RPCStartHandler(Thread):
class RPCStartHandler(object):
    """Handles rpc connection and authentication to the remote cc-server.

    This class inherits from :class:`Thread` but only the connection part is
    run in a background thread.
    It performs the following steps (modelized as a DFA):
    | Name service resolution
    ->
    | TCP connection
    ->
    | SSL handshake
    ->
    | authentication to the cc-server
    ->
    | done
    """

    Note:
        As :class:`Thread`, it can be started only one time.
    AUTH_TIMEOUT = 30.

    """
    def __init__(self, loop):
        """
        :param loop: MainLoop instance
        """
        Thread.__init__(self)
        self.daemon = True
        self.run = self.rpc_connect
        # internal DFA state handling
        self._states = {
            # FORMAT IS: STATE: (INIT, CLEANUP)
            # INIT is a pre state execution that is called to setup things like
            # watchers, threads
            # CLEANUP is a post state that cleans what is set in the INIT
            self.handle_name: (self.init_name, self.cleanup_name),
            self.handle_connect: (self.init_connect, self.cleanup_connect),
            self.handle_handshake: (self.init_handshake, self.cleanup_handshake),
            self.handle_authentication: (self.init_authentication,
                                         self.cleanup_authentication),
            self.handle_done: (None, None),
            self.handle_error: (None, None),  # obviously error state
        }
        self._current_state = None

        self.loop = loop
        # signal to the main thread when connection is done
        self.async = loop.evloop.async(self.async_cb)
        self.timer = loop.evloop.timer(5., 5., self.in_progress_cb)

        # we don't declare all attributes in the constructor since some are only
        # specified to a single state
        self.sock = None
        self.rpc_con = None
        # id for async rpc call
        self.auth_id = None

    def _goto(self, where):
        """Method that is used to perform a state change, it will automatically
        call init/cleanup methods if needed.

        :param where: state where we wanna go or None
        """
        cleanup = self._states.get(self._current_state, (None, None))[1]
        if cleanup is not None:
            cleanup()
        self._current_state = where
        if where is not None:
            init = self._states[where][0]
            if init is not None:
                init()
            where()

    def init_name(self):
        self.thread = threading.Thread(target=self._handle_name)
        self.thread.daemon = True
        self.async_w = self.loop.evloop.async(self.handle_name_cb)

    def handle_name(self):
        self.async_w.start()
        self.thread.start()

    # Method run in the thread
    def rpc_connect(self):
    def _handle_name(self):
        while True:
            logger.debug('getaddrinfo')
            try:
                self.rpc_con = RpcConnection.from_addr_ssl(
                    addr=self.loop.config.server_host,
                    port=self.loop.config.server_port,
                    handler=self.loop.rpc_handler,
                    loop=self.loop.evloop,
                    on_disconnect=self.loop.restart_rpc_connection,
                self.addr_info = iter(socket.getaddrinfo(
                    self.loop.config.server_host,
                    self.loop.config.server_port,
                    socket.AF_UNSPEC,
                    socket.SOCK_STREAM,
                ))
            except socket.gaierror as exc:
                logger.error(
                    'Error while resolving hostname for cc-server: %s',
                    exc.strerror,
                )
            except IOError:
                logger.exception('Error while connecting to the cc-server')
                time.sleep(5)
            except Exception:
                logger.exception(
                    'Unexpected error while resolving cc-server hostname')
            else:
                break

        self.async.send()  # success
            time.sleep(5.)

    def start(self):
        self.timer.start()
        self.async.start()
        self.async_w.send()

        Thread.start(self)
    def handle_name_cb(self, watcher, revents):
        self._goto(self.handle_connect)

    def stop(self):
        self.timer.stop()
        self.async.stop()
    def cleanup_name(self):
        del self.thread
        self.async_w.stop()
        del self.async_w

    def in_progress_cb(self, *args):
        logger.info('Connection to the cc-server still in progress')
    def init_connect(self):
        # watch for non-blocking connection
        self.sock_write_w = None
        # tick is used for connect -> name transition to introduce some delay
        # between attempts
        self.tick_w = self.loop.evloop.timer(2., 0., self.handle_connect_error)

    def async_cb(self, *args):
        logger.debug('Async callback')
        self.timer.stop()
        # connect is done
        self.loop.rpc_con = self.rpc_con
        # start authentication
        self.timer = self.loop.evloop.timer(.5, 5., self.auth_cb)
        self.timer.start()
    def handle_connect_error(self, watcher, revents):
        self._goto(self.handle_name)

    def handle_connect(self):
        while True:
            try:
                (
                    family,
                    type_,
                    proto,
                    cannonname,
                    sockaddr,
                ) = self.current_addr = self.addr_info.next()
            except StopIteration:
                logger.error('Did try all addrinfo entries for cc-server without '
                             'success, will try to resolve hostname again')
                # we need to cleanup socket for the connect -> name transition
                # only, so we don't write the code in the cleanup method
                if self.sock is not None:
                    self.sock.close()
                self.tick_w.start()
                return

            try:
                self.sock = socket.socket(family, type_, proto)
            except EnvironmentError as exc:
                logger.error(
                    'Cannot create socket to connect to cc-server '
                    '(Errno %s: %s)', exc.errno, exc.strerror,
                )
                sys.exit(1)
                return

            try:
                self.sock.setblocking(0)
            except EnvironmentError as exc:
                logger.error(
                    'Cannot set socket in non blocking mode (Errno %s: %s),'
                    ' will exit', exc.errno, exc.strerror,
                )
                sys.exit(1)
                return

            error = self.sock.connect_ex(sockaddr)

            if error == errno.EINPROGRESS:  # most likely to happen
                self.sock_write_w = self.loop.evloop.io(self.sock, pyev.EV_WRITE,
                                                     self.handle_connected)
                self.sock_write_w.start()
                return
            elif error:
                logger.error(
                    'Error while trying to connect to cc-server using %s',
                    sockaddr,
                )
                continue
            else:
                # connection already succeeded, very unlikely as we're using TCP
                self._goto(self.handle_handshake)
                return

    def handle_connected(self, watcher=None, revents=None):
        # check that there is no error
        error = self.sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if error:
            logger.error(
                'Error while trying to connect to cc-server using %s,'
                ' (Errno %s: %s)', self.current_addr[-1],
                error, os.strerror(error),
            )
            self._goto(self.handle_error)
            return

        self._goto(self.handle_handshake)
        return

    def auth_cb(self, *args):
    def cleanup_connect(self):
        if self.sock_write_w is not None:
            self.sock_write_w.stop()
        del self.sock_write_w
        if self.tick_w.active:
            self.tick_w.stop()
        del self.tick_w

    def init_handshake(self):
        self.sock = ssl.wrap_socket(self.sock, do_handshake_on_connect=False,
                                    ssl_version=ssl.PROTOCOL_TLSv1)
        self.sock_read_w = self.loop.evloop.io(self.sock, pyev.EV_READ,
                                        self.handle_handshake)
        self.sock_write_w = self.loop.evloop.io(self.sock, pyev.EV_WRITE,
                                         self.handle_handshake)

    def handle_handshake(self, watcher=None, revents=None):
        assert self.sock is not None
        try:
            self.sock.do_handshake()
        except ssl.SSLError as exc:
            if exc.args[0] == ssl.SSL_ERROR_WANT_READ:
                self.sock_write_w.stop()
                self.sock_read_w.start()
            elif exc.args[0] == ssl.SSL_ERROR_WANT_WRITE:
                self.sock_read_w.stop()
                self.sock_write_w.start()
            else:
                logger.exception('SSL error:')
                self._goto(self.handle_error)
                return
        else:
            # create rpc_connection object
            self.rpc_con = RpcConnection(
                sock=self.sock,
                loop=self.loop.evloop,
                handler=self.loop.rpc_handler,
                on_disconnect=self.loop.restart_rpc_connection,
            )
            # since the socket is referenced by rpc_con, we don't need it
            # anymore
            self.sock = None
            self._goto(self.handle_authentication)
            return

    def cleanup_handshake(self):
        self.sock_read_w.stop()
        del self.sock_read_w
        self.sock_write_w.stop()
        del self.sock_write_w

    def init_authentication(self):
        self.timeout_w = self.loop.evloop.timer(
            self.AUTH_TIMEOUT, 0., self.handle_authentication_timeout)
        # since the sjrpc connection start in fallback mode, it could block so
        # we need to check this before attempting authentication;
        # as there is no reliable way to check when we can authenticate, we try
        # every 2 seconds
        self.tick_w = self.loop.evloop.timer(
            .5, 2., self.handle_authentication_tick)
        self.auth_id = None

    def handle_authentication(self):
        assert self.sock is None
        assert self.rpc_con is not None

        self.tick_w.start()

    def handle_authentication_tick(self, watcher=None, revents=None):
        # check is fallback mode on sjrpc is set otherwise our call would block
        # the loop
        if not self.rpc_con._event_fallback.is_set():
            logger.debug('Will try authentication again latter')
            return

        if self.auth_id is not None:
            logger.error('Authentication is taking longer than expected')
            return

        self.tick_w.stop()
        self.timeout_w.start()
        # try to authenticate
        try:
            self.auth_id = self.rpc_con.rpc.async_call_cb(
                self.auth_done_cb,
                self.handle_authentication_cb,
                'authentify',
                self.loop.config.server_user,
                self.loop.config.server_passwd,
@@ -111,11 +300,11 @@ class RPCStartHandler(Thread):
            else:
                logger.exception('Unexpected exception while authenticating')

            self.stop()
            self.rpc_con.shutdown()
            self.loop.restart_rpc_connection()
            self._goto(self.handle_error)
            return

    def auth_done_cb(self, call_id, response=None, error=None):
    def handle_authentication_cb(self, call_id, response=None, error=None):
        # RPC callback
        assert call_id == self.auth_id

        if error is not None:
@@ -123,24 +312,53 @@ class RPCStartHandler(Thread):
            logger.error('Error while authenticating with cc-server: %s("%s")',
                         error['exception'], error.get('message', ''))

            self.set_reconnect()
            self._goto(self.handle_error)
            return

    def set_reconnect(self):
        self.loop.role = None
        self.auth_id = None
        # we also close the previously opened plugins
        self.loop.close_plugins()
        self.timer.stop()
        # we don't directly shutdown the rpc connection as the current
        # callback is not fully completed, thus it will be called again
        self.timer = self.loop.evloop.timer(.5, .0, self.handle_reconnect)
        self.timer.start()

    def handle_reconnect(self, *args):
        self.loop.rpc_con = self.rpc_con
        # The rest is up to the subclass :)
        self.handle_authentication_response(response)

    def handle_authentication_response(self, response):
        """To be subclassed. Decides what to do about authentication response."""
        raise NotImplementedError

    def handle_authentication_timeout(self, watcher, revents):
        logger.error('Timeout while authenticating with cc-server %s',
                     self.current_addr[-1])
        self._goto(self.handle_error)

    def cleanup_authentication(self):
        if self.tick_w.active:
            self.tick_w.stop()
        del self.tick_w
        if self.timeout_w.active:
            self.timeout_w.stop()
        del self.timeout_w

    def handle_done(self):
        logger.info('Successfully authenticated with role %s', str(self.loop.role))
        self.stop()

    def handle_error(self):
        # cleanup and goto handle_connect
        if self.rpc_con is not None:
            self.rpc_con.rpc._on_disconnect = None  # we don't want the cb
            self.rpc_con.shutdown()
        self.loop.restart_rpc_connection()
            self.rpc_con = None
        elif self.sock is not None:
            self.sock.close()
            self.sock = None
        self._goto(self.handle_connect)
        return

    def start(self):
        self._goto(self.handle_name)
        return

    def stop(self):
        self._goto(None)
        return


class MainLoop(object):
@@ -335,7 +553,6 @@ class MainLoop(object):

    def restart_rpc_connection_cb(self, *args):
        # attempt to connect to the cc-server again
        self.connect = self.CONNECT_CLASS(self)
        self.connect.start()
        self.reconnect.stop()
        self.reconnect = None