import ssl
import time
import socket
import select
import logging

from sjrpc.core import RpcConnection, GreenRpcConnection


class RpcServer(object):
    '''
    Base class for all RpcServer classes.

    It only keeps track of the registered client connections and knows how
    to shut them all down. Sub-classes must provide :meth:`run`.
    '''

    def __init__(self):
        # Connections currently registered on this server.
        self._clients = set()
        self.logger = logging.getLogger('sjrpc')

    def register(self, conn):
        '''
        Register a new connection on this server.

        :param conn: the connection to register.
        '''
        self._clients.add(conn)

    def unregister(self, conn, shutdown=False):
        '''
        Unregister the specified client from this server. If shutdown is
        specified, the client is shut down before being unregistered.

        Unregistering a connection which is not registered is a no-op (and
        the connection is not shut down in this case).

        :param conn: the connection to unregister
        :param shutdown: shutdown or not the connection before unregistering
            it
        '''
        if conn not in self._clients:
            return
        if shutdown:
            conn.shutdown()
        # discard() instead of remove(): the connection may already have
        # been unregistered by someone else while conn.shutdown() was
        # running (for example by the handler of this connection), and this
        # must not raise a KeyError.
        self._clients.discard(conn)

    def run(self):
        '''
        Run the server. Must be implemented by sub-classes.

        :raise NotImplementedError: always, on this base class
        '''
        raise NotImplementedError('You must use a sub-class of RpcServer.')

    def shutdown(self):
        '''
        Shutdown the :class:`RpcServer` instance.

        Every registered client is shut down and unregistered.
        '''
        self.logger.info('Shutdown requested')
        # Iterate over a copy since unregister() mutates the set.
        for client in self._clients.copy():
            self.unregister(client, shutdown=True)


class GreenRpcServer(RpcServer):
    '''
    An sjrpc server that use Gevent and its Greenlets to handle client
    connections.

    :param addr: the ip address to listen on
    :param port: the tcp port to listen on
    :param conn_args, conn_kw: the arguments to pass to the client
        :class:`GreenRpcConnection` instance (defaults to no argument)

    .. note::
       At this time, the server must be run in the same context (presumably
       the thread -- the original note was truncated) that imported this
       module. This is a limitation of Gevent 0.x and it should disappear
       when a newer Gevent is used.
    '''

    def __init__(self, addr, port, conn_args=(), conn_kw=None, *args,
                 **kwargs):
        from gevent.server import StreamServer
        super(GreenRpcServer, self).__init__(*args, **kwargs)
        self._conn_args = conn_args
        # None is used as default instead of a shared mutable {}.
        self._conn_kw = {} if conn_kw is None else conn_kw
        self._server = StreamServer((addr, port), self._handler)

    def _handler(self, sock, address):
        '''
        Handle a newly accepted client: wrap the socket into a
        :class:`GreenRpcConnection`, register it and run it.

        :param sock: the socket of the accepted client
        :param address: the address of the client (unused)
        '''
        conn = GreenRpcConnection(sock, *self._conn_args, **self._conn_kw)
        self.register(conn)
        try:
            # The run method of the base class is called explicitly on the
            # green connection (flagged as FIXME by the original author).
            RpcConnection.run(conn) #FIXME
        finally:
            # Do not keep a reference on the connection once its run loop
            # is over, otherwise self._clients grows forever. This is a
            # no-op if the connection has already been unregistered.
            self.unregister(conn)

    def run(self):
        '''
        Start the underlying Gevent server and block until it is stopped.
        '''
        #self._server.serve_forever()
        #FIXME: Sometime, when shutdown is called, _server.serve_forever stay
        #       stuck and never return. This is maybe a problem with gevent,
        #       but this workaround seem to work.
        self._server.start()
        # Poll the (private) stop event of the server with a 2 seconds
        # timeout instead of waiting on it forever.
        while not self._server._stopped_event.is_set():
            self._server._stopped_event.wait(2)

    def shutdown(self):
        '''
        Shutdown all the registered clients, then stop the Gevent server.
        '''
        super(GreenRpcServer, self).shutdown()
        self._server.stop()


class SSLGreenRpcServer(GreenRpcServer):
    '''
    The SSL version of :class:`GreenRpcServer`. All connecting client are
    automatically wrapped into an SSL connection.

    You must provide certfile and keyfile to make this work properly.

    :param addr: the ip address to listen on
    :param port: the tcp port to listen on
    :param conn_args, conn_kw: the arguments to pass to the client
        :class:`GreenRpcConnection` instance (defaults to no argument)
    :param certfile: the certificate file passed to the Gevent server
    :param keyfile: the private key file passed to the Gevent server
    '''

    def __init__(self, addr, port, conn_args=(), conn_kw=None, certfile=None,
                 keyfile=None, *args, **kw):
        # NOTE: super() is called with GreenRpcServer (and not with
        # SSLGreenRpcServer) on purpose: it skips GreenRpcServer.__init__,
        # which would build a non-SSL StreamServer, and directly runs
        # RpcServer.__init__.
        super(GreenRpcServer, self).__init__(*args, **kw)
        from gevent.server import StreamServer
        self._conn_args = conn_args
        # None is used as default instead of a shared mutable {}.
        self._conn_kw = {} if conn_kw is None else conn_kw
        self._server = StreamServer((addr, port), self._handler,
                                    certfile=certfile, keyfile=keyfile)