Newer
Older
import gc
import errno
import weakref
import logging
import threading
from sjrpc.core import RpcConnection
import pyev
class RpcServer(object):
'''
Base class for all RpcServer classes.
'''
NONBLOCKING_ERRORS = (errno.EAGAIN, errno.EWOULDBLOCK)
def __init__(self, sock, loop=None, conn_args=(), conn_kw={}):
self._clients = set()
self.logger = logging.getLogger('sjrpc')
if loop is None:
self.loop = pyev.default_loop()
else:
self.loop = loop
self._conn_args = conn_args
self._conn_kw = conn_kw
self._sock = sock
self._sock_watcher = self.loop.io(self._sock, pyev.EV_READ, self._handle)
self._sock_watcher.start()
@classmethod
def from_addr(cls, addr, port, *args, **kwargs):
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((addr, port))
sock.setblocking(False)
sock.listen(5)
return cls(sock, *args, **kwargs)
def _wrap(self, sock):
'''
Wrap the socket into the :class:`RpcConnection` and return it.
'''
return RpcConnection(sock, self.loop, *self._conn_args, **self._conn_kw)
def _handle(self, watcher, revents):
while True:
try:
sock, address = self._sock.accept()
except socket.error as err:
break
else:
raise #FIXME
self.logger.info('New incoming connection from %s:%s', *address)
self.register(conn)
def _clean_conn(self, ref):
'''
Callback called by weakref when an object is about to be collected by
garbage collector.
'''
try:
self._clients.remove(ref)
except KeyError:
pass
#
# Public methods:
#
def register(self, conn):
'''
Register a new connection on this server.
:param conn: the connection to register.
'''
self._clients.add(weakref.ref(conn, self._clean_conn))
gc.collect() # Force a manual garbage collection to avoid memory leak
# with RpcConnections. This is maybe not required but I
# need to read docs about python's gc and circular refs.
def unregister(self, conn, shutdown=False):
'''
Unregister the specified client from this server. If shutdown is
specified, client is shutdown before to be unregistered.
:param conn: the connection to unregister
:param shutdown: shutdown or not the connection before to register
'''
if conn in self._clients:
if shutdown:
conn.shutdown()
def run(self):
'''
Run the :class:`RpcServer`.
'''
self.loop.start()
def shutdown(self):
'''
Shutdown the :class:`RpcServer` instance.
'''
self.logger.info('Shutdown requested')
self._sock_watcher.stop()
for client in self._clients.copy():
self.unregister(client, shutdown=True)
self.loop.stop(pyev.EVBREAK_ALL)
class SSLRpcServer(RpcServer):
'''
SSL version of the RpcServer.
'''
def __init__(self, sock, certfile, keyfile, loop=None, *args, **kwargs):
super(SSLRpcServer, self).__init__(sock, loop, *args, **kwargs)
self._certfile = certfile
self._keyfile = keyfile
def _wrap(self, sock):
sock = ssl.wrap_socket(sock, server_side=True,
keyfile=self._keyfile,
certfile=self._certfile,
ssl_version=ssl.PROTOCOL_TLSv1,
do_handshake_on_connect=True)
return super(SSLRpcServer, self)._wrap(sock)