import json
import logging
from uuid import uuid4
from threading import Event

from sjrpc.core.callers import RpcCaller, ThreadedRpcCaller
from sjrpc.core.exceptions import RpcError


class RpcProtocol(object):
    '''
    JSON based RPC protocol exchanging request and response messages with a
    peer through a connection.

    :param connection: the connection serving this :class:`RpcProtocol`
    :param label: the label of this :class:`RpcProtocol` instance
    :param handler: command handler to bind by default
    :param on_disconnect: callback called when client disconnect; either a
        callable, or a key used to look the callback up in the bound handler
    :param request_decorator: decorator applied on each handler function
    :param timeout: global command timeout
    :param logger: logging module :class:`Logger` instance
    '''

    # Message templates. Their key sets are also used by :meth:`_dispatch` to
    # tell requests and responses apart.
    REQUEST_MESSAGE = {'id': None, 'method': None, 'args': [], 'kwargs': {}}
    RESPONSE_MESSAGE = {'id': None, 'return': None, 'error': None}

    def __init__(self, connection, label, handler=None, on_disconnect=None,
                 request_decorator=None, timeout=30, logger=None):
        self._connection = connection
        self._label = label
        self._handler = handler
        self._on_disconnect = on_disconnect
        self.request_decorator = request_decorator
        self._call_timeout = timeout
        if logger is None:
            # Derive a logger name from the connection's own logger.
            logger_name = '%s.protos.%s' % (connection.logger.name, label)
            self.logger = logging.getLogger(logger_name)
        else:
            self.logger = logger

        # Store all calls sent to the peer. Key is the id of the call and
        # value is a dict describing the pending call (it holds the event to
        # set for synchronous calls, or the queue to feed for asynchronous
        # ones).
        self._calls = {}

    @property
    def connection(self):
        ''' The connection serving this protocol. '''
        return self._connection

    def handle(self, label, size):
        '''
        Receive a payload from the connection, decode it as json, and
        dispatch the resulting message.

        :param label: not used by this method
        :param size: value passed to the connection ``recv_until`` method
        '''
        buf = self._connection.recv_until(size)
        msg = json.loads(buf)
        self._dispatch(msg)

    def shutdown(self):
        '''
        Release every pending call with a "Connection reset by peer" error,
        then run the ``on_disconnect`` callback (if any) with the connection
        as only argument.

        Errors raised by the callback are logged and otherwise ignored.
        '''
        # Release all waiting calls from this rpc. Iterate over a snapshot of
        # the ids because _handle_response() removes entries from the dict.
        for cid in list(self._calls):
            err = {'exception': 'RpcError',
                   'message': 'Connection reset by peer'}
            self._handle_response({'id': cid, 'return': None, 'error': err})

        # Execute on_disconnect callback:
        callback = self._resolve_disconnect_callback()
        if callback is not None:
            try:
                callback(self._connection)
            except Exception as err:
                # Best effort: a failing callback must not break shutdown.
                self.logger.debug('Error while execution of shutdown '
                                  'callback: %s', err)

    def _resolve_disconnect_callback(self):
        '''
        Return the callable to run on shutdown, or None if there is none.

        A callable ``on_disconnect`` is returned as is; any other non-None
        value is used as a key to look the callback up in the bound handler.
        '''
        target = self._on_disconnect
        if target is None:
            return None
        if callable(target):
            return target
        if self._handler is None:
            self.logger.warning('Shutdown callback specified but no handler '
                                'binded on rpc, ignoring')
            return None
        try:
            return self._handler[target]
        except KeyError:
            self.logger.warning('Shutdown callback not found in current '
                                'rpc attached handler, ignoring')
            return None

    def get_handler(self):
        '''
        Return the handler binded to the :class:`RpcConnection`.

        :return: binded handler
        '''
        return self._handler

    def set_handler(self, handler):
        '''
        Define a new handler for this connection.

        :param handler: the new handler to define.
        '''
        self._handler = handler

    def _dispatch(self, message):
        '''
        Dispatch a received message according to its type.

        A message holding every key of :attr:`REQUEST_MESSAGE` is handled as
        a request, else one holding every key of :attr:`RESPONSE_MESSAGE` is
        handled as a response. Anything else is logged as malformed and
        dropped.

        :param message: the received message to dispatch

        .. note::
           When dispatching a call, the responsibility of response is
           delegated to the caller, except for the case where the method
           isn't found on the handler.
        '''
        self.logger.debug('Received: %s', message)
        if not isinstance(message, dict):
            # Valid json which is not an object (number, list, ...) can't be
            # a request nor a response.
            self.logger.debug('Malformed message received: %s', message)
            return
        keys = set(message)
        if set(RpcProtocol.REQUEST_MESSAGE) <= keys:
            self._handle_request(message)
        elif set(RpcProtocol.RESPONSE_MESSAGE) <= keys:
            self._handle_response(message)
        else:
            self.logger.debug('Malformed message received: %s', message)

    def _handle_request(self, message):
        '''
        Handle an inbound request message.

        The method is looked up in the bound handler and run through a
        :class:`ThreadedRpcCaller`, unless the function has a false
        ``__threaded__`` attribute, in which case a :class:`RpcCaller` is
        used. A "NameError" error response is sent to the peer when no
        handler is bound or when the method is not found.

        :param message: the request message
        '''
        if self._handler is None:
            self.error(message['id'], 'NameError',
                       "remote name '%s' is not defined" % message['method'])
            return
        try:
            func = self._handler[message['method']]
        except KeyError:
            self.error(message['id'], 'NameError',
                       "remote name '%s' is not defined" % message['method'])
            return
        if getattr(func, '__threaded__', True):
            caller_class = ThreadedRpcCaller
        else:
            caller_class = RpcCaller
        caller_class(message, self, func).start()

    def _handle_response(self, message):
        '''
        Handle an inbound response message.

        The matching pending call is removed from the running call list and
        filled with the returned value or the error. Synchronous calls are
        released through their event, asynchronous ones are pushed on their
        queue. Responses matching no pending call are ignored.

        :param message: the response message
        '''
        # Remove the call from the running call list in a single step, so an
        # entry already dropped elsewhere (e.g. on timeout) can't raise.
        call = self._calls.pop(message['id'], None)
        if call is None:
            return
        if message['error'] is None:
            call['return'] = message['return']
        else:
            call['error'] = message['error']

        if 'event' in call:
            # Release the call if it's synchronous:
            call['event'].set()
        else:
            # Else, it's an asynchronous call, we need to push the answer
            # on the queue (the queue itself is not part of the answer):
            queue = call.pop('queue')
            queue.put(call)

    def _send(self, message):
        '''
        Low level method to encode a message in json and hand it to the
        connection under the label of this protocol.

        :param message: the message to send

        .. warning::
           Message must be a jsonisable structure.
        '''
        # TODO: raise RpcError('SendError', 'disconnected from the peer')
        # when the peer is known to be disconnected.
        self.logger.debug('Sending: %s', message)
        json_msg = json.dumps(message)
        self._connection.send(self._label, payload=json_msg)

    def _send_call(self, method_name, *args, **kwargs):
        r'''
        Create the request message for the call and send it to the peer.

        :param method_name: the name of the method to call on the peer
        :param \*args: arguments to pass to the remote method
        :param \*\*kwargs: keyword arguments to pass to the remote method
        :return: the generated id for the request
        :rtype: :class:`str` object
        '''
        msg = RpcProtocol.REQUEST_MESSAGE.copy()
        msg['method'] = method_name
        msg['args'] = args
        msg['kwargs'] = kwargs
        msg['id'] = str(uuid4())
        self._send(msg)
        return msg['id']

    def _send_response(self, msg_id, returned=None, error=None):
        '''
        Low level method to send a response message to the peer.

        :param msg_id: the id of the replied message
        :param returned: returned data
        :type returned: returned data or None if errors have been raised
        :param error: raised errors
        :type error: raised error or None if no error have been raised
        '''
        msg = RpcProtocol.RESPONSE_MESSAGE.copy()
        msg['id'] = msg_id
        msg['return'] = returned
        msg['error'] = error
        self._send(msg)

    def response(self, msg_id, returned):
        '''
        Send a "return" response to the peer.

        :param msg_id: the id of the replied message
        :param returned: the value returned by the function

        .. warning::
           In case of raised error, use the :meth:`error` method instead of
           this one.
        '''
        self._send_response(msg_id, returned=returned)

    def error(self, msg_id, error, message, traceback=None):
        '''
        Send an error response to the peer.

        :param msg_id: the id of the replied message
        :param error: the name of the raised exception
        :param message: human readable error for the exception
        :param traceback: accepted but currently not sent to the peer
        '''
        err = {'exception': error, 'message': message}
        self._send_response(msg_id, error=err)

    def call(self, method_name, *args, **kwargs):
        r'''
        Make a new remote call on the peer.

        :param method_name: the method to call on the peer
        :param \*args: the arguments for the call
        :param \*\*kwargs: the keyword arguments for the call
        :return: the data returned by the peer for the call
        :raise RpcError: with the "TimeoutError" exception name if the peer
            didn't respond in time, or with the error reported by the peer

        .. note::
           This function will block until the peer response is received.
           You can also pass a ``_timeout`` keyword argument to specify the
           number of seconds to wait before giving up; it is not forwarded
           to the peer and defaults to the global timeout of this protocol.
        '''
        timeout = kwargs.pop('_timeout', self._call_timeout)

        # Send the call to the peer:
        msg_id = self._send_call(method_name, *args, **kwargs)

        # Create an item in calls dict with reference to the event to raise:
        # NOTE(review): the call is registered after the request is sent; if
        # responses are dispatched from another thread, a reply could be
        # handled before the registration and be dropped -- confirm against
        # the connection threading model.
        call = {'return': None, 'error': None, 'event': Event(), 'id': msg_id}
        self._calls[msg_id] = call

        # Wait for the response:
        call['event'].wait(timeout)

        # Check if timeout occurred:
        if not call['event'].is_set():
            # Forget the call, otherwise it would stay in the dict forever.
            self._calls.pop(msg_id, None)
            raise RpcError('TimeoutError', 'remote method timeout')

        # Check if error occurred while execution:
        if call['error'] is not None:
            raise RpcError(call['error']['exception'],
                           call['error']['message'])

        return call['return']

    def async_call(self, queue, method_name, *args, **kwargs):
        r'''
        Make a new asynchronous call on the peer.

        The call is registered and this method returns immediately. When the
        response is received, the call description (a dict) is pushed on
        ``queue``.

        :param queue: the queue where to push the response when received
        :param method_name: the method to call on the peer
        :param _data: local data to give back on the response (keyword
            argument, not forwarded to the peer)
        :param \*args: the arguments for the call
        :param \*\*kwargs: the keyword arguments for the call
        :return: the message id of the call
        '''
        # Extract _data from arguments:
        data = kwargs.pop('_data', None)

        # Send the call to the peer:
        msg_id = self._send_call(method_name, *args, **kwargs)

        # Register the call but don't wait for the response:
        self._calls[msg_id] = {'id': msg_id, 'async': True,
                               'data': data, 'queue': queue}
        return msg_id