Commit 88d12c96 authored by Antoine Millet's avatar Antoine Millet

Whitespaces.

parent ea655571
......@@ -23,39 +23,39 @@ class RpcConnection(object):
'''
This class manage a single peer connection.
'''
RECV_BUF_SIZE = 4096
SEND_BUF_SIZE = 4096
REQUEST_MESSAGE = {'id': None, 'method': None, 'args': [], 'kwargs': {}}
RESPONSE_MESSAGE = {'id': None, 'return': None, 'error': None}
def __init__(self, sock, manager, handler=None, timeout=None):
# Sock of this connection:
self._sock = sock
# Activate TCP keepalive on the connection:
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# Inbound buffer containing currently readed message while it sending:
self._inbound_buffer = BytesBuffer()
# The size of the currently readed message, None value significate that
# the next message size have to be readed:
self._cur_msg_size = None
# Outbound buffer containing data to write on the socket:
self._outbound_buffer = BytesBuffer()
# Reference to the RPC handler, this attribute can be set to None if
# no handler is defined for this connection (for example, for a client
# connection).
self._handler = handler
# Store all calls sent to the peer. Key is the id of the call and value
# the event to raise when call is finished.
self._calls = {}
# The manager of this connection
# The manager of this connection
self._manager = manager
# Is the RpcConnection connected to its peer:
......@@ -102,8 +102,8 @@ class RpcConnection(object):
def send(self):
'''
Flush the outbound buffer by sending all it content through socket. This
method is usually called by the :class:`ConnectionManager` of
Flush the outbound buffer by sending all it content through socket. This
method is usually called by the :class:`ConnectionManager` of
this :class:`RpcConnection`.
'''
......@@ -116,7 +116,7 @@ class RpcConnection(object):
with self._outbound_buffer:
if not len(self._outbound_buffer):
self._manager.nothing_to_write(self)
def receive(self):
'''
Receive data from socket into the inbound buffer.
......@@ -153,9 +153,9 @@ class RpcConnection(object):
length = struct.unpack('!L', self._inbound_buffer.pull(4))[0]
self._cur_msg_size = length
return True
# Read the message payload:
if (self._cur_msg_size is not None
if (self._cur_msg_size is not None
and len(self._inbound_buffer) >= self._cur_msg_size):
payload = self._inbound_buffer.pull(self._cur_msg_size)
self._cur_msg_size = None
......@@ -165,12 +165,12 @@ class RpcConnection(object):
return True
return False
def _send(self, message):
'''
Low level method to encode a message in json, calculate it size, and
place result on outbound buffer.
.. warning::
Message must be a jsonisable structure.
'''
......@@ -184,28 +184,28 @@ class RpcConnection(object):
with self._outbound_buffer:
self._outbound_buffer.push(size + json_msg)
self._manager.data_to_write(self)
def _send_call(self, method_name, *args, **kwargs):
'''
Create the message for the call and push them to the outbound queue.
:param method_name: the name of the method to call on the peer
:param *args: arguments to pass to the remote method
:param **kwargs: keyword arguments to pass to the remote method
:return: the generated id for the request
:rtype: :class:`str` object
'''
msg = RpcConnection.REQUEST_MESSAGE.copy()
msg['method'] = method_name
msg['args'] = args
msg['kwargs'] = kwargs
msg['id'] = str(uuid4())
self._send(msg)
return msg['id']
def _send_response(self, msg_id, returned=None, error=None):
'''
Low level method to send a response message to the peer.
......@@ -216,53 +216,53 @@ class RpcConnection(object):
:param error: raised errors
:type error: raised error or None if no error have been raised
'''
msg = RpcConnection.RESPONSE_MESSAGE.copy()
msg['id'] = msg_id
msg['return'] = returned
msg['error'] = error
self._send(msg)
def response(self, msg_id, returned):
'''
Send an "return" response to the peer.
:param msg_id: the id of the replied message
:param returned: the value returned by the function
.. warning::
In case of raised error, use the :meth:`error` method instead of
In case of raised error, use the :meth:`error` method instead of
this one.
'''
self._send_response(msg_id, returned=returned)
def error(self, msg_id, error, message, traceback=None):
'''
Send an error response to the peer.
:param msg_id: the id of the replied message
:param error: the name of the raised exception
:param message: human readable error for the exception
'''
err = {'exception': error, 'message': message}
self._send_response(msg_id, error=err)
def call(self, method_name, *args, **kwargs):
'''
Make a new remote call on the peer.
:param method_name: the method to call on the peer
:param \*args: the arguments for the call
:param \*\*kwargs: the keyword arguments for the call
:return: the data returned by the peer for the call
.. note::
This function will block until the peer response is received. You
can also specify a ``timeout`` argument to specify a number of
seconds before to raise an :exc:`CallTimeout` exception if the peer
This function will block until the peer response is received. You
can also specify a ``timeout`` argument to specify a number of
seconds before to raise an :exc:`CallTimeout` exception if the peer
didnt respond.
'''
......@@ -271,35 +271,35 @@ class RpcConnection(object):
del kwargs['_timeout']
else:
timeout = self._call_timeout
# Send the call to the peer:
msg_id = self._send_call(method_name, *args, **kwargs)
# Create an item in calls dict with reference to the event to raise:
self._calls[msg_id] = {'return': None, 'error': None,
self._calls[msg_id] = {'return': None, 'error': None,
'event': threading.Event()}
# Wait for the response:
self._calls[msg_id]['event'].wait(timeout)
# Check if timeout occured:
if not self._calls[msg_id]['event'].is_set():
raise RpcError('TimeoutError', 'remote method timeout')
# Check if error occured while execution:
if self._calls[msg_id]['error'] is not None:
raise RpcError(self._calls[msg_id]['error']['exception'],
self._calls[msg_id]['error']['message'])
response = self._calls[msg_id]['return']
del self._calls[msg_id]
return response
def async_call(self, method_name, *args, **kwargs):
'''
Make a new asynchronous call on the peer.
:param method_name: the method to call on the peer
:param \*args: the arguments for the call
:param \*\*kwargs: the keyword arguments for the call
......@@ -323,12 +323,12 @@ class RpcConnection(object):
def dispatch(self, message):
'''
Dispatch a received message according to it type.
:param message: the received message to dispatch
.. note::
When dispatching a call, the responsability of response is delegated
to the caller, except for the case where the method isn't found on
to the caller, except for the case where the method isn't found on
the handler.
'''
......@@ -340,7 +340,7 @@ class RpcConnection(object):
except KeyError:
self.error(
message['id'],
'NameError',
'NameError',
"remote name '%s' is not defined" % message['method'],
)
else:
......@@ -378,11 +378,11 @@ class RpcConnection(object):
else:
logging.debug('Malformed message received: %s', message)
def shutdown(self, callback=None):
'''
Shutdown this connection.
:param callback: Name of the callback to call on the handler or a
callable to call when the connection is shutdown.
:type callback: :class:`str` or callable or None
......@@ -406,7 +406,7 @@ class RpcConnection(object):
else:
msg = {'id': cid, 'error': err, 'return': None}
self._manager.signal_arrival(msg)
if callback is not None and not callable(callback):
if self._handler is not None:
try:
......@@ -459,16 +459,16 @@ class RpcConnection(object):
def set_handler(self, handler):
'''
Define a new handler for this connection.
:param handler: the new handler to define.
'''
self._handler = handler
def get_fd(self):
'''
Get the file descriptor of the socket managed by this connection.
:return: the file descriptor number of the socket
'''
try:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment