Skip to content
rpc.py 10.6 KiB
Newer Older
import json
import logging
from uuid import uuid4

from threading import Event

from sjrpc.core.callers import RpcCaller, ThreadedRpcCaller
from sjrpc.core.exceptions import RpcError


class RpcProtocol(object):
    '''
    RPC protocol exchanging JSON messages over a connection.

    A message is a JSON object which is either a request (keys ``id``,
    ``method``, ``args`` and ``kwargs``) or a response (keys ``id``,
    ``return`` and ``error``). Outbound messages are sent through
    ``connection.send`` tagged with the label of this protocol.

    :param connection: the connection serving this :class:`RpcProtocol`
    :param label: the label of this :class:`RpcProtocol` instance
    :param handler: command handler to bind by default
    :param on_disconnect: callback called when client disconnect, either a
        callable or a name looked up in the bound handler
    :param request_decorator: decorator applied on each handler function
    :param timeout: global command timeout
    :param logger: logging module :class:`Logger` instance
    '''

    # Templates of the two kinds of messages; also used by _dispatch() to
    # recognize the type of an inbound message from its keys.
    REQUEST_MESSAGE = {'id': None, 'method': None, 'args': [], 'kwargs': {}}
    RESPONSE_MESSAGE = {'id': None, 'return': None, 'error': None}

    def __init__(self, connection, label, handler=None, on_disconnect=None,
                 request_decorator=None, timeout=30, logger=None):
        self._connection = connection
        self._label = label
        self._handler = handler
        self._on_disconnect = on_disconnect
        self.request_decorator = request_decorator
        self._call_timeout = timeout
        if logger is None:
            # Default logger is a child of the connection's logger:
            logger_name = '%s.protos.%s' % (connection.logger.name, label)
            self.logger = logging.getLogger(logger_name)
        else:
            self.logger = logger

        # Store all calls sent to the peer. Key is the id of the call and value
        # a dict describing the call (with the event to raise when a
        # synchronous call is finished, or the queue to feed for an
        # asynchronous one).
        self._calls = {}

    @property
    def connection(self):
        '''
        The connection serving this protocol.
        '''
        return self._connection

    def handle(self, label, size):
        '''
        Decode json, and dispatch message.

        :param label: label of the inbound message (not used here)
        :param size: size of the payload to read from the connection
        '''
        buf = self._connection.recv_until(size)
        msg = json.loads(buf)
        self._dispatch(msg)

    def shutdown(self):
        '''
        Release every pending call with a "Connection reset by peer" error,
        then execute the ``on_disconnect`` callback if one is defined.

        The callback is either the callable given as ``on_disconnect`` or,
        when ``on_disconnect`` is not callable, the entry of that name in
        the bound handler. It is called with the connection as only argument
        and errors it raises are logged, not propagated.
        '''
        # Release all waiting calls from this rpc. Iterate over a snapshot of
        # the ids because _handle_response() removes entries from the dict:
        for cid in list(self._calls):
            err = {'exception': 'RpcError',
                   'message': 'Connection reset by peer'}
            self._handle_response({'id': cid, 'return': None, 'error': err})

        # Resolve the on_disconnect callback:
        callback = None
        if callable(self._on_disconnect):
            callback = self._on_disconnect
        elif self._on_disconnect is not None:
            if self._handler is None:
                self.logger.warning('Shutdown callback specified but no '
                                    'handler binded on rpc, ignoring')
            else:
                try:
                    callback = self._handler[self._on_disconnect]
                except KeyError:
                    self.logger.warning('Shutdown callback not found in '
                                        'current rpc attached handler, '
                                        'ignoring')

        if callback is not None:
            try:
                callback(self._connection)
            except Exception as err:
                # Best effort: a failing callback must not break shutdown.
                self.logger.debug('Error while execution of shutdown '
                                  'callback: %s', err)

    def get_handler(self):
        '''
        Return the handler binded to the :class:`RpcConnection`.

        :return: binded handler
        '''
        return self._handler

    def set_handler(self, handler):
        '''
        Define a new handler for this connection.

        :param handler: the new handler to define.
        '''
        self._handler = handler

    def _dispatch(self, message):
        '''
        Dispatch a received message according to it type.

        :param message: the received message to dispatch

        .. note::
           When dispatching a call, the responsability of response is delegated
           to the caller, except for the case where the method isn't found on
           the handler.
        '''

        self.logger.debug('Received: %s', message)
        if not isinstance(message, dict):
            # Valid JSON which is not an object (number, list, ...) can't be
            # a request nor a response:
            self.logger.debug('Malformed message received: %s', message)
            return

        keys = set(message)
        if set(RpcProtocol.REQUEST_MESSAGE) <= keys:
            self._handle_request(message)
        elif set(RpcProtocol.RESPONSE_MESSAGE) <= keys:
            self._handle_response(message)
        else:
            self.logger.debug('Malformed message received: %s', message)

    def _handle_request(self, message):
        '''
        Handle an inbound request message.

        The method is looked up in the bound handler and executed through a
        :class:`ThreadedRpcCaller`, or a :class:`RpcCaller` when the function
        has a false ``__threaded__`` attribute. A ``NameError`` error response
        is sent when no handler is bound or the method is not found.
        '''
        if self._handler is not None:
            try:
                func = self._handler[message['method']]
            except KeyError:
                pass
            else:
                if getattr(func, '__threaded__', True):
                    ThreadedRpcCaller(message, self, func).start()
                else:
                    RpcCaller(message, self, func).start()
                return

        self.error(message['id'], 'NameError',
                   "remote name '%s' is not defined" % message['method'])

    def _handle_response(self, message):
        '''
        Handle an inbound response message

        Responses whose id doesn't match a running call are ignored.
        '''
        # Remove the call from the running call list while fetching it, a
        # response is handled at most once:
        call = self._calls.pop(message['id'], None)
        if call is None:
            return

        if message['error'] is None:
            call['return'] = message['return']
        else:
            call['error'] = message['error']

        if 'event' in call:
            # Release the call if its synchronous:
            call['event'].set()
        else:
            # Else, it's an asynchonous call, we need to push the answer
            # on the queue (without the reference to the queue itself):
            queue = call.pop('queue')
            queue.put(call)

    def _send(self, message):
        '''
        Low level method to encode a message in json and send it on the
        connection with the label of this protocol.

        .. warning::
           Message must be a jsonisable structure.
        '''

        # FIXME: an RpcError('SendError', 'disconnected from the peer')
        # should be raised here when the peer is disconnected.

        self.logger.debug('Sending: %s', message)
        json_msg = json.dumps(message)
        self._connection.send(self._label, payload=json_msg)

    def _build_request(self, method_name, args, kwargs):
        '''
        Create a request message with a freshly generated id.

        :param method_name: the name of the method to call on the peer
        :param args: sequence of arguments to pass to the remote method
        :param kwargs: dict of keyword arguments to pass to the remote method
        :return: the request message, ready to be sent
        '''
        msg = RpcProtocol.REQUEST_MESSAGE.copy()
        msg['method'] = method_name
        msg['args'] = args
        msg['kwargs'] = kwargs
        msg['id'] = str(uuid4())
        return msg

    def _send_call(self, method_name, *args, **kwargs):
        '''
        Create the message for the call and send it to the peer.

        :param method_name: the name of the method to call on the peer
        :param args: arguments to pass to the remote method
        :param kwargs: keyword arguments to pass to the remote method
        :return: the generated id for the request
        :rtype: :class:`str` object
        '''

        msg = self._build_request(method_name, args, kwargs)
        self._send(msg)
        return msg['id']

    def _send_response(self, msg_id, returned=None, error=None):
        '''
        Low level method to send a response message to the peer.

        :param msg_id: the id of the replied message
        :param returned: returned data
        :type returned: returned data or None if errors have been raised
        :param error: raised errors
        :type error: raised error or None if no error have been raised
        '''

        msg = RpcProtocol.RESPONSE_MESSAGE.copy()
        msg['id'] = msg_id
        msg['return'] = returned
        msg['error'] = error

        self._send(msg)

    def response(self, msg_id, returned):
        '''
        Send an "return" response to the peer.

        :param msg_id: the id of the replied message
        :param returned: the value returned by the function

        .. warning::
           In case of raised error, use the :meth:`error` method instead of
           this one.
        '''

        self._send_response(msg_id, returned=returned)

    def error(self, msg_id, error, message, traceback=None):
        '''
        Send an error response to the peer.

        :param msg_id: the id of the replied message
        :param error: the name of the raised exception
        :param message: human readable error for the exception
        :param traceback: accepted but currently not sent to the peer
        '''

        err = {'exception': error, 'message': message}
        self._send_response(msg_id, error=err)

    def call(self, method_name, *args, **kwargs):
        '''
        Make a new remote call on the peer.

        :param method_name: the method to call on the peer
        :param args: the arguments for the call
        :param kwargs: the keyword arguments for the call
        :return: the data returned by the peer for the call
        :raise RpcError: with ``TimeoutError`` as exception name if the peer
            didn't respond in time, or with the error returned by the peer

        .. note::
           This function will block until the peer response is received. You
           can also specify a ``_timeout`` keyword argument to specify a
           number of seconds to wait for the response instead of the global
           timeout of the protocol. ``_timeout`` is not sent to the peer.
        '''

        timeout = kwargs.pop('_timeout', self._call_timeout)

        msg = self._build_request(method_name, args, kwargs)
        msg_id = msg['id']

        # Register the call BEFORE sending it, so that a response coming
        # back immediately always finds its entry in the calls dict:
        call = {'return': None, 'error': None, 'event': Event(), 'id': msg_id}
        self._calls[msg_id] = call

        # Send the call to the peer:
        try:
            self._send(msg)
        except Exception:
            # Nothing has been sent, no response will come for this call:
            self._calls.pop(msg_id, None)
            raise

        # Wait for the response:
        call['event'].wait(timeout)

        # Check if timeout occured:
        if not call['event'].is_set():
            # Forget the call, else it would stay in the dict forever:
            self._calls.pop(msg_id, None)
            raise RpcError('TimeoutError', 'remote method timeout')

        # Check if error occured while execution:
        if call['error'] is not None:
            raise RpcError(call['error']['exception'],
                           call['error']['message'])

        return call['return']

    def async_call(self, queue, method_name, *args, **kwargs):
        '''
        Make a new asynchronous call on the peer.

        When the response is received, a dict with the keys ``id``,
        ``async``, ``data`` and either ``return`` or ``error`` is put on the
        queue.

        :param queue: the queue where to push the response when received
        :param method_name: the method to call on the peer
        :param _data: local data to give back on the response (keyword
            argument, not sent to the peer)
        :param args: the arguments for the call
        :param kwargs: the keyword arguments for the call
        :return: the message id of the call
        '''

        # Extract _data from argument:
        data = kwargs.pop('_data', None)

        msg = self._build_request(method_name, args, kwargs)
        msg_id = msg['id']

        # Register the call before sending it (so an immediate response is
        # not lost) but don't wait for the response:
        self._calls[msg_id] = {'id': msg_id, 'async': True,
                               'data': data, 'queue': queue}

        # Send the call to the peer:
        try:
            self._send(msg)
        except Exception:
            # Nothing has been sent, no response will come for this call:
            self._calls.pop(msg_id, None)
            raise

        return msg_id