Skip to content
Snippets Groups Projects
Commit 561c7544 authored by Antoine Millet's avatar Antoine Millet
Browse files

Implementation of sjRpc with libev and Pyev python bindings.

parent 8c77fe39
No related branches found
No related tags found
No related merge requests found
......@@ -27,5 +27,5 @@ from sjrpc.core.callers import *
from sjrpc.core.exceptions import *
from sjrpc.core.async import *
__all__ = ('RpcConnection', 'RpcCaller', 'ThreadedRpcCaller', 'RpcError',
'AsyncWatcher')
__all__ = ('RpcConnection', 'RpcCaller',
'ThreadedRpcCaller', 'RpcError', 'AsyncWatcher')
import logging
class Protocol(object):
def __init__(self, connection, label, logger=None):
self._connection = connection
self._label = label
if logger is None:
logger_name = '%s.protos.%s' % (connection.logger.name, label)
self.logger = logging.getLogger(logger_name)
else:
self.logger = logger
@property
def connection(self):
return self._connection
@property
def label(self):
return self._label
def send(self, payload):
'''
Send a message through the sjRpc connection.
'''
self._connection.send(self._label, payload)
def start_message(self, payload_size):
'''
Start a new incoming message receipt. By default, this method create
a new empty buffer on self._incoming_buf variable.
'''
self._incoming_buf = ''
def feed(self, data):
'''
Handle a chunk of data received from the tunnel. By default, this
method append this chunk to the end of the incoming buffer created by
default by :meth:`start_message` method.
'''
self._incoming_buf += data
def end_of_message(self):
'''
Signal the end of the currently received message. With default
:meth:`start_message` and :meth:`feed` methods, it's a good place to
implements the processing of the incoming message.
'''
pass
def handle_control(self, payload):
'''
Handle a control message received from the Rpc0.
'''
pass
def shutdown(self):
pass
from sjrpc.core.protocols.rpc import RpcProtocol
from sjrpc.core.protocols.tunnel import TunnelProtocol
from __future__ import absolute_import
import json
import logging
from uuid import uuid4
from threading import Event
from sjrpc.core.callers import RpcCaller, ThreadedRpcCaller
from sjrpc.core.exceptions import RpcError
from sjrpc.core.protocols import Protocol
class RpcProtocol(object):
class RpcProtocol(Protocol):
REQUEST_MESSAGE = {'id': None, 'method': None, 'args': [], 'kwargs': {}}
RESPONSE_MESSAGE = {'id': None, 'return': None, 'error': None}
......@@ -25,80 +27,17 @@ class RpcProtocol(object):
'''
def __init__(self, connection, label, handler=None, on_disconnect=None,
request_decorator=None, timeout=30, logger=None):
self._connection = connection
self._label = label
request_decorator=None, timeout=30, *args, **kwargs):
super(RpcProtocol, self).__init__(connection, label, *args, **kwargs)
self._handler = handler
self._on_disconnect = on_disconnect
self.request_decorator = request_decorator
self._call_timeout = timeout
if logger is None:
logger_name = '%s.protos.%s' % (connection.logger.name, label)
self.logger = logging.getLogger(logger_name)
else:
self.logger = logger
# Store all calls sent to the peer. Key is the id of the call and value
# the event to raise when call is finished.
self._calls = {}
@property
def connection(self):
return self._connection
def handle(self, label, size):
'''
Decode json, and dispatch message.
'''
buf = self._connection.recv_until(size)
msg = json.loads(buf)
self._dispatch(msg)
def shutdown(self):
# Release all waiting calls from this rpc:
for cid in self._calls.keys():
err = {'exception': 'RpcError',
'message': 'Connection reset by peer'}
self._handle_response({'id': cid, 'return': None, 'error': err})
# Execute on_disconnect callback:
callback = None
if self._on_disconnect is not None and not callable(self._on_disconnect):
if self._handler is not None:
try:
callback = self._handler[self._on_disconnect]
except KeyError:
self.logger.warn('Shutdown callback not found in current '
'rpc attached handler, ignoring')
callback = None
else:
self.logger.warn('Shutdown callback specified but no handler '
'binded on rpc, ignoring')
callback = None
if callback is not None:
try:
callback(self._connection)
except Exception as err:
self.logger.debug('Error while execution of shutdown '
'callback: %s', err)
def get_handler(self):
'''
Return the handler binded to the :class:`RpcConnection`.
:return: binded handler
'''
return self._handler
def set_handler(self, handler):
'''
Define a new handler for this connection.
:param handler: the new handler to define.
'''
self._handler = handler
def _dispatch(self, message):
'''
Dispatch a received message according to it type.
......@@ -176,6 +115,22 @@ class RpcProtocol(object):
else:
self.logger.warning('Capabilities message received by non-zero'
' rpc.')
elif message['special'] == 'protoctl':
label = message.get('label')
if label is None:
self.logger.warning('Protoctl message received without label.')
return
try:
proto = self._connection.get_protocol(label)
except KeyError:
self.logger.warning('Protoctl message received for unknown label')
else:
try:
proto.handle_control(message.get('type'),
message.get('payload'))
except Exception as err:
self.logger.error('Protoctl handler failed for proto %s: ',
'%s' % err)
def _send(self, message):
'''
......@@ -232,6 +187,68 @@ class RpcProtocol(object):
self._send(msg)
#
# Public methods:
#
def end_of_message(self):
'''
When the message is fully received, decode the json and dispatch it.
'''
msg = json.loads(self._incoming_buf)
self._dispatch(msg)
def shutdown(self):
'''
Handle the shutdown process of this protocol instance:
* Release all waiting calls with a "Connection reset by peer" error.
* Execute the on_disconnect callback.
'''
# Release all waiting calls from this rpc:
for cid in self._calls.keys():
err = {'exception': 'RpcError',
'message': 'Connection reset by peer'}
self._handle_response({'id': cid, 'return': None, 'error': err})
# Execute on_disconnect callback:
callback = None
if self._on_disconnect is not None and not callable(self._on_disconnect):
if self._handler is not None:
try:
callback = self._handler[self._on_disconnect]
except KeyError:
self.logger.warn('Shutdown callback not found in current '
'rpc attached handler, ignoring')
callback = None
else:
self.logger.warn('Shutdown callback specified but no handler '
'binded on rpc, ignoring')
callback = None
if callback is not None:
try:
callback(self._connection)
except Exception as err:
self.logger.debug('Error while execution of shutdown '
'callback: %s', err)
def get_handler(self):
'''
Return the handler binded to the :class:`RpcConnection`.
:return: binded handler
'''
return self._handler
def set_handler(self, handler):
'''
Define a new handler for this connection.
:param handler: the new handler to define.
'''
self._handler = handler
def send_special(self, special, **kwargs):
'''
Send a "special" message to the peer.
......
......@@ -9,11 +9,13 @@ from __future__ import absolute_import
import ssl
import struct
import socket
import select
import logging
from sjrpc.core.protocols.rpc import RpcProtocol
from sjrpc.core.exceptions import RpcError, SocketRpcError
from sjrpc.core.exceptions import RpcError
import pyev
class RpcConnection(object):
'''
......@@ -53,18 +55,49 @@ class RpcConnection(object):
MESSAGE_HEADER = '!HL'
MESSAGE_HEADER_FALLBACK = '!L'
SHORTCUTS_MAINRPC = ('call', 'async_call')
IOW_EVENTS = {'read': select.POLLIN | select.POLLPRI,
'write': select.POLLOUT, 'hup': select.POLLHUP,
'error': select.POLLERR | select.POLLNVAL}
def __init__(self, sock, *args, **kwargs):
def __init__(self, sock, loop=None, *args, **kwargs):
# Sock of this connection:
self._sock = sock
sock.setblocking(True)
sock.setblocking(False)
# Get the pyev loop:
if loop is None:
self.loop = pyev.default_loop()
else:
self.loop = loop
# Activate TCP keepalive on the connection:
#self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# Watcher list:
self._watchers = set()
# Initialize main watcher:
self._sock_reader = self.loop.io(self._sock, pyev.EV_READ, self._dispatch)
self._sock_writer = self.loop.io(self._sock, pyev.EV_WRITE, self._writer)
self._watchers.add(self._sock_reader)
self._watchers.add(self._sock_writer)
# Socket inbound/outbound buffers:
self._inbound_buffer = ''
self._outbound_buffer = ''
self._remains = struct.calcsize(RpcConnection.MESSAGE_HEADER_FALLBACK)
self._proto_receiving = None
self._sock_reader.start()
# Is the RpcConnection connected to its peer:
self._connected = True
# "Need to send" loop signal:
self._need_to_send = self.create_watcher(pyev.Async,
callback=self._cb_need_to_send)
self._need_to_send.start()
# Setup self.logger.facility:
self.logger = logging.getLogger('sjrpc.%s' % self.getpeername())
......@@ -129,19 +162,6 @@ class RpcConnection(object):
def __nonzero__(self):
return self._connected
def run(self):
'''
Inbound message processing loop.
'''
while self._connected:
try:
self._dispatch()
except SocketRpcError:
# If SocketRpcError occurs while dispatching, shutdown the
# connection if it not already shutdown:
if self._connected:
self.shutdown()
def _enable_fallback(self):
pass
......@@ -162,24 +182,86 @@ class RpcConnection(object):
rpc0 = self.get_protocol(0)
rpc0.send_special('capabilities', capabilities=cap)
def _dispatch(self):
def _dispatch(self, watcher, revents):
'''
Read next message from socket and dispatch it to accoding protocol
handler.
'''
# Read the header:
if self.fallback:
buf = self.recv_until(struct.calcsize(RpcConnection.MESSAGE_HEADER_FALLBACK))
pl_size = struct.unpack(RpcConnection.MESSAGE_HEADER_FALLBACK, buf)[0]
label = 0
# Try to received remaining data from the socket:
buf = self._sock.recv(self._remains)
if buf == '':
self.shutdown()
self._remains -= len(buf)
if self._proto_receiving is None:
self._inbound_buffer += buf
if self._remains == 0:
if self.fallback:
pl_size = struct.unpack(RpcConnection.MESSAGE_HEADER_FALLBACK, self._inbound_buffer)[0]
label = 0
else:
label, pl_size = struct.unpack(RpcConnection.MESSAGE_HEADER, self._inbound_buffer)
# Get the registered protocol for the specified label
self._proto_receiving = self._protocols.get(label)
self._proto_receiving.start_message(pl_size)
self._inbound_buffer = ''
self._remains = pl_size
else:
buf = self.recv_until(struct.calcsize(RpcConnection.MESSAGE_HEADER))
label, pl_size = struct.unpack(RpcConnection.MESSAGE_HEADER, buf)
# Get the registered protocol for the specified label
proto = self._protocols.get(label)
if proto is not None:
proto.handle(label, pl_size)
self._proto_receiving.feed(buf)
if self._remains == 0:
self._proto_receiving.end_of_message()
if self.fallback:
self._remains = struct.calcsize(RpcConnection.MESSAGE_HEADER_FALLBACK)
else:
self._remains = struct.calcsize(RpcConnection.MESSAGE_HEADER)
self._inbound_buffer = ''
self._proto_receiving = None
def _writer(self, watcher, revent):
'''
Write data on the socket.
'''
if self._outbound_buffer:
try:
if self.fallback:
sent = self._sock.send(self._outbound_buffer[:4096])
else:
sent = self._sock.send(self._outbound_buffer)
except socket.error as err:
errmsg = 'Fatal error while sending through socket: %s' % err
self.logger.error(errmsg)
raise RpcError('SocketError', errmsg)
self._outbound_buffer = self._outbound_buffer[sent:]
if not self._outbound_buffer:
watcher.stop()
def _cb_need_to_send(self, watcher, revents):
self._sock_writer.start()
#
# Public API
#
@property
def rpc(self):
return self.get_protocol(0)
def run(self):
'''
Main loop execution.
'''
self.loop.start()
def create_watcher(self, watcher_class, **kwargs):
'''
Create a new pyev watcher and return it.
'''
kwargs['loop'] = self.loop
watcher = watcher_class(**kwargs)
self._watchers.add(watcher)
return watcher
def send(self, label, payload):
'''
......@@ -193,26 +275,8 @@ class RpcConnection(object):
header = struct.pack(RpcConnection.MESSAGE_HEADER_FALLBACK, size)
else:
header = struct.pack(RpcConnection.MESSAGE_HEADER, label, size)
try:
if self.fallback:
data = header + payload
while data:
self._sock.sendall(data[:4096])
data = data[4096:]
else:
self._sock.sendall(header + payload)
except socket.error as err:
errmsg = 'Fatal error while sending through socket: %s' % err
self.logger.error(errmsg)
raise RpcError('SocketError', errmsg)
#
# Public API
#
@property
def rpc(self):
return self.get_protocol(0)
self._outbound_buffer += header + payload
self._need_to_send.send()
def set_capabilities(self, capabilities):
'''
......@@ -259,6 +323,9 @@ class RpcConnection(object):
'''
Shutdown this connection.
'''
# Shutdown each registered watcher:
for watcher in self._watchers:
watcher.stop()
# Shutdown each registered protocols:
for proto in self._protocols.itervalues():
......@@ -307,82 +374,3 @@ class RpcConnection(object):
:return: string representing the peer name
'''
return '%s:%s' % self._sock.getpeername()
def recv_until(self, bufsize, flags=None):
'''
Read socket until bufsize is received.
'''
buf = ''
while len(buf) < bufsize:
remains = bufsize - len(buf)
try:
received = self._sock.recv(remains)
except socket.error as err:
if not self._connected:
raise SocketRpcError('Not connected to the peer')
elif err.errno == 11:
continue
errmsg = 'Fatal error while receiving from socket: %s' % err
self.logger.error(errmsg)
raise SocketRpcError(errmsg)
# Handle peer disconnection:
if not received:
self.logger.info('Connection reset by peer')
self.shutdown()
buf += received
return buf
class GreenRpcConnection(RpcConnection):
'''
Cooperative RpcConnection to use with Gevent.
'''
def __init__(self, *args, **kwargs):
super(GreenRpcConnection, self).__init__(*args, **kwargs)
self._greenlet = None
@classmethod
def from_addr(cls, addr, port, conn_timeout=30.0, *args, **kwargs):
'''
Construct the instance of :class:`RpcConnection` without providing
the :class:`socket` object. Socket is automatically created and passed
to the standard constructor before to return the new instance.
:param addr: the target ip address
:param port: the target port
:param conn_timeout: the connection operation timeout
:param *args, **kwargs: extra argument to pass to the constructor (see
constructor doctring)
'''
import gevent.socket
sock = gevent.socket.create_connection((addr, port), conn_timeout)
return cls(sock, *args, **kwargs)
@classmethod
def from_addr_ssl(cls, addr, port, cert, conn_timeout=30, *args, **kwargs):
'''
Construct :class:`RpcConnection` instance like :meth:`from_addr`, but
enable ssl on socket.
:param cert: ssl certificate or None for ssl without certificat
'''
import gevent.socket
sock = gevent.socket.create_connection((addr, port), conn_timeout)
req = ssl.CERT_NONE if cert is None else ssl.CERT_REQUIRED
sock = gevent.ssl.SSLSocket(sock, certfile=None, cert_reqs=req,
ssl_version=ssl.PROTOCOL_TLSv1)
return cls(sock, *args, **kwargs)
def run(self):
import gevent
self._greenlet = gevent.spawn(self.run)
self._greenlet.join()
def shutdown(self):
super(GreenRpcConnection, self).shutdown()
if self._greenlet is not None:
self._greenlet.kill()
......@@ -3,6 +3,6 @@
from __future__ import absolute_import
from sjrpc.server.simple import GreenRpcServer, SSLGreenRpcServer
from sjrpc.server.simple import (RpcServer, )
__all__ = ('GreenRpcServer', 'SSLGreenRpcServer')
__all__ = ('RpcServer', )
import ssl
import time
import errno
import socket
import select
import logging
from sjrpc.core import RpcConnection, GreenRpcConnection
import threading
from sjrpc.core import RpcConnection
import pyev
class RpcServer(object):
......@@ -13,9 +18,48 @@ class RpcServer(object):
Base class for all RpcServer classes.
'''
def __init__(self):
NONBLOCKING_ERRORS = (errno.EAGAIN, errno.EWOULDBLOCK)
def __init__(self, sock, loop=None, conn_args=(), conn_kw={}):
self._clients = set()
self.logger = logging.getLogger('sjrpc')
if loop is None:
self.loop = pyev.default_loop()
else:
self.loop = loop
self._conn_args = conn_args
self._conn_kw = conn_kw
self._sock = sock
self._sock_watcher = self.loop.io(self._sock, pyev.EV_READ, self._handle)
self._sock_watcher.start()
@classmethod
def from_addr(cls, addr, port, *args, **kwargs):
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((addr, port))
sock.setblocking(False)
sock.listen(5)
return cls(sock, *args, **kwargs)
def _handle(self, watcher, revents):
while True:
try:
sock, address = self._sock.accept()
except socket.error as err:
if err.args[0] in self.NONBLOCKING_ERRORS:
break
else:
raise #FIXME
self.logger.info('New incoming connection from %s:%s', *address)
conn = RpcConnection(sock, self.loop,
*self._conn_args, **self._conn_kw)
self.register(conn)
#
# Public methods:
#
def register(self, conn):
'''
......@@ -39,74 +83,17 @@ class RpcServer(object):
self._clients.remove(conn)
def run(self):
raise NotImplementedError('You must use a sub-class of RpcServer.')
'''
Run the :class:`RpcServer`.
'''
self.loop.start()
def shutdown(self):
'''
Shutdown the :class:`RpcServer` instance.
'''
self.logger.info('Shutdown requested')
self._sock_watcher.stop()
for client in self._clients.copy():
self.unregister(client, shutdown=True)
class GreenRpcServer(RpcServer):
'''
An sjrpc server that use Gevent and its Greenlets to handle client
connections.
:param addr: the ip address to connect to
:param port: the tcp port to connect to
:param conn_args, conn_kw: the arguments to pass to the client
:class:`RpcConnection` instance
.. note::
At this time, the server must be ran into that imported this module.
This is a limitation of Gevent 0.x and this should be disappear when
Gevent will be used.
'''
def __init__(self, addr, port, conn_args=(), conn_kw={}, *args, **kwargs):
from gevent.server import StreamServer
super(GreenRpcServer, self).__init__(*args, **kwargs)
self._conn_args = conn_args
self._conn_kw = conn_kw
self._server = StreamServer((addr, port), self._handler)
def _handler(self, sock, address):
conn = GreenRpcConnection(sock, *self._conn_args, **self._conn_kw)
self.register(conn)
RpcConnection.run(conn) #FIXME
def run(self):
#self._server.serve_forever()
#FIXME: Sometime, when shutdown is called, _server.serve_forever stay
# stuck and never return. This is maybe a problem with gevent,
# but this workaround seem to work.
self._server.start()
while not self._server._stopped_event.is_set():
self._server._stopped_event.wait(2)
def shutdown(self):
super(GreenRpcServer, self).shutdown()
self._server.stop()
class SSLGreenRpcServer(GreenRpcServer):
'''
The SSL version of :class:`GreenRpcServer`. All connecting client are
automatically wrapped into and SSL connection.
You must provide certfile and keyfile to make this work properly.
'''
def __init__(self, addr, port, conn_args=(), conn_kw={}, certfile=None,
keyfile=None, *args, **kw):
super(GreenRpcServer, self).__init__(*args, **kw)
from gevent.server import StreamServer
self._conn_args = conn_args
self._conn_kw = conn_kw
self._server = StreamServer((addr, port), self._handler,
certfile=certfile, keyfile=keyfile)
self.loop.stop(pyev.EVBREAK_ALL)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment