Commit 4eadc7b5 authored by Antoine Millet's avatar Antoine Millet

Rewrited sjRpc with low level channel multiplexage and gevent usage.

parent d63729bd
#!/usr/bin/env python
#coding:utf8
'''
......@@ -11,14 +9,12 @@ The library is separated into four parts:
* core library contains all common classes.
* server library contains all the server side related stuff.
* client library contains all the client side related stuff.
* utils library contains some helpers used in previous libraries.
'''
import sjrpc.core
import sjrpc.server
import sjrpc.client
import sjrpc.utils
__version__ = 13
__version__ = '14~dev'
#!/usr/bin/env python
#coding:utf8
from sjrpc.client.simple import SimpleRpcClient
__all__ = ('SimpleRpcClient',)
#!/usr/bin/env python
#coding=utf8
import select
import socket
import logging
from sjrpc.core import RpcConnection, ConnectionManager
class SimpleRpcClient(ConnectionManager):
'''
Create a new simple RPC client.
:param connect: the :class:`RpcConnection` object to bind the client manager
:param default_handler: the default handler to bind to the client connection
:param on_disconnect: method on the handler to call when the client
disconnects.
'''
def __init__(self, connection, default_handler=None, on_disconnect=None):
super(SimpleRpcClient, self).__init__()
self._on_disconnect = on_disconnect
self._connection = connection
self._connection.set_handler(default_handler)
self.register(self._connection.get_fd(), self._handle_event)
@classmethod
def from_addr(cls, addr, port, enable_ssl=False, cert=None, timeout=None,
conn_timeout=30.0, default_handler=None, on_disconnect=None):
'''
Construct the instance of :class:`SimpleRpcClient` without providing
the :class:`RpcConnection` object. The :class:`RpcConnection` is
automatically created and passed to the standard constructor before to
return the new instance.
:param addr: the target ip address
:param port: the target port
:param ssl: enable SSL
:param timeout: the global call timeout setting
:param conn_timeout: the connection operation timeout
:param cert: is SSL is enabled, profile the filename of certificate to
check. If None, don't check certificate.
:param default_handler: the default handler to bind to the
client connection
:param on_disconnect: method on the handler to call when the client
disconnects.
'''
connection = RpcConnection.from_addr(addr, port, None, timeout=timeout,
conn_timeout=conn_timeout,
enable_ssl=enable_ssl, cert=cert)
client = cls(connection, default_handler=default_handler,
on_disconnect=on_disconnect)
connection._manager = client
return client
@classmethod
def from_sock(cls, sock, default_handler=None, on_disconnect=None):
'''
Construct the instance of :class:`SimpleRpcClient` without providing
the :class:`RpcConnection` object. The :class:`RpcConnection` is
automatically created and passed to the standard constructor before to
return the new instance
:param sock: the socket object to wrap with :class:`RpcConnection`
object.
:param default_handler: the default handler to bind to the
client connection
:param on_disconnect: method on the handler to call when the client
disconnects.
'''
connection = RpcConnection(sock, None)
client = cls(connection, default_handler, on_disconnect)
connection._manager = client
return client
def shutdown(self):
super(SimpleRpcClient, self).shutdown()
self._connection.shutdown(self._on_disconnect)
def _handle_event(self, fd, events):
if events & select.EPOLLIN:
# Data are ready to be readed on socket
try:
self._connection.receive()
except socket.error as err:
logging.debug('Socket error while receiving from the client '
'fd/%s: %s', fd, err)
self.shutdown()
if events & select.EPOLLOUT:
# Data are ready to be written on socket
try:
self._connection.send()
except socket.error as err:
logging.debug('Socket error while sending to the client '
'fd/%s: %s', fd, err)
self.shutdown()
if events & select.EPOLLHUP:
logging.debug('Socket HUP fd/%s', fd)
self.shutdown()
def all_connections(self):
return set((self._connection,))
def call(self, *args, **kwargs):
return self._connection.call(*args, **kwargs)
......@@ -2,10 +2,9 @@
#coding:utf8
from sjrpc.core.rpcconnection import *
from sjrpc.core.connectionmanagers import *
from sjrpc.core.callers import *
from sjrpc.core.exceptions import *
from sjrpc.core.async import *
__all__ = ('ConnectionManager', 'RpcConnection', 'RpcCaller', 'RpcError',
'ThreadedRpcCaller', )
__all__ = ('RpcConnection', 'RpcCaller', 'ThreadedRpcCaller', 'RpcError',
'AsyncWatcher')
#!/usr/bin/env python
#coding:utf8
import threading
from sjrpc.core.exceptions import RpcError
ERRMSG_RPCERR = ('Unable to send reply to the peer: %s (this error is usualy '
'raised when connection is lost while handler function '
'execution)')
class RpcCaller(object):
'''
A caller execute a callable (function, method, class which implement the
:meth:`__call__` method...) in a particular context (threaded or
"timeouted" for example), and return the result (or the exception) to the
:meth:`__call__` method...) in a particular context (threaded or
"timeouted" for example), and return the result (or the exception) to the
peer through it :class:`RpcConnection` object.
'''
def __init__(self, request, connection, func):
def __init__(self, request, protocol, func):
self._request = request
self._connection = connection
self._protocol = protocol
self._func = func
# Apply the request decorator
#request_decorator = connection.request_decorator
#if request_decorator is not None:
# self._func = request_decorator(self._func)
def run(self):
'''
Run the callable and return the result (or the exception) to the peer.
'''
msg_id = self._request['id']
args = self._request['args']
kwargs = self._request['kwargs']
if not getattr(self._func, '__pure__', False):
args.insert(0, self._connection)
if getattr(self._func, '__pass_rpc__', False):
args.insert(0, self._protocol)
if getattr(self._func, '__pass_connection__', False):
args.insert(0, self._protocol.connection)
try:
returned = self._func(*args, **kwargs)
except Exception as err:
self._connection.error(msg_id, message=str(err),
error=err.__class__.__name__)
try:
self._protocol.error(msg_id, message=str(err),
error=err.__class__.__name__)
except RpcError as err:
self._protocol.connection.logger.error(ERRMSG_RPCERR, err)
else:
self._connection.response(msg_id, returned=returned)
try:
self._protocol.response(msg_id, returned=returned)
except RpcError as err:
self._protocol.connection.logger.error(ERRMSG_RPCERR, err)
def start(self):
'''
Start execution of the callable, the most of time, it just call
Start execution of the callable, the most of time, it just call
:meth:`run` method.
'''
self.run()
class ThreadedRpcCaller(RpcCaller):
'''
A caller which make the call into a separated thread.
'''
def __init__(self, *args, **kwargs):
super(ThreadedRpcCaller, self).__init__(*args, **kwargs)
self._thread = threading.Thread(target=self.run)
self._thread.name = 'Processing of call: %s' % self._request['id']
self._thread.daemon = True
def start(self):
self._thread.start()
#!/usr/bin/env python
#coding:utf8
import select
import threading
class ConnectionManager(object):
'''
Base class for all connection manager classes.
'''
# The timeout to wait before the poll call release the hand with no events:
POLL_TIMEOUT = 1
# Masks for fd registration on poll object:
MASK_NORMAL = (select.EPOLLIN | select.EPOLLPRI |
select.EPOLLERR | select.EPOLLHUP)
MASK_WRITABLE = MASK_NORMAL | select.EPOLLOUT
def __init__(self):
self._poll = select.epoll()
self._running = True
self._received_msg = {}
self._wait_groups = {}
self._poll_callbacks = {}
def register(self, fd, callback, *args, **kwargs):
'''
Register an fd on the poll object with the specified callback. The
callback will be called each time poller drop an event for the specified
fd. Extra args will be passed to the callback after fd and events.
:param fd: the fd to register
:param callback: the callable to use on event
:param *args, **kwargs: extra arguments passed to the callback
'''
if hasattr(fd, 'fileno'):
fd = fd.fileno()
self._poll_callbacks[fd] = {'func': callback,
'extra': args,
'kwextra': kwargs}
self._poll.register(fd, ConnectionManager.MASK_NORMAL)
def unregister(self, fd):
'''
Unregister the specified fd from the manager.
:param fd: the fd to unregister.
'''
self._poll.unregister(fd)
del self._poll_callbacks[fd]
def is_running(self):
return self._running
def run(self):
'''
Run the main loop of the :class:`ConnectionManager`. It will catch
events on registered :class:`RpcConnection` and process them.
'''
while self._running:
try:
events = self._poll.poll(ConnectionManager.POLL_TIMEOUT)
except IOError:
pass
else:
for fd, event in events:
if fd in self._poll_callbacks:
cb = self._poll_callbacks[fd]
cb['func'](fd, event, *cb['extra'], **cb['kwextra'])
def start(self, daemonize=False):
'''
Run the main loop in a separated thread.
:param daemonize: set the thread daemon state
'''
t = threading.Thread(target=self.run)
t.daemon = daemonize
t.start()
def wait(self, msg_id_set, timeout=None, wait_all=True):
'''
Wait for the asynchronous messages in ``msg_id_set``.
When the timeout argument is present and not ``None``, it should be a
floating point number specifying a timeout for the operation in
seconds (or fractions thereof).
You can also set ``wait_all`` to False if you want to unlock the call
when the first response is received.
:param msg_id_set: set of message to wait
:type msg_id_set: :class:`frozenset`
:param timeout: timeout value or None to disable timeout (default: None)
:type timeout: :class:`int` or :class:`None`
:param wait_all: wait for all messages (default: True)
:type wait_all: :class:`bool`
.. warning:
This is important that ``msg_id_set`` is a :class:`frozenset`
and not a :class:`set`.
'''
waiter = {'event': threading.Event(), 'wait_all': wait_all}
self._wait_groups.setdefault(msg_id_set, waiter)
already_completed = self._check_waiter(msg_id_set)
if not already_completed:
waiter['event'].wait(timeout=timeout)
# Clean the call list on each attached RpcConnection
for connection in self.all_connections():
connection.clean_all_calls(msg_id_set)
# Get the messages:
messages = []
for msg_id, msg in self._received_msg.items():
if msg_id in msg_id_set:
messages.append(msg)
del self._received_msg[msg_id]
waiter['responses'] = tuple(messages)
messages = waiter['responses']
del self._wait_groups[msg_id_set]
return messages
def signal_arrival(self, message):
'''
Signal the arrival of a new message to the :class:`ConnectionManager`.
This method is ordinary called by the :class:`RpcConnections` objects,
when a response to an asynchronous call is received.
:param message: the message received
'''
self._received_msg[message['id']] = message
for waitset in self._wait_groups.keys():
self._check_waiter(waitset)
def _check_waiter(self, waitset):
'''
Check if a waitset is completed and process it.
:param waitset: the waitset to check
:return: True if waitset is completed else None
'''
# Make a set of received messages ids:
recv_msg = set(self._received_msg)
try:
waiter = self._wait_groups[waitset]
except KeyError:
return False
is_ok = (waiter['wait_all'] and waitset <= recv_msg
or not waiter['wait_all'] and not recv_msg.isdisjoint(waitset))
if is_ok:
# Unlock the event:
waiter['event'].set()
return True
else:
return False
def all_connections(self):
'''
Return all connection attached to this :class:`ConnectionManager`.
:return: a set of :class:`RpcConnection` attached
to this :class:`ConnectionManager`
'''
raise NotImplementedError
def shutdown(self):
'''
Shutdown the manager properly.
'''
self._running = False
def data_to_write(self, fd):
'''
Method called by a connection to inform the manager that it have data
to send.
:param connection: the fd which have data to write
'''
if fd is not None:
self._poll.modify(fd, ConnectionManager.MASK_WRITABLE)
def nothing_to_write(self, fd):
'''
Method called by a connection to inform the manager that it have no
more data to send.
:param fd: the fd which have no more data to write
'''
if fd is not None:
self._poll.modify(fd, ConnectionManager.MASK_NORMAL)
def handle_event(self, fd, event):
'''
Handle an event and make associated action. This is an abstract method to
overload on derived classes.
:param fd: the fd that have generated the event
:param event: the event as returned by the poller object
'''
pass
#!/usr/bin/env python
#coding:utf8
'''
Contains sjRpc exceptions.
'''
class RpcError(Exception):
'''
Exception raised by caller when an error occurs while execution of remote
procedure call.
'''
def __init__(self, exception, message):
self.exception = exception
self.message = message
def __str__(self):
return '%s' % self.message
class SocketRpcError(Exception):
'''
Exception used internally to raise a socket fault.
'''
This diff is collapsed.
This diff is collapsed.
#!/usr/bin/env python
#coding:utf8
from sjrpc.server.simple import *
from __future__ import absolute_import
__all__ = ('SimpleRpcServer', 'SimpleSslRpcServer')
from sjrpc.server.simple import GreenRpcServer, SSLGreenRpcServer
__all__ = ('GreenRpcServer', 'SSLGreenRpcServer')
#!/usr/bin/env python
#coding=utf8
import ssl
import time
import socket
import select
import logging
from sjrpc.core import RpcConnection, ConnectionManager
from sjrpc.core import RpcConnection, GreenRpcConnection
class RpcServer(object):
'''
Base class for all RpcServer classes.
'''
def __init__(self):
self._clients = set()
self.logger = logging.getLogger('sjrpc')
def register(self, conn):
'''
Register a new connection on this server.
:param conn: the connection to register.
'''
self._clients.add(conn)
def unregister(self, conn, shutdown=False):
'''
Unregister the specified client from this server. If shutdown is
specified, client is shutdown before to be unregistered.
:param conn: the connection to unregister
:param shutdown: shutdown or not the connection before to register
'''
if conn in self._clients:
if shutdown:
conn.shutdown()
self._clients.remove(conn)
def run(self):
raise NotImplementedError('You must use a sub-class of RpcServer.')
def shutdown(self):
'''
Shutdown the :class:`RpcServer` instance.
'''
self.logger.info('Shutdown requested')
for client in self._clients.copy():
self.unregister(client, shutdown=True)
class GreenRpcServer(RpcServer):
class SimpleRpcServer(ConnectionManager):
'''
A simple RPC Server that wait for new connections and dispatch messages
from thoses are already established.
:param sock: the :class:`socket` object to bind to the server connection
:param default_handler: the default handler to bind to the new client
connections
An sjrpc server that use Gevent and its Greenlets to handle client
connections.
:param addr: the ip address to connect to
:param port: the tcp port to connect to
:param conn_args, conn_kw: the arguments to pass to the client
:class:`RpcConnection` instance
.. note::
At this time, the server must be ran into that imported this module.
This is a limitation of Gevent 0.x and this should be disappear when
Gevent will be used.
'''
def __init__(self, sock, default_handler=None, on_disconnect=None):
super(SimpleRpcServer, self).__init__()
sock.setblocking(False)
self._listening_sock = sock
self.register(sock, self._handle_master_event)
self._clients = {}
self._default_handler = default_handler
self._on_disconnect = on_disconnect
def _accept_connection(self):
return self._listening_sock.accept()
def __init__(self, addr, port, conn_args=(), conn_kw={}, *args, **kwargs):
from gevent.server import StreamServer
super(GreenRpcServer, self).__init__(*args, **kwargs)
self._conn_args = conn_args
self._conn_kw = conn_kw
self._server = StreamServer((addr, port), self._handler)
def _handler(self, sock, address):
conn = GreenRpcConnection(sock, *self._conn_args, **self._conn_kw)
self.register(conn)
RpcConnection.run(conn) #FIXME
def run(self):
#self._server.serve_forever()
#FIXME: Sometime, when shutdown is called, _server.serve_forever stay
# stuck and never return. This is maybe a problem with gevent,
# but this workaround seem to work.
self._server.start()
while not self._server._stopped_event.is_set():
self._server._stopped_event.wait(2)
def shutdown(self):
super(SimpleRpcServer, self).shutdown()
time.sleep(ConnectionManager.POLL_TIMEOUT)
for connection in self._clients.values():
connection.shutdown(self._on_disconnect)
self._listening_sock.close()
def shutdown_client(self, fd):
conn = self._clients.get(fd)
try:
self.unregister(fd)
except IOError:
pass
if fd is not None:
try:
del self._clients[fd]
except KeyError:
pass
if conn is not None:
conn.shutdown(callback=self._on_disconnect)
def all_connections(self):
return set(self._clients.values())
def _handle_master_event(self, fd, events):
# Event concerns the listening socket:
if events & select.EPOLLIN:
accepted = self._accept_connection()
if accepted is not None:
sock, address = accepted
sock.setblocking(False)
connection = RpcConnection(sock, self,
handler=self._default_handler)
self.register(connection.get_fd(), self._handle_client_event)
self._clients[connection.get_fd()] = connection
def _handle_client_event(self, fd, events):
connection = self._clients[fd]
if events & select.EPOLLIN:
# Data are ready to be readed on socket
try:
connection.receive()
except socket.error as err:
logging.debug('Socket error while receiving from client '
'fd/%s: %s', fd, err)
self.shutdown_client(fd)
except Exception as err:
logging.debug('Unknown error while receiving from client '
'fd/%s: %s', fd, err)
self.shutdown_client(fd)
if events & select.EPOLLOUT:
# Data are ready to be written on socket
try:
connection.send()
except socket.error as err:
logging.debug('Socket error while sending to the client '
'fd/%s: %s', fd, err)
self.shutdown_client(fd)
except Exception as err:
logging.debug('Unknown error while sending to the client '
'fd/%s: %s', fd, err)
self.shutdown_client(fd)
if events & select.EPOLLHUP:
logging.debug('Socket HUP fd/%s', fd)
self.shutdown_client(fd)
class SimpleSslRpcServer(SimpleRpcServer):
super(GreenRpcServer, self).shutdown()
self._server.stop()
class SSLGreenRpcServer(GreenRpcServer):
'''
A simple RPC Server that wait for new connections and dispatch messages
from thoses are already established. This server version enable SSL on
client sockets.
:param sock: the :class:`socket` object to bind to the server connection
:param default_handler: the default handler to bind to the new client
connections
The SSL version of :class:`GreenRpcServer`. All connecting client are
automatically wrapped into and SSL connection.
You must provide certfile and keyfile to make this work properly.
'''
def __init__(self, sock, certfile=None, keyfile=None, **kwargs):
self._certfile = certfile
self._keyfile = keyfile