Commit a66b93d7 authored by Antoine Millet's avatar Antoine Millet
Browse files

Implemented deferred responses

parent 7ab26906
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -40,3 +40,9 @@ class FallbackModeEnabledError(RpcConnectionError):
    """ Exception raised when a feature which is not compatible with fallback
        mode is used.
    """


class AlreadyAnsweredError(Exception):

    """ Exception raised when a deferred response has already been answered.
    """
 No newline at end of file
+49 −5
Original line number Diff line number Diff line
@@ -5,7 +5,7 @@ from uuid import uuid4

from threading import Event, Thread

from sjrpc.core.exceptions import RpcError, RpcConnectionError
from sjrpc.core.exceptions import RpcError, RpcConnectionError, AlreadyAnsweredError
from sjrpc.core.protocols import Protocol

__all__ = ['RpcProtocol']
@@ -15,6 +15,47 @@ ERRMSG_RPCERR = ('Unable to send reply to the peer: %s (this error is usualy '
                 'execution)')


class DeferredResponse(object):
    """ Allow to defer an RPC response.
    """

    def __init__(self, rpc, msg_id):
        self._rpc = rpc
        self._msg_id = msg_id
        self._already_answered = False

    def error(self, message, error):
        """ Return an error.
        """
        if not self._already_answered:
            self._already_answered = True
            self._rpc.error(self._msg_id, message=message, error=error)
        else:
            raise AlreadyAnsweredError('You already answered to this request')

    def error_exception(self, exception):
        """ Return an error using an exception.
        """
        self.error(str(exception), error=exception.__class__.__name__)

    def response(self, value):
        """ Return a value.
        """
        if not self._already_answered:
            self._already_answered = True
            self._rpc.response(self._msg_id, returned=value)
        else:
            raise AlreadyAnsweredError('You already answered to this request')

    def __enter__(self):
        return self.response

    def __exit__(self, exc_type, exc_value, tb):
        if exc_type is not None:
            self.error(str(exc_value), exc_type.__name__)
            return True


class RpcCaller(object):
    """ A caller execute a callable (function, method, class which implement the
        :meth:`__call__` method...) in a particular context (threaded or
@@ -39,6 +80,8 @@ class RpcCaller(object):
            args.insert(0, self._protocol)
        if getattr(self._func, '__pass_connection__', False):
            args.insert(0, self._protocol.connection)
        if getattr(self._func, '__pass_deferred_response__', False):
            args.insert(0, DeferredResponse(self._protocol, msg_id))
        try:
            returned = self._func(*args, **kwargs)
        except Exception as err:
@@ -48,6 +91,7 @@ class RpcCaller(object):
            except RpcError as err:
                self._protocol.connection.logger.error(ERRMSG_RPCERR, err)
        else:
            if not isinstance(returned, DeferredResponse):
                try:
                    self._protocol.response(msg_id, returned=returned)
                except RpcError as err:
+1 −1
Original line number Diff line number Diff line
@@ -2,5 +2,5 @@ from sjrpc.utils.proxies import *
from sjrpc.utils.handlers import *

__all__ = ('ConnectionProxy', 'RpcHandler', 'threadless', 'pure',
           'pass_connection', 'pass_rpc')
           'pass_connection', 'pass_rpc', 'pass_deferred_response')
+8 −0
Original line number Diff line number Diff line
@@ -53,3 +53,11 @@ def pass_rpc(func):

    func.__pass_rpc__ = True
    return func

def pass_deferred_response(func):
    """ Function handler decorator -- pass on first argument a deferred reponse
        object.
    """

    func.__pass_deferred_response__ = True
    return func