Commit 91fc5eec authored by Antoine Millet's avatar Antoine Millet

Moved callers to sjrpc.core.protocols.rpc module

parent 9d3610c5
import threading
from sjrpc.core.exceptions import RpcError
ERRMSG_RPCERR = ('Unable to send reply to the peer: %s (this error is usualy '
'raised when connection is lost while handler function '
'execution)')
class RpcCaller(object):
'''
A caller execute a callable (function, method, class which implement the
:meth:`__call__` method...) in a particular context (threaded or
"timeouted" for example), and return the result (or the exception) to the
peer through it :class:`RpcConnection` object.
'''
def __init__(self, request, protocol, func):
self._request = request
self._protocol = protocol
self._func = func
# Apply the request decorator
#request_decorator = connection.request_decorator
#if request_decorator is not None:
# self._func = request_decorator(self._func)
def run(self):
'''
Run the callable and return the result (or the exception) to the peer.
'''
msg_id = self._request['id']
args = self._request['args']
kwargs = self._request['kwargs']
if getattr(self._func, '__pass_rpc__', False):
args.insert(0, self._protocol)
if getattr(self._func, '__pass_connection__', False):
args.insert(0, self._protocol.connection)
try:
returned = self._func(*args, **kwargs)
except Exception as err:
try:
self._protocol.error(msg_id, message=str(err),
error=err.__class__.__name__)
except RpcError as err:
self._protocol.connection.logger.error(ERRMSG_RPCERR, err)
else:
try:
self._protocol.response(msg_id, returned=returned)
except RpcError as err:
self._protocol.connection.logger.error(ERRMSG_RPCERR, err)
def start(self):
'''
Start execution of the callable, the most of time, it just call
:meth:`run` method.
'''
self.run()
class ThreadedRpcCaller(RpcCaller):
'''
A caller which make the call into a separated thread.
'''
def __init__(self, *args, **kwargs):
super(ThreadedRpcCaller, self).__init__(*args, **kwargs)
self._thread = threading.Thread(target=self.run)
self._thread.name = 'Processing of call: %s' % self._request['id']
self._thread.daemon = True
def start(self):
self._thread.start()
......@@ -3,14 +3,86 @@ from __future__ import absolute_import
import json
from uuid import uuid4
from threading import Event
from threading import Event, Thread
from sjrpc.core.callers import RpcCaller, ThreadedRpcCaller
from sjrpc.core.exceptions import RpcError
from sjrpc.core.protocols import Protocol
__all__ = ['RpcProtocol']
ERRMSG_RPCERR = ('Unable to send reply to the peer: %s (this error is usualy '
'raised when connection is lost while handler function '
'execution)')
class RpcCaller(object):
'''
A caller execute a callable (function, method, class which implement the
:meth:`__call__` method...) in a particular context (threaded or
"timeouted" for example), and return the result (or the exception) to the
peer through it :class:`RpcConnection` object.
'''
def __init__(self, request, protocol, func):
self._request = request
self._protocol = protocol
self._func = func
# Apply the request decorator
#request_decorator = connection.request_decorator
#if request_decorator is not None:
# self._func = request_decorator(self._func)
def run(self):
'''
Run the callable and return the result (or the exception) to the peer.
'''
msg_id = self._request['id']
args = self._request['args']
kwargs = self._request['kwargs']
if getattr(self._func, '__pass_rpc__', False):
args.insert(0, self._protocol)
if getattr(self._func, '__pass_connection__', False):
args.insert(0, self._protocol.connection)
try:
returned = self._func(*args, **kwargs)
except Exception as err:
try:
self._protocol.error(msg_id, message=str(err),
error=err.__class__.__name__)
except RpcError as err:
self._protocol.connection.logger.error(ERRMSG_RPCERR, err)
else:
try:
self._protocol.response(msg_id, returned=returned)
except RpcError as err:
self._protocol.connection.logger.error(ERRMSG_RPCERR, err)
def start(self):
'''
Start execution of the callable, the most of time, it just call
:meth:`run` method.
'''
self.run()
class ThreadedRpcCaller(RpcCaller):
'''
A caller which make the call into a separated thread.
'''
def __init__(self, *args, **kwargs):
super(ThreadedRpcCaller, self).__init__(*args, **kwargs)
self._thread = Thread(target=self.run)
self._thread.name = 'Processing of call: %s' % self._request['id']
self._thread.daemon = True
def start(self):
self._thread.start()
class RpcProtocol(Protocol):
'''
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment