From 4eadc7b5018a6077b6f43a9d57405f3e912c8d50 Mon Sep 17 00:00:00 2001 From: Antoine Millet Date: Fri, 9 Sep 2011 14:20:21 +0200 Subject: [PATCH] Rewrited sjRpc with low level channel multiplexage and gevent usage. --- sjrpc/__init__.py | 6 +- sjrpc/client/__init__.py | 6 - sjrpc/client/simple.py | 109 ------ sjrpc/core/__init__.py | 7 +- sjrpc/core/callers.py | 57 ++-- sjrpc/core/connectionmanagers.py | 221 ------------ sjrpc/core/exceptions.py | 19 +- sjrpc/core/protocols/__init__.py | 0 sjrpc/core/protocols/rpc.py | 319 ++++++++++++++++++ sjrpc/core/rpcconnection.py | 553 ++++++++++--------------------- sjrpc/server/__init__.py | 5 +- sjrpc/server/simple.py | 214 ++++++------ sjrpc/utils/__init__.py | 4 +- sjrpc/utils/datastructures.py | 71 ---- sjrpc/utils/handlers.py | 31 +- 15 files changed, 687 insertions(+), 935 deletions(-) delete mode 100644 sjrpc/client/__init__.py delete mode 100644 sjrpc/client/simple.py delete mode 100644 sjrpc/core/connectionmanagers.py create mode 100644 sjrpc/core/protocols/__init__.py create mode 100644 sjrpc/core/protocols/rpc.py delete mode 100644 sjrpc/utils/datastructures.py diff --git a/sjrpc/__init__.py b/sjrpc/__init__.py index db2a98f..4b90c73 100644 --- a/sjrpc/__init__.py +++ b/sjrpc/__init__.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python -#coding:utf8 ''' @@ -11,14 +9,12 @@ The library is separated into four parts: * core library contains all common classes. * server library contains all the server side related stuff. - * client library contains all the client side related stuff. * utils library contains some helpers used in previous libraries. ''' import sjrpc.core import sjrpc.server -import sjrpc.client import sjrpc.utils -__version__ = 13 +__version__ = '14~dev' diff --git a/sjrpc/client/__init__.py b/sjrpc/client/__init__.py deleted file mode 100644 index 209d728..0000000 --- a/sjrpc/client/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -#!/usr/bin/env python -#coding:utf8 - -from sjrpc.client.simple import SimpleRpcClient - -__all__ = ('SimpleRpcClient',) diff --git a/sjrpc/client/simple.py b/sjrpc/client/simple.py deleted file mode 100644 index b599dfa..0000000 --- a/sjrpc/client/simple.py +++ /dev/null @@ -1,109 +0,0 @@ -#!/usr/bin/env python -#coding=utf8 - -import select -import socket -import logging -from sjrpc.core import RpcConnection, ConnectionManager - -class SimpleRpcClient(ConnectionManager): - ''' - Create a new simple RPC client. - - :param connect: the :class:`RpcConnection` object to bind the client manager - :param default_handler: the default handler to bind to the client connection - :param on_disconnect: method on the handler to call when the client - disconnects. - ''' - - def __init__(self, connection, default_handler=None, on_disconnect=None): - super(SimpleRpcClient, self).__init__() - - self._on_disconnect = on_disconnect - self._connection = connection - self._connection.set_handler(default_handler) - self.register(self._connection.get_fd(), self._handle_event) - - @classmethod - def from_addr(cls, addr, port, enable_ssl=False, cert=None, timeout=None, - conn_timeout=30.0, default_handler=None, on_disconnect=None): - ''' - Construct the instance of :class:`SimpleRpcClient` without providing - the :class:`RpcConnection` object. The :class:`RpcConnection` is - automatically created and passed to the standard constructor before to - return the new instance. - - :param addr: the target ip address - :param port: the target port - :param ssl: enable SSL - :param timeout: the global call timeout setting - :param conn_timeout: the connection operation timeout - :param cert: is SSL is enabled, profile the filename of certificate to - check. If None, don't check certificate. - :param default_handler: the default handler to bind to the - client connection - :param on_disconnect: method on the handler to call when the client - disconnects. - ''' - - connection = RpcConnection.from_addr(addr, port, None, timeout=timeout, - conn_timeout=conn_timeout, - enable_ssl=enable_ssl, cert=cert) - client = cls(connection, default_handler=default_handler, - on_disconnect=on_disconnect) - connection._manager = client - return client - - @classmethod - def from_sock(cls, sock, default_handler=None, on_disconnect=None): - ''' - Construct the instance of :class:`SimpleRpcClient` without providing - the :class:`RpcConnection` object. The :class:`RpcConnection` is - automatically created and passed to the standard constructor before to - return the new instance - - :param sock: the socket object to wrap with :class:`RpcConnection` - object. - :param default_handler: the default handler to bind to the - client connection - :param on_disconnect: method on the handler to call when the client - disconnects. - ''' - - connection = RpcConnection(sock, None) - client = cls(connection, default_handler, on_disconnect) - connection._manager = client - return client - - def shutdown(self): - super(SimpleRpcClient, self).shutdown() - self._connection.shutdown(self._on_disconnect) - - def _handle_event(self, fd, events): - if events & select.EPOLLIN: - # Data are ready to be readed on socket - try: - self._connection.receive() - except socket.error as err: - logging.debug('Socket error while receiving from the client ' - 'fd/%s: %s', fd, err) - self.shutdown() - - if events & select.EPOLLOUT: - # Data are ready to be written on socket - try: - self._connection.send() - except socket.error as err: - logging.debug('Socket error while sending to the client ' - 'fd/%s: %s', fd, err) - self.shutdown() - - if events & select.EPOLLHUP: - logging.debug('Socket HUP fd/%s', fd) - self.shutdown() - - def all_connections(self): - return set((self._connection,)) - - def call(self, *args, **kwargs): - return self._connection.call(*args, **kwargs) diff --git a/sjrpc/core/__init__.py b/sjrpc/core/__init__.py index 88485d4..e0ba8af 100644 --- a/sjrpc/core/__init__.py +++ b/sjrpc/core/__init__.py @@ -2,10 +2,9 @@ #coding:utf8 from sjrpc.core.rpcconnection import * -from sjrpc.core.connectionmanagers import * from sjrpc.core.callers import * from sjrpc.core.exceptions import * +from sjrpc.core.async import * -__all__ = ('ConnectionManager', 'RpcConnection', 'RpcCaller', 'RpcError', - 'ThreadedRpcCaller', ) - +__all__ = ('RpcConnection', 'RpcCaller', 'ThreadedRpcCaller', 'RpcError', + 'AsyncWatcher') diff --git a/sjrpc/core/callers.py b/sjrpc/core/callers.py index 785e3de..e837edf 100644 --- a/sjrpc/core/callers.py +++ b/sjrpc/core/callers.py @@ -1,60 +1,75 @@ -#!/usr/bin/env python -#coding:utf8 - import threading +from sjrpc.core.exceptions import RpcError + +ERRMSG_RPCERR = ('Unable to send reply to the peer: %s (this error is usualy ' + 'raised when connection is lost while handler function ' + 'execution)') + + class RpcCaller(object): ''' A caller execute a callable (function, method, class which implement the - :meth:`__call__` method...) in a particular context (threaded or - "timeouted" for example), and return the result (or the exception) to the + :meth:`__call__` method...) in a particular context (threaded or + "timeouted" for example), and return the result (or the exception) to the peer through it :class:`RpcConnection` object. ''' - - def __init__(self, request, connection, func): + + def __init__(self, request, protocol, func): self._request = request - self._connection = connection + self._protocol = protocol self._func = func + # Apply the request decorator + #request_decorator = connection.request_decorator + #if request_decorator is not None: + # self._func = request_decorator(self._func) + def run(self): ''' Run the callable and return the result (or the exception) to the peer. ''' - msg_id = self._request['id'] args = self._request['args'] kwargs = self._request['kwargs'] - - if not getattr(self._func, '__pure__', False): - args.insert(0, self._connection) + if getattr(self._func, '__pass_rpc__', False): + args.insert(0, self._protocol) + if getattr(self._func, '__pass_connection__', False): + args.insert(0, self._protocol.connection) try: returned = self._func(*args, **kwargs) except Exception as err: - self._connection.error(msg_id, message=str(err), - error=err.__class__.__name__) + try: + self._protocol.error(msg_id, message=str(err), + error=err.__class__.__name__) + except RpcError as err: + self._protocol.connection.logger.error(ERRMSG_RPCERR, err) else: - self._connection.response(msg_id, returned=returned) - + try: + self._protocol.response(msg_id, returned=returned) + except RpcError as err: + self._protocol.connection.logger.error(ERRMSG_RPCERR, err) + def start(self): ''' - Start execution of the callable, the most of time, it just call + Start execution of the callable, the most of time, it just call :meth:`run` method. ''' - self.run() class ThreadedRpcCaller(RpcCaller): - + ''' A caller which make the call into a separated thread. ''' - + def __init__(self, *args, **kwargs): super(ThreadedRpcCaller, self).__init__(*args, **kwargs) self._thread = threading.Thread(target=self.run) + self._thread.name = 'Processing of call: %s' % self._request['id'] self._thread.daemon = True - + def start(self): self._thread.start() diff --git a/sjrpc/core/connectionmanagers.py b/sjrpc/core/connectionmanagers.py deleted file mode 100644 index 0568213..0000000 --- a/sjrpc/core/connectionmanagers.py +++ /dev/null @@ -1,221 +0,0 @@ -#!/usr/bin/env python -#coding:utf8 - -import select -import threading - -class ConnectionManager(object): - ''' - Base class for all connection manager classes. - ''' - - # The timeout to wait before the poll call release the hand with no events: - POLL_TIMEOUT = 1 - - # Masks for fd registration on poll object: - MASK_NORMAL = (select.EPOLLIN | select.EPOLLPRI | - select.EPOLLERR | select.EPOLLHUP) - MASK_WRITABLE = MASK_NORMAL | select.EPOLLOUT - - def __init__(self): - self._poll = select.epoll() - self._running = True - self._received_msg = {} - self._wait_groups = {} - self._poll_callbacks = {} - - def register(self, fd, callback, *args, **kwargs): - ''' - Register an fd on the poll object with the specified callback. The - callback will be called each time poller drop an event for the specified - fd. Extra args will be passed to the callback after fd and events. - - :param fd: the fd to register - :param callback: the callable to use on event - :param *args, **kwargs: extra arguments passed to the callback - ''' - - if hasattr(fd, 'fileno'): - fd = fd.fileno() - self._poll_callbacks[fd] = {'func': callback, - 'extra': args, - 'kwextra': kwargs} - self._poll.register(fd, ConnectionManager.MASK_NORMAL) - - def unregister(self, fd): - ''' - Unregister the specified fd from the manager. - - :param fd: the fd to unregister. - ''' - - self._poll.unregister(fd) - del self._poll_callbacks[fd] - - def is_running(self): - return self._running - - def run(self): - ''' - Run the main loop of the :class:`ConnectionManager`. It will catch - events on registered :class:`RpcConnection` and process them. - ''' - - while self._running: - try: - events = self._poll.poll(ConnectionManager.POLL_TIMEOUT) - except IOError: - pass - else: - for fd, event in events: - if fd in self._poll_callbacks: - cb = self._poll_callbacks[fd] - cb['func'](fd, event, *cb['extra'], **cb['kwextra']) - - def start(self, daemonize=False): - ''' - Run the main loop in a separated thread. - - :param daemonize: set the thread daemon state - ''' - - t = threading.Thread(target=self.run) - t.daemon = daemonize - t.start() - - def wait(self, msg_id_set, timeout=None, wait_all=True): - ''' - Wait for the asynchronous messages in ``msg_id_set``. - - When the timeout argument is present and not ``None``, it should be a - floating point number specifying a timeout for the operation in - seconds (or fractions thereof). - - You can also set ``wait_all`` to False if you want to unlock the call - when the first response is received. - - :param msg_id_set: set of message to wait - :type msg_id_set: :class:`frozenset` - :param timeout: timeout value or None to disable timeout (default: None) - :type timeout: :class:`int` or :class:`None` - :param wait_all: wait for all messages (default: True) - :type wait_all: :class:`bool` - - .. warning: - This is important that ``msg_id_set`` is a :class:`frozenset` - and not a :class:`set`. - ''' - - waiter = {'event': threading.Event(), 'wait_all': wait_all} - self._wait_groups.setdefault(msg_id_set, waiter) - already_completed = self._check_waiter(msg_id_set) - - if not already_completed: - waiter['event'].wait(timeout=timeout) - - # Clean the call list on each attached RpcConnection - for connection in self.all_connections(): - connection.clean_all_calls(msg_id_set) - - # Get the messages: - messages = [] - - for msg_id, msg in self._received_msg.items(): - if msg_id in msg_id_set: - messages.append(msg) - del self._received_msg[msg_id] - waiter['responses'] = tuple(messages) - - messages = waiter['responses'] - del self._wait_groups[msg_id_set] - - return messages - - def signal_arrival(self, message): - ''' - Signal the arrival of a new message to the :class:`ConnectionManager`. - This method is ordinary called by the :class:`RpcConnections` objects, - when a response to an asynchronous call is received. - - :param message: the message received - ''' - - self._received_msg[message['id']] = message - for waitset in self._wait_groups.keys(): - self._check_waiter(waitset) - - def _check_waiter(self, waitset): - ''' - Check if a waitset is completed and process it. - - :param waitset: the waitset to check - :return: True if waitset is completed else None - ''' - - # Make a set of received messages ids: - recv_msg = set(self._received_msg) - - try: - waiter = self._wait_groups[waitset] - except KeyError: - return False - - is_ok = (waiter['wait_all'] and waitset <= recv_msg - or not waiter['wait_all'] and not recv_msg.isdisjoint(waitset)) - - if is_ok: - # Unlock the event: - waiter['event'].set() - - return True - else: - return False - - def all_connections(self): - ''' - Return all connection attached to this :class:`ConnectionManager`. - - :return: a set of :class:`RpcConnection` attached - to this :class:`ConnectionManager` - ''' - - raise NotImplementedError - - def shutdown(self): - ''' - Shutdown the manager properly. - ''' - - self._running = False - - def data_to_write(self, fd): - ''' - Method called by a connection to inform the manager that it have data - to send. - - :param connection: the fd which have data to write - ''' - - if fd is not None: - self._poll.modify(fd, ConnectionManager.MASK_WRITABLE) - - def nothing_to_write(self, fd): - ''' - Method called by a connection to inform the manager that it have no - more data to send. - - :param fd: the fd which have no more data to write - ''' - - if fd is not None: - self._poll.modify(fd, ConnectionManager.MASK_NORMAL) - - def handle_event(self, fd, event): - ''' - Handle an event and make associated action. This is an abstract method to - overload on derived classes. - - :param fd: the fd that have generated the event - :param event: the event as returned by the poller object - ''' - pass diff --git a/sjrpc/core/exceptions.py b/sjrpc/core/exceptions.py index f1c8742..629e09d 100644 --- a/sjrpc/core/exceptions.py +++ b/sjrpc/core/exceptions.py @@ -1,16 +1,25 @@ -#!/usr/bin/env python -#coding:utf8 + +''' +Contains sjRpc exceptions. +''' class RpcError(Exception): - + ''' Exception raised by caller when an error occurs while execution of remote procedure call. ''' - + def __init__(self, exception, message): self.exception = exception self.message = message - + def __str__(self): return '%s' % self.message + + +class SocketRpcError(Exception): + + ''' + Exception used internally to raise a socket fault. + ''' diff --git a/sjrpc/core/protocols/__init__.py b/sjrpc/core/protocols/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sjrpc/core/protocols/rpc.py b/sjrpc/core/protocols/rpc.py new file mode 100644 index 0000000..d6e33dc --- /dev/null +++ b/sjrpc/core/protocols/rpc.py @@ -0,0 +1,319 @@ +import json +import logging +from uuid import uuid4 + +from threading import Event + +from sjrpc.core.callers import RpcCaller, ThreadedRpcCaller +from sjrpc.core.exceptions import RpcError + + +class RpcProtocol(object): + + REQUEST_MESSAGE = {'id': None, 'method': None, 'args': [], 'kwargs': {}} + RESPONSE_MESSAGE = {'id': None, 'return': None, 'error': None} + + ''' + :param connection: the connection serving this :class:`RpcProtocol` + :param label: the label of this :class:`RpcProtocol` instance + :param handler: command handler to bind by default + :param on_disconnect: callback called when client disconnect + :param request_decorator: decorator applied on each handler function + :param timeout: global command timeout + :param logger: logging module :class:`Logger` instance + ''' + + def __init__(self, connection, label, handler=None, on_disconnect=None, + request_decorator=None, timeout=30, logger=None): + self._connection = connection + self._label = label + self._handler = handler + self._on_disconnect = on_disconnect + self.request_decorator = request_decorator + self._call_timeout = timeout + if logger is None: + logger_name = '%s.protos.%s' % (connection.logger.name, label) + self.logger = logging.getLogger(logger_name) + else: + self.logger = logger + + # Store all calls sent to the peer. Key is the id of the call and value + # the event to raise when call is finished. + self._calls = {} + + @property + def connection(self): + return self._connection + + def handle(self, label, size): + ''' + Decode json, and dispatch message. + ''' + buf = self._connection.recv_until(size) + msg = json.loads(buf) + self._dispatch(msg) + + def shutdown(self): + # Release all waiting calls from this rpc: + for cid, call in self._calls.iteritems(): + err = {'exception': 'RpcError', + 'message': 'Connection reset by peer'} + if 'event' in call: + call['error'] = err + call['return'] = None + call['event'].set() + else: + msg = {'id': cid, 'error': err, 'return': None} + self._manager.signal_arrival(msg) + + # Execute on_disconnect callback: + callback = None + if self._on_disconnect is not None and not callable(self._on_disconnect): + if self._handler is not None: + try: + callback = self._handler[self._on_disconnect] + except KeyError: + self.logger.warn('Shutdown callback not found in current ' + 'rpc attached handler, ignoring') + callback = None + else: + self.logger.warn('Shutdown callback specified but no handler ' + 'binded on rpc, ignoring') + callback = None + + if callback is not None: + try: + callback(self._connection) + except Exception as err: + self.logger.debug('Error while execution of shutdown ' + 'callback: %s', err) + + def get_handler(self): + ''' + Return the handler binded to the :class:`RpcConnection`. + + :return: binded handler + ''' + return self._handler + + def set_handler(self, handler): + ''' + Define a new handler for this connection. + + :param handler: the new handler to define. + ''' + self._handler = handler + + def _dispatch(self, message): + ''' + Dispatch a received message according to it type. + + :param message: the received message to dispatch + + .. note:: + When dispatching a call, the responsability of response is delegated + to the caller, except for the case where the method isn't found on + the handler. + ''' + + self.logger.debug('Received: %s', message) + if set(RpcProtocol.REQUEST_MESSAGE) <= set(message): + self._handle_request(message) + elif set(RpcProtocol.RESPONSE_MESSAGE) <= set(message): + self._handle_response(message) + else: + self.logger.debug('Malformed message received: %s', message) + + def _handle_request(self, message): + ''' + Handle an inbound request message. + ''' + if self._handler is not None: + try: + func = self._handler[message['method']] + except KeyError: + self.error(message['id'], 'NameError', + "remote name '%s' is not defined" % message['method']) + else: + if getattr(func, '__threaded__', True): + ThreadedRpcCaller(message, self, func).start() + else: + RpcCaller(message, self, func).start() + else: + self.error(message['id'], 'NameError', + "remote name '%s' is not defined" % message['method']) + + def _handle_response(self, message): + ''' + Handle an inbound response message + ''' + # Handle response message from the peer: + call = self._calls.get(message['id']) + if call is not None: + # Call exists in call list + if message['error'] is None: + call['return'] = message['return'] + else: + call['error'] = message['error'] + + if 'event' in call: + # Release the call if its synchronous: + call['event'].set() + else: + # Else, it's an asynchonous call, we need to push the answer + # on the queue: + queue = call['queue'] + del call['queue'] + queue.put(call) + + # Finally, delete the call from the current running call list: + del self._calls[message['id']] + + def _send(self, message): + ''' + Low level method to encode a message in json, calculate it size, and + place result on outbound buffer. + + .. warning:: + Message must be a jsonisable structure. + ''' + + #if not self._connected: #FIXME + # raise RpcError('SendError', 'disconnected from the peer') + + self.logger.debug('Sending: %s', message) + json_msg = json.dumps(message) + self._connection.send(self._label, payload=json_msg) + + def _send_call(self, method_name, *args, **kwargs): + ''' + Create the message for the call and push them to the outbound queue. + + :param method_name: the name of the method to call on the peer + :param *args: arguments to pass to the remote method + :param **kwargs: keyword arguments to pass to the remote method + :return: the generated id for the request + :rtype: :class:`str` object + ''' + + msg = RpcProtocol.REQUEST_MESSAGE.copy() + msg['method'] = method_name + msg['args'] = args + msg['kwargs'] = kwargs + msg['id'] = str(uuid4()) + + self._send(msg) + + return msg['id'] + + def _send_response(self, msg_id, returned=None, error=None): + ''' + Low level method to send a response message to the peer. + + :param msg_id: the id of the replied message + :param returned: returned data + :type returned: returned data or None if errors have been raised + :param error: raised errors + :type error: raised error or None if no error have been raised + ''' + + msg = RpcProtocol.RESPONSE_MESSAGE.copy() + msg['id'] = msg_id + msg['return'] = returned + msg['error'] = error + + self._send(msg) + + def response(self, msg_id, returned): + ''' + Send an "return" response to the peer. + + :param msg_id: the id of the replied message + :param returned: the value returned by the function + + .. warning:: + In case of raised error, use the :meth:`error` method instead of + this one. + ''' + + self._send_response(msg_id, returned=returned) + + def error(self, msg_id, error, message, traceback=None): + ''' + Send an error response to the peer. + + :param msg_id: the id of the replied message + :param error: the name of the raised exception + :param message: human readable error for the exception + ''' + + err = {'exception': error, 'message': message} + self._send_response(msg_id, error=err) + + def call(self, method_name, *args, **kwargs): + ''' + Make a new remote call on the peer. + + :param method_name: the method to call on the peer + :param \*args: the arguments for the call + :param \*\*kwargs: the keyword arguments for the call + :return: the data returned by the peer for the call + + .. note:: + This function will block until the peer response is received. You + can also specify a ``timeout`` argument to specify a number of + seconds before to raise an :exc:`CallTimeout` exception if the peer + didnt respond. + ''' + + if '_timeout' in kwargs: + timeout = kwargs['_timeout'] + del kwargs['_timeout'] + else: + timeout = self._call_timeout + + # Send the call to the peer: + msg_id = self._send_call(method_name, *args, **kwargs) + + # Create an item in calls dict with reference to the event to raise: + call = {'return': None, 'error': None, 'event': Event(), 'id': msg_id} + self._calls[msg_id] = call + + # Wait for the response: + call['event'].wait(timeout) + + # Check if timeout occured: + if not call['event'].is_set(): + raise RpcError('TimeoutError', 'remote method timeout') + + # Check if error occured while execution: + if call['error'] is not None: + raise RpcError(call['error']['exception'], + call['error']['message']) + + return call['return'] + + def async_call(self, queue, method_name, *args, **kwargs): + ''' + Make a new asynchronous call on the peer. + + :param queue: the queue where to push the response when received + :param method_name: the method to call on the peer + :param _data: local data to give back on the response + :param \*args: the arguments for the call + :param \*\*kwargs: the keyword arguments for the call + :return: the message id of the call + ''' + + # Extract _data from argument: + if '_data' in kwargs: + data = kwargs['_data'] + del kwargs['_data'] + else: + data = None + # Send the call to the peer: + msg_id = self._send_call(method_name, *args, **kwargs) + # Register the call but don't wait for the response: + self._calls[msg_id] = {'id': msg_id, 'async': True, + 'data': data, 'queue': queue} + return msg_id diff --git a/sjrpc/core/rpcconnection.py b/sjrpc/core/rpcconnection.py index 2b7371f..8a9c910 100644 --- a/sjrpc/core/rpcconnection.py +++ b/sjrpc/core/rpcconnection.py @@ -1,72 +1,57 @@ -#!/usr/bin/env python -#coding:utf8 ''' This module contains the RpcConnection class, more informations about this class are located in it docstring. ''' +from __future__ import absolute_import + import ssl -import json import struct import socket import logging -import threading -from uuid import uuid4 -from sjrpc.utils import BytesBuffer -from sjrpc.core.callers import RpcCaller, ThreadedRpcCaller -from sjrpc.core.exceptions import RpcError +from sjrpc.core.protocols.rpc import RpcProtocol +from sjrpc.core.exceptions import RpcError, SocketRpcError class RpcConnection(object): ''' This class manage a single peer connection. + + :param sock: the socket object of this newly created :class:`RpcConnection` + :param *args, **kwargs: arguments to pass to the default rpc protocol + automatically registered on label 0. ''' - RECV_BUF_SIZE = 4096 - SEND_BUF_SIZE = 4096 - REQUEST_MESSAGE = {'id': None, 'method': None, 'args': [], 'kwargs': {}} - RESPONSE_MESSAGE = {'id': None, 'return': None, 'error': None} + MESSAGE_HEADER = '!HL' + SHORTCUTS_MAINRPC = ('call', 'async_call') - def __init__(self, sock, manager, handler=None, timeout=None): + def __init__(self, sock, *args, **kwargs): + #super(RpcConnection, self).__init__() # Sock of this connection: self._sock = sock + self._sock.settimeout(None) # Activate TCP keepalive on the connection: - self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - - # Inbound buffer containing currently readed message while it sending: - self._inbound_buffer = BytesBuffer() - - # The size of the currently readed message, None value significate that - # the next message size have to be readed: - self._cur_msg_size = None - - # Outbound buffer containing data to write on the socket: - self._outbound_buffer = BytesBuffer() - - # Reference to the RPC handler, this attribute can be set to None if - # no handler is defined for this connection (for example, for a client - # connection). - self._handler = handler - - # Store all calls sent to the peer. Key is the id of the call and value - # the event to raise when call is finished. - self._calls = {} - - # The manager of this connection - self._manager = manager + #self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # Is the RpcConnection connected to its peer: self._connected = True - # The global call timeout setting: - self._call_timeout = timeout + # Setup self.logger.facility: + self.logger = logging.getLogger('sjrpc.%s' % self.getpeername()) + + # Protocols registered on this connection: + self._protocols = {} + self.register_protocol(0, RpcProtocol, *args, **kwargs) + + # Create shortcuts to the main rpc (protocol 0) for convenience: + for name in RpcConnection.SHORTCUTS_MAINRPC: + setattr(self, name, getattr(self.get_protocol(0), name)) @classmethod - def from_addr(cls, addr, port, manager, enable_ssl=False, timeout=None, - conn_timeout=30.0, cert=None, handler=None): + def from_addr(cls, addr, port, conn_timeout=30.0, *args, **kwargs): ''' Construct the instance of :class:`RpcConnection` without providing the :class:`socket` object. Socket is automatically created and passed @@ -74,25 +59,30 @@ class RpcConnection(object): :param addr: the target ip address :param port: the target port - :param manager: manager of this connection - :param enable_ssl: enable SSL - :param timeout: the global call timeout setting :param conn_timeout: the connection operation timeout - :param cert: is SSL is enabled, profile the filename of certificate to - check. If None, don't check certificate. - :param handler: Handler to attach to this :class:`RpcConnection` object + :param *args, **kwargs: extra argument to pass to the constructor (see + constructor doctring) ''' - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - if enable_ssl: - req = ssl.CERT_NONE if cert is None else ssl.CERT_REQUIRED - sock = ssl.wrap_socket(sock, certfile=None, cert_reqs=req, - ssl_version=ssl.PROTOCOL_TLSv1) sock.settimeout(conn_timeout) sock.connect((addr, port)) - sock.setblocking(False) - return cls(sock, manager, handler, timeout=timeout) + return cls(sock, *args, **kwargs) + + @classmethod + def from_addr_ssl(cls, addr, port, cert=None, + conn_timeout=30, *args, **kwargs): + ''' + Construct :class:`RpcConnection` instance like :meth:`from_addr`, but + enable ssl on socket. + + :param cert: ssl certificate or None for ssl without certificat + ''' + connection = cls.from_addr(addr, port, conn_timeout, *args, **kwargs) + req = ssl.CERT_NONE if cert is None else ssl.CERT_REQUIRED + connection._sock = ssl.wrap_socket(connection._sock, certfile=cert, + cert_reqs=req, + ssl_version=ssl.PROTOCOL_TLSv1) + return connection def __repr__(self): return '' @@ -100,352 +90,102 @@ class RpcConnection(object): def __hash__(self): return self._sock.__hash__() - def send(self): - ''' - Flush the outbound buffer by sending all it content through socket. This - method is usually called by the :class:`ConnectionManager` of - this :class:`RpcConnection`. - ''' - - data = self._outbound_buffer.pull(RpcConnection.SEND_BUF_SIZE) - - while data: - sent = self._sock.send(data) - data = data[sent:] - - with self._outbound_buffer: - if not len(self._outbound_buffer): - self._manager.nothing_to_write(self.get_fd()) - - def receive(self): + def run(self): ''' - Receive data from socket into the inbound buffer. + Inbound message processing loop. ''' + while self._connected: + try: + self._dispatch() + except SocketRpcError: + # If SocketRpcError occurs while dispatching, shutdown the + # connection if it not already shutdown: + if self._connected: + self.shutdown() - try: - buf = self._sock.recv(RpcConnection.RECV_BUF_SIZE) - except socket.error as err: - if isinstance(err, socket.error) and err.errno == 11: - return - elif isinstance(err, ssl.SSLError) and err.errno == 2: - return - else: - raise err - - if not buf: - # Empty buffer = closed socket. - raise socket.error() - - self._inbound_buffer.push(buf) - - while self._process_inbound(): - pass - - def _process_inbound(self): + def _dispatch(self): ''' - Process the inbound buffer and :meth:`dispatch` messages. - - :return: False if nothing happened or True + Read next message from socket and dispatch it to accoding protocol + handler. ''' - # Read the message length: - if self._cur_msg_size is None and len(self._inbound_buffer) >= 4: - length = struct.unpack('!L', self._inbound_buffer.pull(4))[0] - self._cur_msg_size = length - return True - - # Read the message payload: - if (self._cur_msg_size is not None - and len(self._inbound_buffer) >= self._cur_msg_size): - payload = self._inbound_buffer.pull(self._cur_msg_size) - self._cur_msg_size = None - message = json.loads(payload) - logging.debug('Received: %s', message) - self.dispatch(message) - return True - - return False + # Read the header: + buf = self.recv_until(struct.calcsize(RpcConnection.MESSAGE_HEADER)) + label, pl_size = struct.unpack(RpcConnection.MESSAGE_HEADER, buf) + # Get the registered protocol for the specified label + proto = self._protocols.get(label) + if proto is not None: + proto.handle(label, pl_size) - def _send(self, message): + def send(self, label, payload): ''' - Low level method to encode a message in json, calculate it size, and - place result on outbound buffer. - - .. warning:: - Message must be a jsonisable structure. + Low level method to send a message through the socket, generally + used by protocols. ''' - if not self._connected: - raise RpcError('SendError', 'disconnected from the peer') - - logging.debug('Sending: %s', message) - json_msg = json.dumps(message) - size = struct.pack('!L', len(json_msg)) - with self._outbound_buffer: - self._outbound_buffer.push(size + json_msg) - self._manager.data_to_write(self.get_fd()) - - def _send_call(self, method_name, *args, **kwargs): - ''' - Create the message for the call and push them to the outbound queue. - - :param method_name: the name of the method to call on the peer - :param *args: arguments to pass to the remote method - :param **kwargs: keyword arguments to pass to the remote method - :return: the generated id for the request - :rtype: :class:`str` object - ''' - - msg = RpcConnection.REQUEST_MESSAGE.copy() - msg['method'] = method_name - msg['args'] = args - msg['kwargs'] = kwargs - msg['id'] = str(uuid4()) - - self._send(msg) - - return msg['id'] - - def _send_response(self, msg_id, returned=None, error=None): - ''' - Low level method to send a response message to the peer. - - :param msg_id: the id of the replied message - :param returned: returned data - :type returned: returned data or None if errors have been raised - :param error: raised errors - :type error: raised error or None if no error have been raised - ''' - - msg = RpcConnection.RESPONSE_MESSAGE.copy() - msg['id'] = msg_id - msg['return'] = returned - msg['error'] = error - - self._send(msg) - - def response(self, msg_id, returned): - ''' - Send an "return" response to the peer. - - :param msg_id: the id of the replied message - :param returned: the value returned by the function - - .. warning:: - In case of raised error, use the :meth:`error` method instead of - this one. - ''' + raise RpcError('RpcError', 'Not connected to the peer') + size = len(payload) + header = struct.pack(RpcConnection.MESSAGE_HEADER, label, size) + try: + self._sock.sendall(header + payload) + except socket.error as err: + errmsg = 'Fatal error while sending through socket: %s' % err + self.logger.error(errmsg) + raise RpcError('SocketError', errmsg) - self._send_response(msg_id, returned=returned) +# +# Public API +# - def error(self, msg_id, error, message, traceback=None): + def register_protocol(self, label, protocol_class, *args, **kwargs): ''' - Send an error response to the peer. - - :param msg_id: the id of the replied message - :param error: the name of the raised exception - :param message: human readable error for the exception + Register a new protocol for the specified label. ''' - err = {'exception': error, 'message': message} - self._send_response(msg_id, error=err) + if label in self._protocols: + raise KeyError('A protocol is already registered for this label') + elif not isinstance(label, int): + raise ValueError('Label must be an integer') + self._protocols[label] = protocol_class(self, label, *args, **kwargs) + return self._protocols[label] - def call(self, method_name, *args, **kwargs): + def unregister_protocol(self, label): ''' - Make a new remote call on the peer. - - :param method_name: the method to call on the peer - :param \*args: the arguments for the call - :param \*\*kwargs: the keyword arguments for the call - :return: the data returned by the peer for the call - - .. note:: - This function will block until the peer response is received. You - can also specify a ``timeout`` argument to specify a number of - seconds before to raise an :exc:`CallTimeout` exception if the peer - didnt respond. + Unregister the specified protocol label for this connection. ''' - if '_timeout' in kwargs: - timeout = kwargs['_timeout'] - del kwargs['_timeout'] + if label in self._protocols and label != 0: + del self._protocols[label] else: - timeout = self._call_timeout + raise KeyError('No protocol registered for this label') - # Send the call to the peer: - msg_id = self._send_call(method_name, *args, **kwargs) - - # Create an item in calls dict with reference to the event to raise: - self._calls[msg_id] = {'return': None, 'error': None, - 'event': threading.Event()} - - # Wait for the response: - self._calls[msg_id]['event'].wait(timeout) - - # Check if timeout occured: - if not self._calls[msg_id]['event'].is_set(): - raise RpcError('TimeoutError', 'remote method timeout') - - # Check if error occured while execution: - if self._calls[msg_id]['error'] is not None: - raise RpcError(self._calls[msg_id]['error']['exception'], - self._calls[msg_id]['error']['message']) - - response = self._calls[msg_id]['return'] - del self._calls[msg_id] - - return response - - def async_call(self, method_name, *args, **kwargs): + def get_protocol(self, label): ''' - Make a new asynchronous call on the peer. - - :param method_name: the method to call on the peer - :param \*args: the arguments for the call - :param \*\*kwargs: the keyword arguments for the call - :return: the message id of the call + Get the protocol registered for specified label. ''' - # Extract _data from argument: - if '_data' in kwargs: - data = kwargs['_data'] - del kwargs['_data'] - else: - data = None - - # Send the call to the peer: - msg_id = self._send_call(method_name, *args, **kwargs) - - self._calls[msg_id] = {'async': True, 'data': data} - - return msg_id - - def dispatch(self, message): - ''' - Dispatch a received message according to it type. - - :param message: the received message to dispatch - - .. note:: - When dispatching a call, the responsability of response is delegated - to the caller, except for the case where the method isn't found on - the handler. - ''' - - if set(RpcConnection.REQUEST_MESSAGE) <= set(message): - # Handle call requests from the peer: - if self._handler is not None: - try: - func = self._handler[message['method']] - except KeyError: - self.error( - message['id'], - 'NameError', - "remote name '%s' is not defined" % message['method'], - ) - else: - if getattr(func, '__threaded__', True): - ThreadedRpcCaller(message, self, func).start() - else: - RpcCaller(message, self, func).start() - else: - self.error( - message['id'], - 'NameError', - "remote name '%s' is not defined" % message['method'], - ) - elif set(RpcConnection.RESPONSE_MESSAGE) <= set(message): - # Handle response message from the peer: - call = self._calls.get(message['id']) - if call is not None: - # Call exists in call list - if message['error'] is None: - call['return'] = message['return'] - else: - call['error'] = message['error'] - - if 'event' in call: - # Release the call if its synchronous: - call['event'].set() - else: - # Else, it's an asynchonous call, so, we need to notify - # manager that the response has been received: - message['data'] = call['data'] - self._manager.signal_arrival(message) - - # Message will be deleted from the call list by the - # manager while cleanup. - - else: - logging.debug('Malformed message received: %s', message) + proto = self._protocols.get(label) + if proto is None: + raise KeyError('No protocol registered for this label') + return proto - def shutdown(self, callback=None): + def shutdown(self): ''' Shutdown this connection. - - :param callback: Name of the callback to call on the handler or a - callable to call when the connection is shutdown. - :type callback: :class:`str` or callable or None ''' - self._connected = False + # Shutdown each registered protocols: + for proto in self._protocols.itervalues(): + proto.shutdown() + # Close the connection socket: + self._connected = False try: + self._sock.shutdown(socket.SHUT_RDWR) self._sock.close() except socket.error as err: - logging.debug('Error while socket close: %s.', err) - - # Release all running calls from this connection: - for cid, call in self._calls.iteritems(): - err = {'exception': 'RpcError', - 'message': 'Connection reset by peer'} - if 'event' in call: - call['error'] = err - call['return'] = None - call['event'].set() - else: - msg = {'id': cid, 'error': err, 'return': None} - self._manager.signal_arrival(msg) - - if callback is not None and not callable(callback): - if self._handler is not None: - try: - callback = self._handler[callback] - except KeyError: - callback = None - else: - callback = None - - if callback is not None: - try: - callback(self) - except Exception as err: - logging.debug('Error while execution of shutdown ' - 'callback: %s', err) - - def clean_call(self, msg_id): - ''' - Delete waiting call by it id in the call list. - - :param msg_id: the id of the call to delete - :return: True if call have been deleted or False if it doesn't exists - ''' - try: - del self._calls[msg_id] - except KeyError: - return False - else: - return True - - def clean_all_calls(self, msg_ids): - ''' - Delete all call by their ids in the call list. - - :param msg_id: list of call id to delete - ''' - - for msg_id in msg_ids: - self.clean_call(msg_id) + #self.logger.warn('Error while socket close: %s', err) + pass def get_handler(self): ''' @@ -453,7 +193,6 @@ class RpcConnection(object): :return: binded handler ''' - return self._handler def set_handler(self, handler): @@ -462,7 +201,6 @@ class RpcConnection(object): :param handler: the new handler to define. ''' - self._handler = handler def get_fd(self): @@ -482,6 +220,81 @@ class RpcConnection(object): :return: string representing the peer name ''' - return '%s:%s' % self._sock.getpeername() + def recv_until(self, bufsize, flags=None): + ''' + Read socket until bufsize is received. + ''' + buf = '' + while len(buf) < bufsize: + remains = bufsize - len(buf) + try: + received = self._sock.recv(remains) + except socket.error as err: + if not self._connected: + raise SocketRpcError('Not connected to the peer') + errmsg = 'Fatal error while receiving from socket: %s' % err + self.logger.error(errmsg) + raise SocketRpcError(errmsg) + + # Handle peer disconnection: + if not received: + self.logger.info('Connection reset by peer') + self.shutdown() + + buf += received + return buf + + +class GreenRpcConnection(RpcConnection): + + ''' + Cooperative RpcConnection to use with Gevent. + ''' + + def __init__(self, *args, **kwargs): + super(GreenRpcConnection, self).__init__(*args, **kwargs) + self._greenlet = None + + @classmethod + def from_addr(cls, addr, port, conn_timeout=30.0, *args, **kwargs): + ''' + Construct the instance of :class:`RpcConnection` without providing + the :class:`socket` object. Socket is automatically created and passed + to the standard constructor before to return the new instance. + + :param addr: the target ip address + :param port: the target port + :param conn_timeout: the connection operation timeout + :param *args, **kwargs: extra argument to pass to the constructor (see + constructor doctring) + ''' + import gevent.socket + sock = gevent.socket.create_connection((addr, port), conn_timeout) + return cls(sock, *args, **kwargs) + + @classmethod + def from_addr_ssl(cls, addr, port, cert, conn_timeout=30, *args, **kwargs): + ''' + Construct :class:`RpcConnection` instance like :meth:`from_addr`, but + enable ssl on socket. + + :param cert: ssl certificate or None for ssl without certificat + ''' + import gevent.socket + sock = gevent.socket.create_connection((addr, port), conn_timeout) + req = ssl.CERT_NONE if cert is None else ssl.CERT_REQUIRED + sock = gevent.ssl.SSLSocket(sock, certfile=None, cert_reqs=req, + ssl_version=ssl.PROTOCOL_TLSv1) + return cls(sock, *args, **kwargs) + + def run(self): + import gevent + self._greenlet = gevent.spawn(self.run) + self._greenlet.join() + + def shutdown(self): + super(GreenRpcConnection, self).shutdown() + if self._greenlet is not None: + self._greenlet.kill() diff --git a/sjrpc/server/__init__.py b/sjrpc/server/__init__.py index e08285e..09f2cdd 100644 --- a/sjrpc/server/__init__.py +++ b/sjrpc/server/__init__.py @@ -1,7 +1,8 @@ #!/usr/bin/env python #coding:utf8 -from sjrpc.server.simple import * +from __future__ import absolute_import -__all__ = ('SimpleRpcServer', 'SimpleSslRpcServer') +from sjrpc.server.simple import GreenRpcServer, SSLGreenRpcServer +__all__ = ('GreenRpcServer', 'SSLGreenRpcServer') diff --git a/sjrpc/server/simple.py b/sjrpc/server/simple.py index 190f4d1..1bdf9c9 100644 --- a/sjrpc/server/simple.py +++ b/sjrpc/server/simple.py @@ -1,130 +1,112 @@ -#!/usr/bin/env python -#coding=utf8 import ssl import time import socket import select import logging -from sjrpc.core import RpcConnection, ConnectionManager +from sjrpc.core import RpcConnection, GreenRpcConnection + + +class RpcServer(object): + + ''' + Base class for all RpcServer classes. + ''' + + def __init__(self): + self._clients = set() + self.logger = logging.getLogger('sjrpc') + + def register(self, conn): + ''' + Register a new connection on this server. + + :param conn: the connection to register. + ''' + self._clients.add(conn) + + def unregister(self, conn, shutdown=False): + ''' + Unregister the specified client from this server. If shutdown is + specified, client is shutdown before to be unregistered. + + :param conn: the connection to unregister + :param shutdown: shutdown or not the connection before to register + ''' + if conn in self._clients: + if shutdown: + conn.shutdown() + self._clients.remove(conn) + + def run(self): + raise NotImplementedError('You must use a sub-class of RpcServer.') + + def shutdown(self): + ''' + Shutdown the :class:`RpcServer` instance. + ''' + self.logger.info('Shutdown requested') + for client in self._clients.copy(): + self.unregister(client, shutdown=True) + + +class GreenRpcServer(RpcServer): -class SimpleRpcServer(ConnectionManager): ''' - A simple RPC Server that wait for new connections and dispatch messages - from thoses are already established. - - :param sock: the :class:`socket` object to bind to the server connection - :param default_handler: the default handler to bind to the new client - connections + An sjrpc server that use Gevent and its Greenlets to handle client + connections. + + :param addr: the ip address to connect to + :param port: the tcp port to connect to + :param conn_args, conn_kw: the arguments to pass to the client + :class:`RpcConnection` instance + + .. note:: + At this time, the server must be ran into that imported this module. + This is a limitation of Gevent 0.x and this should be disappear when + Gevent will be used. ''' - - def __init__(self, sock, default_handler=None, on_disconnect=None): - super(SimpleRpcServer, self).__init__() - sock.setblocking(False) - self._listening_sock = sock - self.register(sock, self._handle_master_event) - self._clients = {} - self._default_handler = default_handler - self._on_disconnect = on_disconnect - - def _accept_connection(self): - return self._listening_sock.accept() + + def __init__(self, addr, port, conn_args=(), conn_kw={}, *args, **kwargs): + from gevent.server import StreamServer + super(GreenRpcServer, self).__init__(*args, **kwargs) + self._conn_args = conn_args + self._conn_kw = conn_kw + self._server = StreamServer((addr, port), self._handler) + + def _handler(self, sock, address): + conn = GreenRpcConnection(sock, *self._conn_args, **self._conn_kw) + self.register(conn) + RpcConnection.run(conn) #FIXME + + def run(self): + #self._server.serve_forever() + #FIXME: Sometime, when shutdown is called, _server.serve_forever stay + # stuck and never return. This is maybe a problem with gevent, + # but this workaround seem to work. + self._server.start() + while not self._server._stopped_event.is_set(): + self._server._stopped_event.wait(2) def shutdown(self): - super(SimpleRpcServer, self).shutdown() - time.sleep(ConnectionManager.POLL_TIMEOUT) - for connection in self._clients.values(): - connection.shutdown(self._on_disconnect) - self._listening_sock.close() - - def shutdown_client(self, fd): - conn = self._clients.get(fd) - try: - self.unregister(fd) - except IOError: - pass - if fd is not None: - try: - del self._clients[fd] - except KeyError: - pass - if conn is not None: - conn.shutdown(callback=self._on_disconnect) - - def all_connections(self): - return set(self._clients.values()) - - def _handle_master_event(self, fd, events): - # Event concerns the listening socket: - if events & select.EPOLLIN: - accepted = self._accept_connection() - if accepted is not None: - sock, address = accepted - sock.setblocking(False) - connection = RpcConnection(sock, self, - handler=self._default_handler) - self.register(connection.get_fd(), self._handle_client_event) - self._clients[connection.get_fd()] = connection - - def _handle_client_event(self, fd, events): - connection = self._clients[fd] - - if events & select.EPOLLIN: - # Data are ready to be readed on socket - try: - connection.receive() - except socket.error as err: - logging.debug('Socket error while receiving from client ' - 'fd/%s: %s', fd, err) - self.shutdown_client(fd) - except Exception as err: - logging.debug('Unknown error while receiving from client ' - 'fd/%s: %s', fd, err) - self.shutdown_client(fd) - - if events & select.EPOLLOUT: - # Data are ready to be written on socket - try: - connection.send() - except socket.error as err: - logging.debug('Socket error while sending to the client ' - 'fd/%s: %s', fd, err) - self.shutdown_client(fd) - except Exception as err: - logging.debug('Unknown error while sending to the client ' - 'fd/%s: %s', fd, err) - self.shutdown_client(fd) - - if events & select.EPOLLHUP: - logging.debug('Socket HUP fd/%s', fd) - self.shutdown_client(fd) - - -class SimpleSslRpcServer(SimpleRpcServer): + super(GreenRpcServer, self).shutdown() + self._server.stop() + + +class SSLGreenRpcServer(GreenRpcServer): + ''' - A simple RPC Server that wait for new connections and dispatch messages - from thoses are already established. This server version enable SSL on - client sockets. - - :param sock: the :class:`socket` object to bind to the server connection - :param default_handler: the default handler to bind to the new client - connections + The SSL version of :class:`GreenRpcServer`. All connecting client are + automatically wrapped into and SSL connection. + + You must provide certfile and keyfile to make this work properly. ''' - def __init__(self, sock, certfile=None, keyfile=None, **kwargs): - self._certfile = certfile - self._keyfile = keyfile - super(SimpleSslRpcServer, self).__init__(sock, **kwargs) - - def _accept_connection(self): - sock, address = self._listening_sock.accept() - try: - sslsock = ssl.wrap_socket(sock, server_side=True, - keyfile=self._keyfile, - certfile=self._certfile, - ssl_version=ssl.PROTOCOL_TLSv1, - do_handshake_on_connect=True) - except ssl.SSLError as err: - logging.debug('Error when accepting ssl connection: %s', err) - else: - return sslsock, address + def __init__(self, addr, port, conn_args=(), conn_kw={}, certfile=None, + keyfile=None, *args, **kw): + super(GreenRpcServer, self).__init__(*args, **kw) + from gevent.server import StreamServer + self._conn_args = conn_args + self._conn_kw = conn_kw + self._server = StreamServer((addr, port), self._handler, + certfile=certfile, keyfile=keyfile) diff --git a/sjrpc/utils/__init__.py b/sjrpc/utils/__init__.py index 30dbdd3..6f19e2b 100644 --- a/sjrpc/utils/__init__.py +++ b/sjrpc/utils/__init__.py @@ -1,9 +1,9 @@ #!/usr/bin/env python #coding:utf8 -from sjrpc.utils.datastructures import * from sjrpc.utils.proxies import * from sjrpc.utils.handlers import * -__all__ = ('BytesBuffer', 'ConnectionProxy', 'RpcHandler', 'threadless', 'pure') +__all__ = ('ConnectionProxy', 'RpcHandler', 'threadless', 'pure', + 'pass_connection', 'pass_rpc') diff --git a/sjrpc/utils/datastructures.py b/sjrpc/utils/datastructures.py deleted file mode 100644 index 31d6fee..0000000 --- a/sjrpc/utils/datastructures.py +++ /dev/null @@ -1,71 +0,0 @@ -#!/usr/bin/env python -#coding:utf8 - -import threading - -class BytesBuffer(object): - ''' - BytesBuffers objects are useful to create socket buffers. - - Its behavior is pretty simple and looks like a FIFO queue: you can push data - on it with push method, and pull a specified amount of data. - - .. note:: - All calls to :class:`BytesBuffer` objects are thread-safe. - ''' - - def __init__(self, data=None): - if data is None: - self._buf = '' - else: - self._buf = data - self._lock = threading.RLock() - - def push(self, data): - ''' - Push data on buffer. - - :param data: the data to push on the buffer - :type data: :class:`str` object - ''' - - self._lock.acquire() - self._buf += data - self._lock.release() - - def pull(self, size=0): - ''' - Pull the specified amount of data on the buffer. - - If size is equal to 0, will return the entire content. If buffer - contains less of data than asked, :meth:`pull` will return all the - available data. - - :param size: size (in bytes) of data to pull on the buffer - :type size: 0 or positive integer - :return: asked data - :rtype: :class:`str` object - ''' - - self._lock.acquire() - if size == 0: - buf = self._buf - self._buf = '' - else: - buf = self._buf[:size] - self._buf = self._buf[size:] - self._lock.release() - return buf - - def __len__(self): - ''' - Return the size of the buffer. - ''' - - return len(self._buf) - - def __enter__(self): - return self._lock.__enter__() - - def __exit__(self, exc_type, exc_value, traceback): - return self._lock.__exit__(exc_type, exc_value, traceback) diff --git a/sjrpc/utils/handlers.py b/sjrpc/utils/handlers.py index 7529f9d..50c956f 100644 --- a/sjrpc/utils/handlers.py +++ b/sjrpc/utils/handlers.py @@ -24,15 +24,40 @@ def threadless(func): Function handler decorator -- don't spawn a new thread when function is called. ''' - + func.__threaded__ = False return func + def pure(func): ''' Function handler decorator -- the function is a pure fonction, caller will not pass :class:`RpcConnection` object as first call parameters. + + .. note:: + This decorator is useless since the default behavior have change. You + can use :func:`pass_connection` decorator to do the opposite. + + This function is kept for compatibility only, and will be removed latter. + ''' + + return func + + +def pass_connection(func): + ''' + Function handler decorator -- pass on first parameter the connection which + called this function. + ''' + + func.__pass_connection__ = True + return func + +def pass_rpc(func): ''' - - func.__pure__ = True + Function handler decorator -- pass on first (or after connection) the + rpc protocol which called this function. + ''' + + func.__pass_rpc__ = True return func -- GitLab