Commit 0197b19b authored by Antoine Millet's avatar Antoine Millet

Initial import

parents
*.pyc
doc/_build/*
*.swp
*.log
test_*.py
This diff is collapsed.
sjrpc (3-1) unstable; urgency=low
* Fixed a bug that disallow to send a large amound of
data in a single message.
-- Antoine Millet <antoine.millet@smartjog.com> Thu, 16 Dec 2010 12:47:05 +0100
sjrpc (2-1) unstable; urgency=low
* Added is_running method on ConnectionManager
* Added get_handler method on ConnectionManager
* Compatibility with SSL sockets (added SimpleSslRpcServer
to handle ssl clients)
-- Antoine Millet <antoine.millet@smartjog.com> Mon, 13 Dec 2010 18:56:12 +0100
sjrpc (1-1) unstable; urgency=low
* Initial release.
-- Antoine Millet <antoine.millet@smartjog.com> Mon, 06 Dec 2010 14:42:27 +0100
Source: sjrpc
Section: python
Priority: optional
Maintainer: Antoine Millet <antoine.millet@smartjog.com>
Build-Depends: debhelper (>= 7), python-central (>= 0.6), cdbs (>= 0.4.50), python-setuptools, python
XS-Python-Version: >= 2.6
Standards-Version: 3.9.1
Package: python-sjrpc
Architecture: all
Depends: ${misc:Depends}, ${python:Depends}
XB-Python-Version: ${python:Versions}
Description: SmartJog RPC
This package provides a Python module implementing a bidirectionnal
RPC using JSON.
Files: *
Copyright: © 2010 Smartjog
License: GPL2
#!/usr/bin/make -f
# -*- makefile -*-
DEB_PYTHON_SYSTEM=pycentral
# Debhelper must be included before python-distutils to use
# dh_python / dh_pycentral / dh_pysupport
include /usr/share/cdbs/1/rules/debhelper.mk
include /usr/share/cdbs/1/class/python-distutils.mk
PYTHON_PACKAGES := python-sjrpc
$(patsubst %,binary-install/%,$(PYTHON_PACKAGES))::
dh_pycentral -p$(cdbs_curpkg)
from setuptools import setup
import os
ldesc = open(os.path.join(os.path.dirname(__file__), 'README')).read()
setup(
name='sjrpc',
version='3',
description='Smartjog RPC',
long_description=ldesc,
author='Antoine Millet',
author_email='antoine.millet@smartjog.com',
license='GPL2',
packages=['sjrpc', 'sjrpc.client', 'sjrpc.server', 'sjrpc.core',
'sjrpc.utils'],
classifiers=[
'Intended Audience :: Developers',
'Operating System :: Unix',
'Programming Language :: Python',
],
)
#!/usr/bin/env python
#coding:utf8
'''
This module implements a Remote Procedure Call system using socket objects as
transport. The main feature of this RPC is to be bidirectionnal: both client and
server can serve remote procedure for its peer.
The library is separated into four parts:
* core library contains all common classes.
* server library contains all the server side related stuff.
* client library contains all the client side related stuff.
* utils library contains some helpers used in previous libraries.
'''
import sjrpc.core
import sjrpc.server
import sjrpc.client
import sjrpc.utils
#!/usr/bin/env python
#coding:utf8
from sjrpc.client.simple import SimpleRpcClient
__all__ = ('SimpleRpcClient',)
#!/usr/bin/env python
#coding=utf8
import ssl
import select
import socket
from sjrpc.core import RpcConnection, ConnectionManager
class SimpleRpcClient(ConnectionManager):
'''
Create a new simple RPC client.
:param connect: the :class:`RpcConnection` object to bind the client manager
:param default_handler: the default handler to bind to the client connection
:param on_disconnect: method on the handler to call when the client
disconnects.
'''
def __init__(self, connection, default_handler=None, on_disconnect=None):
super(SimpleRpcClient, self).__init__()
self._on_disconnect = on_disconnect
self._connection = connection
self.register(self._connection)
@classmethod
def from_addr(cls, addr, port, enable_ssl=False, cert=None,
default_handler=None, on_disconnect=None):
'''
Construct the instance of :class:`SimpleRpcClient` without providing
the :class:`RpcConnection` object. The :class:`RpcConnection` is
automatically created and passed to the standard constructor before to
return the new instance.
:param addr: the target ip address
:param port: the target port
:param ssl: enable SSL
:param cert: is SSL is enabled, profile the filename of certificate to
check. If None, don't check certificate.
:param default_handler: the default handler to bind to the
client connection
:param on_disconnect: method on the handler to call when the client
disconnects.
'''
connection = RpcConnection.from_addr(addr, port, None,
enable_ssl=enable_ssl, cert=cert)
client = cls(connection, default_handler, on_disconnect)
connection._manager = client
return client
@classmethod
def from_sock(cls, sock, default_handler=None, on_disconnect=None):
'''
Construct the instance of :class:`SimpleRpcClient` without providing
the :class:`RpcConnection` object. The :class:`RpcConnection` is
automatically created and passed to the standard constructor before to
return the new instance
:param sock: the socket object to wrap with :class:`RpcConnection`
object.
:param default_handler: the default handler to bind to the
client connection
:param on_disconnect: method on the handler to call when the client
disconnects.
'''
connection = RpcConnection(sock, None)
client = cls(connection, default_handler, on_disconnect)
connection._manager = client
return client
def shutdown(self):
super(SimpleRpcClient, self).shutdown()
self._connection.shutdown(self._on_disconnect)
def handle_event(self, fd, event):
if event & select.EPOLLIN:
# Data are ready to be readed on socket
try:
self._connection.receive()
except socket.error as err:
self.shutdown()
if event & select.EPOLLOUT:
# Data are ready to be written on socket
try:
self._connection.send()
except socket.error as err:
self.shutdown()
if event & select.EPOLLHUP:
self.shutdown() #TODO
def all_connections(self):
return set((self._connection,))
def call(self, *args, **kwargs):
return self._connection.call(*args, **kwargs)
#!/usr/bin/env python
#coding:utf8
from sjrpc.core.rpcconnection import *
from sjrpc.core.connectionmanagers import *
from sjrpc.core.callers import *
from sjrpc.core.exceptions import *
__all__ = ('ConnectionManager', 'RpcConnection', 'RpcCaller', 'RpcError',
'ThreadedRpcCaller', )
#!/usr/bin/env python
#coding:utf8
import threading
class RpcCaller(object):
'''
A caller execute a callable (function, method, class which implement the
:meth:`__call__` method...) in a particular context (threaded or
"timeouted" for example), and return the result (or the exception) to the
peer through it :class:`RpcConnection` object.
'''
def __init__(self, request, connection, func):
self._request = request
self._connection = connection
self._func = func
def run(self):
'''
Run the callable and return the result (or the exception) to the peer.
'''
msg_id = self._request['id']
args = self._request['args']
kwargs = self._request['kwargs']
if not getattr(self._func, '__pure__', False):
args.insert(0, self._connection)
try:
returned = self._func(*args, **kwargs)
except Exception as err:
self._connection.error(msg_id, message=str(err),
error=err.__class__.__name__)
else:
self._connection.response(msg_id, returned=returned)
def start(self):
'''
Start execution of the callable, the most of time, it just call
:meth:`run` method.
'''
self.run()
class ThreadedRpcCaller(RpcCaller):
'''
A caller which make the call into a separated thread.
'''
def __init__(self, *args, **kwargs):
super(ThreadedRpcCaller, self).__init__(*args, **kwargs)
self._thread = threading.Thread(target=self.run)
self._thread.daemon = True
def start(self):
self._thread.start()
#!/usr/bin/env python
#coding:utf8
import select
import threading
class ConnectionManager(object):
'''
Base class for all connection manager classes.
'''
# The timeout to wait before the poll call release the hand with no events:
POLL_TIMEOUT = 1
# Masks for fd registration on poll object:
MASK_NORMAL = (select.EPOLLIN | select.EPOLLPRI |
select.EPOLLERR | select.EPOLLHUP)
MASK_WRITABLE = MASK_NORMAL | select.EPOLLOUT
def __init__(self):
self._poll = select.epoll()
self._running = True
self._received_msg = {}
self._wait_groups = {}
def register(self, connection):
'''
Register a :class:`RpcConnection` object on this manager.
:param connection: the instance of :class:`RpcConnection` to register
:type param: instance of :class:`RpcConnection`
'''
self._poll.register(connection.get_fd(), ConnectionManager.MASK_NORMAL)
def is_running(self):
return self._running
def run(self):
'''
Run the main loop of the :class:`ConnectionManager`. It will catch
events on registered :class:`RpcConnection` and process them.
'''
while self._running:
events = self._poll.poll(ConnectionManager.POLL_TIMEOUT)
for fd, event in events:
self.handle_event(fd, event)
def start(self, daemonize=False):
'''
Run the main loop in a separated thread.
:param daemonize: set the thread daemon state
'''
t = threading.Thread(target=self.run)
t.daemon = daemonize
t.start()
def wait(self, msg_id_set, timeout=None, wait_all=True):
'''
Wait for the asynchronous messages in ``msg_id_set``.
When the timeout argument is present and not ``None``, it should be a
floating point number specifying a timeout for the operation in
seconds (or fractions thereof).
You can also set ``wait_all`` to False if you want to unlock the call
when the first response is received.
:param msg_id_set: set of message to wait
:type msg_id_set: :class:`frozenset`
:param timeout: timeout value or None to disable timeout (default: None)
:type timeout: :class:`int` or :class:`None`
:param wait_all: wait for all messages (default: True)
:type wait_all: :class:`bool`
.. warning:
This is important that ``msg_id_set`` is a :class:`frozenset`
and not a :class:`set`.
'''
waiter = {'event': threading.Event(), 'wait_all': wait_all}
self._wait_groups.setdefault(msg_id_set, waiter)
already_completed = self._check_waiter(msg_id_set)
if not already_completed:
waiter['event'].wait(timeout=timeout)
messages = waiter['responses']
del self._wait_groups[msg_id_set]
return messages
def signal_arrival(self, message):
'''
Signal the arrival of a new message to the :class:`ConnectionManager`.
This method is ordinary called by the :class:`RpcConnections` objects,
when a response to an asynchronous call is received.
:param message: the message received
'''
self._received_msg[message['id']] = message
for waitset in self._wait_groups.keys():
self._check_waiter(waitset)
def _check_waiter(self, waitset):
'''
Check if a waitset is completed and process it.
:param waitset: the waitset to check
:return: True if waitset is completed else None
'''
# Make a set of received messages ids:
recv_msg = set(self._received_msg)
try:
waiter = self._wait_groups[waitset]
except KeyError:
return False
is_ok = (waiter['wait_all'] and waitset <= recv_msg
or not waiter['wait_all'] and not recv_msg.isdisjoint(waitset))
if is_ok:
# Clean the call list on each attached RpcConnection
for connection in self.all_connections():
connection.clean_all_calls(waitset)
# Get the messages:
messages = []
for msg_id, msg in self._received_msg.items():
if msg_id in waitset:
messages.append(msg)
del self._received_msg[msg_id]
waiter['responses'] = tuple(messages)
# Unlock the event:
waiter['event'].set()
return True
else:
return False
def all_connections(self):
'''
Return all connection attached to this :class:`ConnectionManager`.
:return: a set of :class:`RpcConnection` attached
to this :class:`ConnectionManager`
'''
raise NotImplementedError
def shutdown(self):
'''
Shutdown the manager properly.
'''
self._running = False
def data_to_write(self, connection):
'''
Method called by a connection to inform the manager that it have data
to send.
:param connection: the :class:`RpcConnection` object which inform the
manager
'''
fd = connection.get_fd()
self._poll.modify(fd, ConnectionManager.MASK_WRITABLE)
def nothing_to_write(self, connection):
'''
Method called by a connection to inform the manager that it have no
more data to send.
:param connection: the :class:`RpcConnection` object which inform the
manager
'''
fd = connection.get_fd()
self._poll.modify(fd, ConnectionManager.MASK_NORMAL)
def handle_event(self, fd, event):
'''
Handle an event and make associated action. This is an abstract method to
overload on derived classes.
:param fd: the fd that have generated the event
:param event: the event as returned by the poller object
'''
pass
#!/usr/bin/env python
#coding:utf8
class RpcError(Exception):
'''
Exception raised by caller when an error occurs while execution of remote
procedure call.
'''
def __init__(self, exception, message):
self.exception = exception
self.message = message
def __str__(self):
return '%s: %s' % (self.exception, self.message)
This diff is collapsed.
#!/usr/bin/env python
#coding:utf8
from rpc.client import SimpleRpcClient
from rpc.utils import ConnectionProxy
import socket
import threading
# Initialization of the client socket:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.connect(('127.0.0.1', 1234))
# Create the client objet binded on the socket we create above:
client = SimpleRpcClient(sock)
# Launch the event loop in separated thread:
threading.Thread(target=client.run).start()
# Create the proxy object that allow us to call easily methods on the server:
proxy = ConnectionProxy(client)
print '42 + 42 = ', proxy.add(42, 42)
# You can start this script with -i argument of the python interpreter to call
# methods with proxy interactively.
#!/usr/bin/env python
#coding:utf8
from rpc.server import SimpleRpcServer
import socket
def add(op1, op2):
# This will be printed on server side:
print 'add %s + %s' % (op1, op2)
# This will be returned to the client:
return op1 + op2
handler = {
'add': add
}
# Initialization of the server socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('0.0.0.0', 1234))
sock.listen(5)
# Create the server objet binded on the socket we create above:
server = SimpleRpcServer(sock, default_handler=handler)
# Launch the server's event loop with proper exit when exception is raised:
try:
server.run()
except Exception as err:
print err
server.shutdown()
#!/usr/bin/env python
#coding:utf8
from sjrpc.server.simple import *
__all__ = ('SimpleRpcServer', 'SimpleSslRpcServer')
#!/usr/bin/env python
#coding=utf8
import ssl
import time
import socket
import select
from sjrpc.core import RpcConnection, ConnectionManager
class SimpleRpcServer(ConnectionManager):
'''
A simple RPC Server that wait for new connections and dispatch messages
from thoses are already established.
:param sock: the :class:`socket` object to bind to the server connection
:param default_handler: the default handler to bind to the new client
connections
'''
def __init__(self, sock, default_handler=None, on_disconnect=None):
super(SimpleRpcServer, self).__init__()
sock.setblocking(False)
self._listening_sock = sock
self._poll.register(sock)
self._clients = {}
self._default_handler = default_handler
self._on_disconnect = on_disconnect
def _accept_connection(self):
return self._listening_sock.accept()
def register(self, connection):
super(SimpleRpcServer, self).register(connection)
self._clients[connection.get_fd()] = connection
def shutdown(self):
super(SimpleRpcServer, self).shutdown()
time.sleep(ConnectionManager.POLL_TIMEOUT)
for connection in self._clients.values():
connection.shutdown(self._on_disconnect)
self._listening_sock.close()
def all_connections(self):
return set(self._clients.values())
def handle_event(self, fd, event):
if fd == self._listening_sock.fileno():
# Event concerns the listening socket:
if event & select.EPOLLIN:
sock, address = self._accept_connection()
sock.setblocking(False)
connection = RpcConnection(sock, self,
handler=self._default_handler)
self.register(connection)
else:
# Event concerns a client socket:
connection = self._clients[fd]
if event & select.EPOLLIN:
# Data are ready to be readed on socket
try:
connection.receive()
except socket.error as err:
connection.shutdown(self._on_disconnect)
del self._clients[fd]
if event & select.EPOLLOUT:
# Data are ready to be written on socket
try:
connection.send()
except socket.error as err:
connection.shutdown(self._on_disconnect)
del self._clients[fd]
if event & select.EPOLLHUP:
connection.shutdown(self._on_disconnect)
del self._clients[fd]
class SimpleSslRpcServer(SimpleRpcServer):
'''
A simple RPC Server that wait for new connections and dispatch messages
from thoses are already established. This server version enable SSL on
client sockets.
:param sock: the :class:`socket` object to bind to the server connection
:param default_handler: the default handler to bind to the new client
connections
'''
def __init__(self, sock, certfile=None, keyfile=None, **kwargs):
self._certfile = certfile
self._keyfile = keyfile
super(SimpleSslRpcServer, self).__init__(sock, **kwargs)
def _accept_connection(self):
sock, address = self._listening_sock.accept()
sslsock = ssl.wrap_socket(sock, server_side=True, keyfile=self._keyfile,
certfile=self._certfile,
ssl_version=ssl.PROTOCOL_TLSv1,
do_handshake_on_connect=True)
return sslsock, address
#!/usr/bin/env python
#coding:utf8
from sjrpc.utils.datastructures import *
from sjrpc.utils.proxies import *
from sjrpc.utils.handlers import *
__all__ = ('BytesBuffer', 'ConnectionProxy', 'RpcHandler', 'threadless', 'pure')
#!/usr/bin/env python
#coding:utf8
import threading
class BytesBuffer(object):
'''
BytesBuffers objects are useful to create socket buffers.
Its behavior is pretty simple and looks like a FIFO queue: you can push data
on it with push method, and pull a specified amount of data.
.. note::
All calls to :class:`BytesBuffer` objects are thread-safe.
'''
def __init__(self, data=None):
if data is None:
self._buf = ''
else:
self._buf = data
self._lock = threading.Lock()
def push(self, data):
'''
Push data on buffer.
:param data: the data to push on the buffer
:type data: :class:`str` object
'''
self._lock.acquire()
self._buf += data
self._lock.release()
def pull(self, size=0):
'''
Pull the specified amount of data on the buffer.
If size is equal to 0, will return the entire content. If buffer
contains less of data than asked, :meth:`pull` will return all the
available data.
:param size: size (in bytes) of data to pull on the buffer