Newer
Older
#!/usr/bin/env python
#coding:utf8
'''
This module contains the RpcConnection class, more informations about this
class are located in it docstring.
'''
import ssl
import json
import struct
import socket
import logging
import threading
from sjrpc.utils import BytesBuffer
from sjrpc.core.callers import RpcCaller, ThreadedRpcCaller
from sjrpc.core.exceptions import RpcError
class RpcConnection(object):
'''
This class manage a single peer connection.
'''
RECV_BUF_SIZE = 4096
SEND_BUF_SIZE = 4096
REQUEST_MESSAGE = {'id': None, 'method': None, 'args': [], 'kwargs': {}}
RESPONSE_MESSAGE = {'id': None, 'return': None, 'error': None}
def __init__(self, sock, manager, handler=None, timeout=None):
# Sock of this connection:
self._sock = sock
# Inbound buffer containing currently readed message while it sending:
self._inbound_buffer = BytesBuffer()
# The size of the currently readed message, None value significate that
# the next message size have to be readed:
self._cur_msg_size = None
# Outbound buffer containing data to write on the socket:
self._outbound_buffer = BytesBuffer()
# Reference to the RPC handler, this attribute can be set to None if
# no handler is defined for this connection (for example, for a client
# connection).
self._handler = handler
# Store all calls sent to the peer. Key is the id of the call and value
# the event to raise when call is finished.
self._calls = {}
# The manager of this connection
self._manager = manager
Antoine Millet
committed
# Is the RpcConnection connected to its peer:
self._connected = True
# The global call timeout setting:
self._call_timeout = timeout
def from_addr(cls, addr, port, manager, enable_ssl=False, timeout=None,
conn_timeout=30.0,cert=None, handler=None):
'''
Construct the instance of :class:`RpcConnection` without providing
the :class:`socket` object. Socket is automatically created and passed
to the standard constructor before to return the new instance.
:param addr: the target ip address
:param port: the target port
:param manager: manager of this connection
:param timeout: the global call timeout setting
:param conn_timeout: the connection operation timeout
:param cert: is SSL is enabled, profile the filename of certificate to
check. If None, don't check certificate.
:param handler: Handler to attach to this :class:`RpcConnection` object
'''
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if enable_ssl:
req = ssl.CERT_NONE if cert is None else ssl.CERT_REQUIRED
sock = ssl.wrap_socket(sock, certfile=None, cert_reqs=req,
ssl_version=ssl.PROTOCOL_TLSv1)
sock.settimeout(conn_timeout)
return cls(sock, manager, handler, timeout=timeout)
Antoine Millet
committed
return '<RpcConnection object>'
def __hash__(self):
return self._sock.__hash__()
def send(self):
'''
Flush the outbound buffer by sending all it content through socket. This
method is usually called by the :class:`ConnectionManager` of
this :class:`RpcConnection`.
'''
data = self._outbound_buffer.pull(RpcConnection.SEND_BUF_SIZE)
while data:
sent = self._sock.send(data)
data = data[sent:]
with self._outbound_buffer:
if not len(self._outbound_buffer):
self._manager.nothing_to_write(self)
Receive data from socket into the inbound buffer.
try:
buf = self._sock.recv(RpcConnection.RECV_BUF_SIZE)
except socket.error as err:
if isinstance(err, socket.error) and err.errno == 11:
elif isinstance(err, ssl.SSLError) and err.errno == 2:
return
else:
raise err
# Empty buffer = closed socket.
while self._process_inbound():
pass
def _process_inbound(self):
'''
Process the inbound buffer and :meth:`dispatch` messages.
:return: False if nothing happened or True
'''
# Read the message length:
if self._cur_msg_size is None and len(self._inbound_buffer) >= 4:
length = struct.unpack('!L', self._inbound_buffer.pull(4))[0]
self._cur_msg_size = length
# Read the message payload:
if (self._cur_msg_size is not None
and len(self._inbound_buffer) >= self._cur_msg_size):
payload = self._inbound_buffer.pull(self._cur_msg_size)
self._cur_msg_size = None
message = json.loads(payload)
logging.debug('Received: %s', message)
return True
return False
def _send(self, message):
'''
Low level method to encode a message in json, calculate it size, and
place result on outbound buffer.
.. warning::
Message must be a jsonisable structure.
'''
Antoine Millet
committed
if not self._connected:
raise RpcError('SendError', 'disconnected from the peer')
logging.debug('Sending: %s', message)
json_msg = json.dumps(message)
size = struct.pack('!L', len(json_msg))
with self._outbound_buffer:
self._outbound_buffer.push(size + json_msg)
self._manager.data_to_write(self)
def _send_call(self, method_name, *args, **kwargs):
'''
Create the message for the call and push them to the outbound queue.
:param method_name: the name of the method to call on the peer
:param *args: arguments to pass to the remote method
:param **kwargs: keyword arguments to pass to the remote method
:return: the generated id for the request
:rtype: :class:`str` object
'''
msg = RpcConnection.REQUEST_MESSAGE.copy()
msg['method'] = method_name
msg['args'] = args
msg['kwargs'] = kwargs
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
self._send(msg)
return msg['id']
def _send_response(self, msg_id, returned=None, error=None):
'''
Low level method to send a response message to the peer.
:param msg_id: the id of the replied message
:param returned: returned data
:type returned: returned data or None if errors have been raised
:param error: raised errors
:type error: raised error or None if no error have been raised
'''
msg = RpcConnection.RESPONSE_MESSAGE.copy()
msg['id'] = msg_id
msg['return'] = returned
msg['error'] = error
self._send(msg)
def response(self, msg_id, returned):
'''
Send an "return" response to the peer.
:param msg_id: the id of the replied message
:param returned: the value returned by the function
.. warning::
In case of raised error, use the :meth:`error` method instead of
this one.
'''
self._send_response(msg_id, returned=returned)
def error(self, msg_id, error, message, traceback=None):
'''
Send an error response to the peer.
:param msg_id: the id of the replied message
:param error: the name of the raised exception
:param message: human readable error for the exception
'''
err = {'exception': error, 'message': message}
self._send_response(msg_id, error=err)
def call(self, method_name, *args, **kwargs):
'''
Make a new remote call on the peer.
:param method_name: the method to call on the peer
:param \*args: the arguments for the call
:param \*\*kwargs: the keyword arguments for the call
:return: the data returned by the peer for the call
.. note::
This function will block until the peer response is received. You
can also specify a ``timeout`` argument to specify a number of
seconds before to raise an :exc:`CallTimeout` exception if the peer
didnt respond.
'''
if '_timeout' in kwargs:
timeout = kwargs['_timeout']
del kwargs['_timeout']
else:
timeout = self._call_timeout
# Send the call to the peer:
msg_id = self._send_call(method_name, *args, **kwargs)
# Create an item in calls dict with reference to the event to raise:
self._calls[msg_id] = {'return': None, 'error': None,
'event': threading.Event()}
# Wait for the response:
self._calls[msg_id]['event'].wait(timeout)
# Check if timeout occured:
if not self._calls[msg_id]['event'].is_set():
raise RpcError('TimeoutError', 'remote method timeout')
# Check if error occured while execution:
if self._calls[msg_id]['error'] is not None:
raise RpcError(self._calls[msg_id]['error']['exception'],
self._calls[msg_id]['error']['message'])
response = self._calls[msg_id]['return']
del self._calls[msg_id]
return response
def async_call(self, method_name, *args, **kwargs):
'''
Make a new asynchronous call on the peer.
:param method_name: the method to call on the peer
:param \*args: the arguments for the call
:param \*\*kwargs: the keyword arguments for the call
:return: the message id of the call
'''
# Extract _data from argument:
if '_data' in kwargs:
data = kwargs['_data']
del kwargs['_data']
else:
data = None
# Send the call to the peer:
msg_id = self._send_call(method_name, *args, **kwargs)
self._calls[msg_id] = {'async': True, 'data': data}
return msg_id
def dispatch(self, message):
'''
Dispatch a received message according to it type.
:param message: the received message to dispatch
.. note::
When dispatching a call, the responsability of response is delegated
to the caller, except for the case where the method isn't found on
the handler.
'''
if set(RpcConnection.REQUEST_MESSAGE) <= set(message):
# Handle call requests from the peer:
if self._handler is not None:
try:
func = self._handler[message['method']]
except KeyError:
self.error(
message['id'],
'NameError',
"remote name '%s' is not defined" % message['method'],
)
else:
if getattr(func, '__threaded__', True):
ThreadedRpcCaller(message, self, func).start()
else:
RpcCaller(message, self, func).start()
Antoine Millet
committed
else:
self.error(
message['id'],
'NameError',
"remote name '%s' is not defined" % message['method'],
)
elif set(RpcConnection.RESPONSE_MESSAGE) <= set(message):
# Handle response message from the peer:
call = self._calls.get(message['id'])
if call is not None:
# Call exists in call list
if message['error'] is None:
call['return'] = message['return']
else:
call['error'] = message['error']
if 'event' in call:
# Release the call if its synchronous:
call['event'].set()
else:
# Else, it's an asynchonous call, so, we need to notify
# manager that the response has been received:
message['data'] = call['data']
self._manager.signal_arrival(message)
# Message will be deleted from the call list by the
# manager while cleanup.
else:
logging.debug('Malformed message received: %s', message)
Antoine Millet
committed
def shutdown(self, callback=None):
Antoine Millet
committed
:param callback: Name of the callback to call on the handler or a
callable to call when the connection is shutdown.
:type callback: :class:`str` or callable or None
Antoine Millet
committed
self._connected = False
try:
self._sock.close()
except socket.error as err:
logging.debug('Error while socket close: %s.', err)
Antoine Millet
committed
# Release all running calls from this connection:
for call in self._calls.values():
if 'event' in call:
call['error'] = {'exception': 'RpcError',
'message': 'Connection reset by peer'}
call['return'] = None
call['event'].set()
Antoine Millet
committed
if callback is not None and not callable(callback):
if self._handler is not None:
try:
callback = self._handler[callback]
except KeyError:
callback = None
Antoine Millet
committed
callback = None
if callback is not None:
try:
callback(self)
except Exception as err:
logging.debug('Error while execution of shutdown '
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
def clean_call(self, msg_id):
'''
Delete waiting call by it id in the call list.
:param msg_id: the id of the call to delete
:return: True if call have been deleted or False if it doesn't exists
'''
try:
del self._calls[msg_id]
except KeyError:
return False
else:
return True
def clean_all_calls(self, msg_ids):
'''
Delete all call by their ids in the call list.
:param msg_id: list of call id to delete
'''
for msg_id in msg_ids:
self.clean_call(msg_id)
def get_handler(self):
'''
Return the handler binded to the :class:`RpcConnection`.
:return: binded handler
'''
return self._handler
def set_handler(self, handler):
'''
Define a new handler for this connection.
:param handler: the new handler to define.
'''
self._handler = handler
def get_fd(self):
'''
Get the file descriptor of the socket managed by this connection.
:return: the file descriptor number of the socket
'''
try:
return self._sock.fileno()
except socket.error:
return None