Skip to content
rpc.py 14.5 KiB
Newer Older
from __future__ import absolute_import

from threading import Event, Thread

from sjrpc.core.exceptions import RpcError
from sjrpc.core.protocols import Protocol
__all__ = ['RpcProtocol']
ERRMSG_RPCERR = ('Unable to send reply to the peer: %s (this error is usualy '
                 'raised when connection is lost while handler function '
                 'execution)')


class RpcCaller(object):
    '''
    A caller execute a callable (function, method, class which implement the
    :meth:`__call__` method...) in a particular context (threaded or
    "timeouted" for example), and return the result (or the exception) to the
    peer through it :class:`RpcConnection` object.
    '''

    def __init__(self, request, protocol, func):
        self._request = request
        self._protocol = protocol
        self._func = func

        # Apply the request decorator
        #request_decorator = connection.request_decorator
        #if request_decorator is not None:
        #    self._func = request_decorator(self._func)

    def run(self):
        '''
        Run the callable and return the result (or the exception) to the peer.
        '''
        msg_id = self._request['id']
        args = self._request['args']
        kwargs = self._request['kwargs']

        if getattr(self._func, '__pass_rpc__', False):
            args.insert(0, self._protocol)
        if getattr(self._func, '__pass_connection__', False):
            args.insert(0, self._protocol.connection)
        try:
            returned = self._func(*args, **kwargs)
        except Exception as err:
            try:
                self._protocol.error(msg_id, message=str(err),
                                       error=err.__class__.__name__)
            except RpcError as err:
                self._protocol.connection.logger.error(ERRMSG_RPCERR, err)
        else:
            try:
                self._protocol.response(msg_id, returned=returned)
            except RpcError as err:
                self._protocol.connection.logger.error(ERRMSG_RPCERR, err)

    def start(self):
        '''
        Start execution of the callable, the most of time, it just call
        :meth:`run` method.
        '''
        self.run()


class ThreadedRpcCaller(RpcCaller):

    '''
    A caller which make the call into a separated thread.
    '''

    def __init__(self, *args, **kwargs):
        super(ThreadedRpcCaller, self).__init__(*args, **kwargs)
        self._thread = Thread(target=self.run)
        self._thread.name = 'Processing of call: %s' % self._request['id']
        self._thread.daemon = True

    def start(self):
        self._thread.start()


class RpcProtocol(Protocol):
    The standard protocol used to do RPC request/responses.

    :param connection: the connection serving this :class:`RpcProtocol`
    :param label: the label of this :class:`RpcProtocol` instance
    :param handler: command handler to bind by default
    :param on_disconnect: callback called when client disconnect
    :param timeout: global command timeout
    '''

    REQUEST_MESSAGE = {'id': None, 'method': None, 'args': [], 'kwargs': {}}
    RESPONSE_MESSAGE = {'id': None, 'return': None, 'error': None}
    SPECIAL_MESSAGE = {'special': None}

    def __init__(self, connection, label, handler=None, on_disconnect=None,
                 timeout=30, *args, **kwargs):
        super(RpcProtocol, self).__init__(connection, label, *args, **kwargs)
        self._handler = handler
        self._on_disconnect = on_disconnect
        self._call_timeout = timeout

        # Store all calls sent to the peer. Key is the id of the call and value
        # the event to raise when call is finished.
        self._calls = {}

    def _dispatch(self, message):
        '''
        Dispatch a received message according to it type.

        :param message: the received message to dispatch

        .. note::
           When dispatching a call, the responsability of response is delegated
           to the caller, except for the case where the method isn't found on
           the handler.
        '''

        self.logger.debug('Received: %s', message)
        if set(RpcProtocol.REQUEST_MESSAGE) <= set(message):
            self._handle_request(message)
        elif set(RpcProtocol.RESPONSE_MESSAGE) <= set(message):
            self._handle_response(message)
        elif set(RpcProtocol.SPECIAL_MESSAGE) <= set(message):
            self._handle_special(message)
        else:
            self.logger.debug('Malformed message received: %s', message)

    def _handle_request(self, message):
        '''
        Handle an inbound request message.
        '''
        if self._handler is not None:
            try:
                func = self._handler[message['method']]
            except KeyError:
                self.error(message['id'], 'NameError',
                           "remote name '%s' is not defined" % message['method'])
            else:
                if getattr(func, '__threaded__', True):
                    ThreadedRpcCaller(message, self, func).start()
                else:
                    RpcCaller(message, self, func).start()
        else:
            self.error(message['id'], 'NameError',
                       "remote name '%s' is not defined" % message['method'])

    def _handle_response(self, message):
        '''
        Handle an inbound response message
        '''
        # Handle response message from the peer:
        call = self._calls.get(message['id'])
        if call is not None:
            # Call exists in call list
            if message['error'] is None:
                call['return'] = message['return']
            else:
                call['error'] = message['error']

            if 'event' in call:
                # Release the call if its synchronous:
                call['event'].set()
            else:
                # Else, it's an asynchonous call, we need to push the answer
                # on the queue:
                queue = call['queue']
                del call['queue']
                queue.put(call)

            # Finally, delete the call from the current running call list:
            del self._calls[message['id']]

    def _handle_special(self, message):
        '''
        Handle special message.
        '''
        if message['special'] == 'capabilities':
            if self._label == 0:
                self._connection.set_capabilities(message.get('capabilities'))
            else:
                self.logger.warning('Capabilities message received by non-zero'
                                    ' rpc.')
        elif message['special'] == 'protoctl':
            label = message.get('label')
            if label is None:
                self.logger.warning('Protoctl message received without label.')
                return
            try:
                proto = self._connection.get_protocol(label)
            except KeyError:
                self.logger.warning('Protoctl message received for unknown label')
            else:
                try:
                    proto.handle_control(message.get('type'),
                                         message.get('payload'))
                except Exception as err:
                    self.logger.error('Protoctl handler failed for proto %s: ',
                                      '%s' % err)
    def _send(self, message):
        '''
        Low level method to encode a message in json, calculate it size, and
        place result on outbound buffer.

        .. warning::
           Message must be a jsonisable structure.
        '''

        #if not self._connected: #FIXME
        #    raise RpcError('SendError', 'disconnected from the peer')

        self.logger.debug('Sending: %s', message)
        json_msg = json.dumps(message)
        self._connection.send(self._label, payload=json_msg)

    def _send_call(self, method_name, *args, **kwargs):
        '''
        Create the message for the call and push them to the outbound queue.

        :param method_name: the name of the method to call on the peer
        :param *args: arguments to pass to the remote method
        :param **kwargs: keyword arguments to pass to the remote method
        :return: the generated id for the request
        :rtype: :class:`str` object
        '''

        msg = RpcProtocol.REQUEST_MESSAGE.copy()
        msg['method'] = method_name
        msg['args'] = args
        msg['kwargs'] = kwargs
        msg['id'] = str(uuid4())

        self._send(msg)

        return msg['id']

    def _send_response(self, msg_id, returned=None, error=None):
        '''
        Low level method to send a response message to the peer.

        :param msg_id: the id of the replied message
        :param returned: returned data
        :type returned: returned data or None if errors have been raised
        :param error: raised errors
        :type error: raised error or None if no error have been raised
        '''

        msg = RpcProtocol.RESPONSE_MESSAGE.copy()
        msg['id'] = msg_id
        msg['return'] = returned
        msg['error'] = error

        self._send(msg)

#
# Public methods:
#

    def end_of_message(self):
        '''
        When the message is fully received, decode the json and dispatch it.
        '''
        msg = json.loads(self._incoming_buf)
        self._dispatch(msg)

    def shutdown(self):
        '''
        Handle the shutdown process of this protocol instance:

        * Release all waiting calls with a "Connection reset by peer" error.
        * Execute the on_disconnect callback.
        '''
        # Release all waiting calls from this rpc:
        for cid in self._calls.keys():
            err = {'exception': 'RpcError',
                   'message': 'Connection reset by peer'}
            self._handle_response({'id': cid, 'return': None, 'error': err})

        # Execute on_disconnect callback:
        callback = None
        if self._on_disconnect is not None and not callable(self._on_disconnect):
            if self._handler is not None:
                try:
                    callback = self._handler[self._on_disconnect]
                except KeyError:
                    self.logger.warn('Shutdown callback not found in current '
                                     'rpc attached handler, ignoring')
                    callback = None
            else:
                self.logger.warn('Shutdown callback specified but no handler '
                                 'binded on rpc, ignoring')
                callback = None

        if callback is not None:
            try:
                callback(self._connection)
            except Exception as err:
                self.logger.debug('Error while execution of shutdown '
                                  'callback: %s', err)

    def get_handler(self):
        '''
        Return the handler binded to the :class:`RpcConnection`.

        :return: binded handler
        '''
        return self._handler

    def set_handler(self, handler):
        '''
        Define a new handler for this connection.

        :param handler: the new handler to define.
        '''
        self._handler = handler

    def send_special(self, special, **kwargs):
        '''
        Send a "special" message to the peer.

        :param special: type of the special message
        :param \*\*kwargs: fields of the special message
        '''
        msg = {'special': special}
        msg.update(kwargs)
        self._send(msg)

    def response(self, msg_id, returned):
        '''
        Send an "return" response to the peer.

        :param msg_id: the id of the replied message
        :param returned: the value returned by the function

        .. warning::
           In case of raised error, use the :meth:`error` method instead of
           this one.
        '''

        self._send_response(msg_id, returned=returned)

    def error(self, msg_id, error, message, traceback=None):
        '''
        Send an error response to the peer.

        :param msg_id: the id of the replied message
        :param error: the name of the raised exception
        :param message: human readable error for the exception
        '''

        err = {'exception': error, 'message': message}
        self._send_response(msg_id, error=err)

    def call(self, method_name, *args, **kwargs):
        '''
        Make a new remote call on the peer.

        :param method_name: the method to call on the peer
        :param \*args: the arguments for the call
        :param \*\*kwargs: the keyword arguments for the call
        :return: the data returned by the peer for the call

        .. note::
           This function will block until the peer response is received. You
           can also specify a ``timeout`` argument to specify a number of
           seconds before to raise an :exc:`CallTimeout` exception if the peer
           didnt respond.
        '''

        if '_timeout' in kwargs:
            timeout = kwargs['_timeout']
            del kwargs['_timeout']
        else:
            timeout = self._call_timeout

        # Send the call to the peer:
        msg_id = self._send_call(method_name, *args, **kwargs)

        # Create an item in calls dict with reference to the event to raise:
        call = {'return': None, 'error': None, 'event': Event(), 'id': msg_id}
        self._calls[msg_id] = call

        # Wait for the response:
        call['event'].wait(timeout)

        # Check if timeout occured:
        if not call['event'].is_set():
            raise RpcError('TimeoutError', 'remote method timeout')

        # Check if error occured while execution:
        if call['error'] is not None:
            raise RpcError(call['error']['exception'],
                           call['error']['message'])

        return call['return']

    def async_call(self, queue, method_name, *args, **kwargs):
        '''
        Make a new asynchronous call on the peer.

        :param queue: the queue where to push the response when received
        :param method_name: the method to call on the peer
        :param _data: local data to give back on the response
        :param \*args: the arguments for the call
        :param \*\*kwargs: the keyword arguments for the call
        :return: the message id of the call
        '''

        # Extract _data from argument:
        if '_data' in kwargs:
            data = kwargs['_data']
            del kwargs['_data']
        else:
            data = None
        # Send the call to the peer:
        msg_id = self._send_call(method_name, *args, **kwargs)
        # Register the call but don't wait for the response:
        self._calls[msg_id] = {'id': msg_id, 'async': True, 
                               'data': data, 'queue': queue}
        return msg_id