Skip to content
rpcconnection.py 15.6 KiB
Newer Older
Antoine Millet's avatar
Antoine Millet committed
#!/usr/bin/env python
#coding:utf8

'''
This module contains the RpcConnection class, more informations about this
class are located in it docstring.
'''

import ssl
import json
import struct
import socket
import logging
import threading
from uuid import uuid4
Antoine Millet's avatar
Antoine Millet committed

from sjrpc.utils import BytesBuffer
from sjrpc.core.callers import RpcCaller, ThreadedRpcCaller
from sjrpc.core.exceptions import RpcError


class RpcConnection(object):
    '''
    This class manage a single peer connection.
    '''
Antoine Millet's avatar
Antoine Millet committed

    RECV_BUF_SIZE = 4096
    SEND_BUF_SIZE = 4096
Antoine Millet's avatar
Antoine Millet committed
    REQUEST_MESSAGE = {'id': None, 'method': None, 'args': [], 'kwargs': {}}
    RESPONSE_MESSAGE = {'id': None, 'return': None, 'error': None}
Antoine Millet's avatar
Antoine Millet committed

    def __init__(self, sock, manager, handler=None, timeout=None):
Antoine Millet's avatar
Antoine Millet committed
        # Sock of this connection:
        self._sock = sock

        # Activate TCP keepalive on the connection:
        self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        # Inbound buffer containing currently readed message while it sending:
        self._inbound_buffer = BytesBuffer()
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        # The size of the currently readed message, None value significate that
        # the next message size have to be readed:
        self._cur_msg_size = None
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        # Outbound buffer containing data to write on the socket:
        self._outbound_buffer = BytesBuffer()
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        # Reference to the RPC handler, this attribute can be set to None if
        # no handler is defined for this connection (for example, for a client
        # connection).
        self._handler = handler
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        # Store all calls sent to the peer. Key is the id of the call and value
        # the event to raise when call is finished.
        self._calls = {}
Antoine Millet's avatar
Antoine Millet committed

        # The manager of this connection
Antoine Millet's avatar
Antoine Millet committed
        self._manager = manager

        # Is the RpcConnection connected to its peer:
        self._connected = True

        # The global call timeout setting:
        self._call_timeout = timeout

Antoine Millet's avatar
Antoine Millet committed
    @classmethod
    def from_addr(cls, addr, port, manager, enable_ssl=False, timeout=None,
Antoine Millet's avatar
Antoine Millet committed
                  conn_timeout=30.0, cert=None, handler=None):
Antoine Millet's avatar
Antoine Millet committed
        '''
        Construct the instance of :class:`RpcConnection` without providing
        the :class:`socket` object. Socket is automatically created and passed
        to the standard constructor before to return the new instance.

        :param addr: the target ip address
        :param port: the target port
        :param manager: manager of this connection
Antoine Millet's avatar
Antoine Millet committed
        :param enable_ssl: enable SSL
        :param timeout: the global call timeout setting
        :param conn_timeout: the connection operation timeout
Antoine Millet's avatar
Antoine Millet committed
        :param cert: is SSL is enabled, profile the filename of certificate to
            check. If None, don't check certificate.
        :param handler: Handler to attach to this :class:`RpcConnection` object
Antoine Millet's avatar
Antoine Millet committed
        '''

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if enable_ssl:
            req = ssl.CERT_NONE if cert is None else ssl.CERT_REQUIRED
            sock = ssl.wrap_socket(sock, certfile=None, cert_reqs=req,
                                   ssl_version=ssl.PROTOCOL_TLSv1)
        sock.settimeout(conn_timeout)
Antoine Millet's avatar
Antoine Millet committed
        sock.connect((addr, port))
        sock.setblocking(False)
        return cls(sock, manager, handler, timeout=timeout)
Antoine Millet's avatar
Antoine Millet committed

    def __repr__(self):
Antoine Millet's avatar
Antoine Millet committed

    def __hash__(self):
        return self._sock.__hash__()

    def send(self):
        '''
Antoine Millet's avatar
Antoine Millet committed
        Flush the outbound buffer by sending all it content through socket. This
        method is usually called by the :class:`ConnectionManager` of
Antoine Millet's avatar
Antoine Millet committed
        this :class:`RpcConnection`.
        '''

        data = self._outbound_buffer.pull(RpcConnection.SEND_BUF_SIZE)

        while data:
            sent = self._sock.send(data)
            data = data[sent:]

        with self._outbound_buffer:
            if not len(self._outbound_buffer):
                self._manager.nothing_to_write(self.get_fd())
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
    def receive(self):
        '''
        Receive data from socket into the inbound buffer.
Antoine Millet's avatar
Antoine Millet committed
        '''

        try:
            buf = self._sock.recv(RpcConnection.RECV_BUF_SIZE)
        except socket.error as err:
            if isinstance(err, socket.error) and err.errno == 11:
            elif isinstance(err, ssl.SSLError) and err.errno == 2:
                return
            else:
                raise err
Antoine Millet's avatar
Antoine Millet committed
        if not buf:
            # Empty buffer = closed socket.
Antoine Millet's avatar
Antoine Millet committed
            raise socket.error()
Antoine Millet's avatar
Antoine Millet committed
        self._inbound_buffer.push(buf)

        while self._process_inbound():
            pass

    def _process_inbound(self):
        '''
        Process the inbound buffer and :meth:`dispatch` messages.

        :return: False if nothing happened or True
        '''

Antoine Millet's avatar
Antoine Millet committed
        # Read the message length:
        if self._cur_msg_size is None and len(self._inbound_buffer) >= 4:
            length = struct.unpack('!L', self._inbound_buffer.pull(4))[0]
            self._cur_msg_size = length
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        # Read the message payload:
Antoine Millet's avatar
Antoine Millet committed
        if (self._cur_msg_size is not None
Antoine Millet's avatar
Antoine Millet committed
                           and len(self._inbound_buffer) >= self._cur_msg_size):
            payload = self._inbound_buffer.pull(self._cur_msg_size)
            self._cur_msg_size = None
            message = json.loads(payload)
            logging.debug('Received: %s', message)
Antoine Millet's avatar
Antoine Millet committed
            self.dispatch(message)
            return True

        return False
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
    def _send(self, message):
        '''
        Low level method to encode a message in json, calculate it size, and
        place result on outbound buffer.
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        .. warning::
           Message must be a jsonisable structure.
        '''

        if not self._connected:
            raise RpcError('SendError', 'disconnected from the peer')

        logging.debug('Sending: %s', message)
Antoine Millet's avatar
Antoine Millet committed
        json_msg = json.dumps(message)
        size = struct.pack('!L', len(json_msg))
        with self._outbound_buffer:
            self._outbound_buffer.push(size + json_msg)
            self._manager.data_to_write(self.get_fd())
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
    def _send_call(self, method_name, *args, **kwargs):
        '''
        Create the message for the call and push them to the outbound queue.
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        :param method_name: the name of the method to call on the peer
        :param *args: arguments to pass to the remote method
        :param **kwargs: keyword arguments to pass to the remote method
        :return: the generated id for the request
        :rtype: :class:`str` object
        '''
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        msg = RpcConnection.REQUEST_MESSAGE.copy()
        msg['method'] = method_name
        msg['args'] = args
        msg['kwargs'] = kwargs
        msg['id'] = str(uuid4())
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        self._send(msg)
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        return msg['id']
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
    def _send_response(self, msg_id, returned=None, error=None):
        '''
        Low level method to send a response message to the peer.

        :param msg_id: the id of the replied message
        :param returned: returned data
        :type returned: returned data or None if errors have been raised
        :param error: raised errors
        :type error: raised error or None if no error have been raised
        '''
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        msg = RpcConnection.RESPONSE_MESSAGE.copy()
        msg['id'] = msg_id
        msg['return'] = returned
        msg['error'] = error
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        self._send(msg)
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
    def response(self, msg_id, returned):
        '''
        Send an "return" response to the peer.
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        :param msg_id: the id of the replied message
        :param returned: the value returned by the function
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        .. warning::
Antoine Millet's avatar
Antoine Millet committed
           In case of raised error, use the :meth:`error` method instead of
Antoine Millet's avatar
Antoine Millet committed
           this one.
        '''
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        self._send_response(msg_id, returned=returned)
Antoine Millet's avatar
Antoine Millet committed

    def error(self, msg_id, error, message, traceback=None):
Antoine Millet's avatar
Antoine Millet committed
        '''
        Send an error response to the peer.
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        :param msg_id: the id of the replied message
        :param error: the name of the raised exception
        :param message: human readable error for the exception
        '''
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        err = {'exception': error, 'message': message}
        self._send_response(msg_id, error=err)
Antoine Millet's avatar
Antoine Millet committed

    def call(self, method_name, *args, **kwargs):
Antoine Millet's avatar
Antoine Millet committed
        '''
        Make a new remote call on the peer.
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        :param method_name: the method to call on the peer
        :param \*args: the arguments for the call
        :param \*\*kwargs: the keyword arguments for the call
        :return: the data returned by the peer for the call
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        .. note::
Antoine Millet's avatar
Antoine Millet committed
           This function will block until the peer response is received. You
           can also specify a ``timeout`` argument to specify a number of
           seconds before to raise an :exc:`CallTimeout` exception if the peer
Antoine Millet's avatar
Antoine Millet committed
           didnt respond.
        '''

        if '_timeout' in kwargs:
            timeout = kwargs['_timeout']
            del kwargs['_timeout']
        else:
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        # Send the call to the peer:
        msg_id = self._send_call(method_name, *args, **kwargs)
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        # Create an item in calls dict with reference to the event to raise:
Antoine Millet's avatar
Antoine Millet committed
        self._calls[msg_id] = {'return': None, 'error': None,
Antoine Millet's avatar
Antoine Millet committed
                               'event': threading.Event()}
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        # Wait for the response:
        self._calls[msg_id]['event'].wait(timeout)

        # Check if timeout occured:
        if not self._calls[msg_id]['event'].is_set():
            raise RpcError('TimeoutError', 'remote method timeout')
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        # Check if error occured while execution:
        if self._calls[msg_id]['error'] is not None:
            raise RpcError(self._calls[msg_id]['error']['exception'],
                           self._calls[msg_id]['error']['message'])
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        response = self._calls[msg_id]['return']
        del self._calls[msg_id]
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        return response
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
    def async_call(self, method_name, *args, **kwargs):
        '''
        Make a new asynchronous call on the peer.
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        :param method_name: the method to call on the peer
        :param \*args: the arguments for the call
        :param \*\*kwargs: the keyword arguments for the call
        :return: the message id of the call
        '''

        # Extract _data from argument:
        if '_data' in kwargs:
            data = kwargs['_data']
            del kwargs['_data']
        else:
            data = None

Antoine Millet's avatar
Antoine Millet committed
        # Send the call to the peer:
        msg_id = self._send_call(method_name, *args, **kwargs)

        self._calls[msg_id] = {'async': True, 'data': data}
Antoine Millet's avatar
Antoine Millet committed

        return msg_id

    def dispatch(self, message):
        '''
        Dispatch a received message according to it type.
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        :param message: the received message to dispatch
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        .. note::
           When dispatching a call, the responsability of response is delegated
Antoine Millet's avatar
Antoine Millet committed
           to the caller, except for the case where the method isn't found on
Antoine Millet's avatar
Antoine Millet committed
           the handler.
        '''
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        if set(RpcConnection.REQUEST_MESSAGE) <= set(message):
            # Handle call requests from the peer:
            if self._handler is not None:
                try:
                    func = self._handler[message['method']]
                except KeyError:
                    self.error(
                        message['id'],
Antoine Millet's avatar
Antoine Millet committed
                        'NameError',
Antoine Millet's avatar
Antoine Millet committed
                        "remote name '%s' is not defined" % message['method'],
                    )
                else:
                    if getattr(func, '__threaded__', True):
                        ThreadedRpcCaller(message, self, func).start()
                    else:
                        RpcCaller(message, self, func).start()
Antoine Millet's avatar
Antoine Millet committed
                self.error(
                    message['id'],
                    'NameError',
                    "remote name '%s' is not defined" % message['method'],
                )
Antoine Millet's avatar
Antoine Millet committed
        elif set(RpcConnection.RESPONSE_MESSAGE) <= set(message):
            # Handle response message from the peer:
            call = self._calls.get(message['id'])
            if call is not None:
                # Call exists in call list
                if message['error'] is None:
                    call['return'] = message['return']
                else:
                    call['error'] = message['error']

                if 'event' in call:
                    # Release the call if its synchronous:
                    call['event'].set()
                else:
                    # Else, it's an asynchonous call, so, we need to notify
                    # manager that the response has been received:
                    message['data'] = call['data']
Antoine Millet's avatar
Antoine Millet committed
                    self._manager.signal_arrival(message)

                    # Message will be deleted from the call list by the
                    # manager while cleanup.

        else:
            logging.debug('Malformed message received: %s', message)
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        '''
        Shutdown this connection.
Antoine Millet's avatar
Antoine Millet committed

        :param callback: Name of the callback to call on the handler or a
            callable to call when the connection is shutdown.
        :type callback: :class:`str` or callable or None
Antoine Millet's avatar
Antoine Millet committed
        '''

        try:
            self._sock.close()
        except socket.error as err:
            logging.debug('Error while socket close: %s.', err)
        # Release all running calls from this connection:
        for cid, call in self._calls.iteritems():
            err = {'exception': 'RpcError',
                   'message': 'Connection reset by peer'}
                call['return'] = None
                call['event'].set()
            else:
                msg = {'id': cid, 'error': err, 'return': None}
                self._manager.signal_arrival(msg)
Antoine Millet's avatar
Antoine Millet committed

        if callback is not None and not callable(callback):
            if self._handler is not None:
                try:
                    callback = self._handler[callback]
                except KeyError:
                    callback = None
Antoine Millet's avatar
Antoine Millet committed
            else:
                callback = None

        if callback is not None:
            try:
                callback(self)
            except Exception as err:
                logging.debug('Error while execution of shutdown '
                              'callback: %s', err)
Antoine Millet's avatar
Antoine Millet committed

    def clean_call(self, msg_id):
        '''
        Delete waiting call by it id in the call list.

        :param msg_id: the id of the call to delete
        :return: True if call have been deleted or False if it doesn't exists
        '''
        try:
            del self._calls[msg_id]
        except KeyError:
            return False
        else:
            return True

    def clean_all_calls(self, msg_ids):
        '''
        Delete all call by their ids in the call list.

        :param msg_id: list of call id to delete
        '''

        for msg_id in msg_ids:
            self.clean_call(msg_id)

    def get_handler(self):
        '''
        Return the handler binded to the :class:`RpcConnection`.

        :return: binded handler
        '''

        return self._handler

    def set_handler(self, handler):
        '''
        Define a new handler for this connection.
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        :param handler: the new handler to define.
        '''
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        self._handler = handler

    def get_fd(self):
        '''
        Get the file descriptor of the socket managed by this connection.
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        :return: the file descriptor number of the socket
        '''
        try:
            return self._sock.fileno()
        except socket.error:
            return None

    def getpeername(self):
        '''
        Get the peer name.

        :return: string representing the peer name
        '''

        return '%s:%s' % self._sock.getpeername()