From 88d12c96195a2600185395684a90e535559cc2b7 Mon Sep 17 00:00:00 2001 From: Antoine Millet Date: Thu, 26 May 2011 18:37:35 +0200 Subject: [PATCH] Whitespaces. --- sjrpc/core/rpcconnection.py | 114 ++++++++++++++++++------------------ 1 file changed, 57 insertions(+), 57 deletions(-) diff --git a/sjrpc/core/rpcconnection.py b/sjrpc/core/rpcconnection.py index e8d3d04..b5f1561 100644 --- a/sjrpc/core/rpcconnection.py +++ b/sjrpc/core/rpcconnection.py @@ -23,39 +23,39 @@ class RpcConnection(object): ''' This class manage a single peer connection. ''' - + RECV_BUF_SIZE = 4096 SEND_BUF_SIZE = 4096 REQUEST_MESSAGE = {'id': None, 'method': None, 'args': [], 'kwargs': {}} RESPONSE_MESSAGE = {'id': None, 'return': None, 'error': None} - + def __init__(self, sock, manager, handler=None, timeout=None): # Sock of this connection: self._sock = sock # Activate TCP keepalive on the connection: self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - + # Inbound buffer containing currently readed message while it sending: self._inbound_buffer = BytesBuffer() - + # The size of the currently readed message, None value significate that # the next message size have to be readed: self._cur_msg_size = None - + # Outbound buffer containing data to write on the socket: self._outbound_buffer = BytesBuffer() - + # Reference to the RPC handler, this attribute can be set to None if # no handler is defined for this connection (for example, for a client # connection). self._handler = handler - + # Store all calls sent to the peer. Key is the id of the call and value # the event to raise when call is finished. self._calls = {} - - # The manager of this connection + + # The manager of this connection self._manager = manager # Is the RpcConnection connected to its peer: @@ -102,8 +102,8 @@ class RpcConnection(object): def send(self): ''' - Flush the outbound buffer by sending all it content through socket. This - method is usually called by the :class:`ConnectionManager` of + Flush the outbound buffer by sending all it content through socket. This + method is usually called by the :class:`ConnectionManager` of this :class:`RpcConnection`. ''' @@ -116,7 +116,7 @@ class RpcConnection(object): with self._outbound_buffer: if not len(self._outbound_buffer): self._manager.nothing_to_write(self) - + def receive(self): ''' Receive data from socket into the inbound buffer. @@ -153,9 +153,9 @@ class RpcConnection(object): length = struct.unpack('!L', self._inbound_buffer.pull(4))[0] self._cur_msg_size = length return True - + # Read the message payload: - if (self._cur_msg_size is not None + if (self._cur_msg_size is not None and len(self._inbound_buffer) >= self._cur_msg_size): payload = self._inbound_buffer.pull(self._cur_msg_size) self._cur_msg_size = None @@ -165,12 +165,12 @@ class RpcConnection(object): return True return False - + def _send(self, message): ''' Low level method to encode a message in json, calculate it size, and place result on outbound buffer. - + .. warning:: Message must be a jsonisable structure. ''' @@ -184,28 +184,28 @@ class RpcConnection(object): with self._outbound_buffer: self._outbound_buffer.push(size + json_msg) self._manager.data_to_write(self) - + def _send_call(self, method_name, *args, **kwargs): ''' Create the message for the call and push them to the outbound queue. - + :param method_name: the name of the method to call on the peer :param *args: arguments to pass to the remote method :param **kwargs: keyword arguments to pass to the remote method :return: the generated id for the request :rtype: :class:`str` object ''' - + msg = RpcConnection.REQUEST_MESSAGE.copy() msg['method'] = method_name msg['args'] = args msg['kwargs'] = kwargs msg['id'] = str(uuid4()) - + self._send(msg) - + return msg['id'] - + def _send_response(self, msg_id, returned=None, error=None): ''' Low level method to send a response message to the peer. @@ -216,53 +216,53 @@ class RpcConnection(object): :param error: raised errors :type error: raised error or None if no error have been raised ''' - + msg = RpcConnection.RESPONSE_MESSAGE.copy() msg['id'] = msg_id msg['return'] = returned msg['error'] = error - + self._send(msg) - + def response(self, msg_id, returned): ''' Send an "return" response to the peer. - + :param msg_id: the id of the replied message :param returned: the value returned by the function - + .. warning:: - In case of raised error, use the :meth:`error` method instead of + In case of raised error, use the :meth:`error` method instead of this one. ''' - + self._send_response(msg_id, returned=returned) - + def error(self, msg_id, error, message, traceback=None): ''' Send an error response to the peer. - + :param msg_id: the id of the replied message :param error: the name of the raised exception :param message: human readable error for the exception ''' - + err = {'exception': error, 'message': message} self._send_response(msg_id, error=err) - + def call(self, method_name, *args, **kwargs): ''' Make a new remote call on the peer. - + :param method_name: the method to call on the peer :param \*args: the arguments for the call :param \*\*kwargs: the keyword arguments for the call :return: the data returned by the peer for the call - + .. note:: - This function will block until the peer response is received. You - can also specify a ``timeout`` argument to specify a number of - seconds before to raise an :exc:`CallTimeout` exception if the peer + This function will block until the peer response is received. You + can also specify a ``timeout`` argument to specify a number of + seconds before to raise an :exc:`CallTimeout` exception if the peer didnt respond. ''' @@ -271,35 +271,35 @@ class RpcConnection(object): del kwargs['_timeout'] else: timeout = self._call_timeout - + # Send the call to the peer: msg_id = self._send_call(method_name, *args, **kwargs) - + # Create an item in calls dict with reference to the event to raise: - self._calls[msg_id] = {'return': None, 'error': None, + self._calls[msg_id] = {'return': None, 'error': None, 'event': threading.Event()} - + # Wait for the response: self._calls[msg_id]['event'].wait(timeout) # Check if timeout occured: if not self._calls[msg_id]['event'].is_set(): raise RpcError('TimeoutError', 'remote method timeout') - + # Check if error occured while execution: if self._calls[msg_id]['error'] is not None: raise RpcError(self._calls[msg_id]['error']['exception'], self._calls[msg_id]['error']['message']) - + response = self._calls[msg_id]['return'] del self._calls[msg_id] - + return response - + def async_call(self, method_name, *args, **kwargs): ''' Make a new asynchronous call on the peer. - + :param method_name: the method to call on the peer :param \*args: the arguments for the call :param \*\*kwargs: the keyword arguments for the call @@ -323,12 +323,12 @@ class RpcConnection(object): def dispatch(self, message): ''' Dispatch a received message according to it type. - + :param message: the received message to dispatch - + .. note:: When dispatching a call, the responsability of response is delegated - to the caller, except for the case where the method isn't found on + to the caller, except for the case where the method isn't found on the handler. ''' @@ -340,7 +340,7 @@ class RpcConnection(object): except KeyError: self.error( message['id'], - 'NameError', + 'NameError', "remote name '%s' is not defined" % message['method'], ) else: @@ -378,11 +378,11 @@ class RpcConnection(object): else: logging.debug('Malformed message received: %s', message) - + def shutdown(self, callback=None): ''' Shutdown this connection. - + :param callback: Name of the callback to call on the handler or a callable to call when the connection is shutdown. :type callback: :class:`str` or callable or None @@ -406,7 +406,7 @@ class RpcConnection(object): else: msg = {'id': cid, 'error': err, 'return': None} self._manager.signal_arrival(msg) - + if callback is not None and not callable(callback): if self._handler is not None: try: @@ -459,16 +459,16 @@ class RpcConnection(object): def set_handler(self, handler): ''' Define a new handler for this connection. - + :param handler: the new handler to define. ''' - + self._handler = handler def get_fd(self): ''' Get the file descriptor of the socket managed by this connection. - + :return: the file descriptor number of the socket ''' try: -- GitLab