import gc
import ssl
import time
import errno
import socket
import select
import weakref
import logging
import threading

from sjrpc.core import RpcConnection

import pyev


class RpcServer(object):

    '''
    Base class for all RpcServer classes.

    The server watches a listening socket on a pyev loop, accepts every
    pending connection when the socket becomes readable, wraps each accepted
    socket into a :class:`RpcConnection` and keeps a weak reference to it in
    ``self._clients``.

    :param sock: the listening socket to accept connections from
    :param loop: the pyev loop to use (the pyev default loop if ``None``)
    :param conn_args: positional arguments passed to each
        :class:`RpcConnection` after the socket and the loop
    :param conn_kw: keyword arguments passed to each :class:`RpcConnection`
        (``None`` means no keyword arguments)
    '''

    NONBLOCKING_ERRORS = (errno.EAGAIN, errno.EWOULDBLOCK)

    def __init__(self, sock, loop=None, conn_args=(), conn_kw=None):
        # Set of weakref.ref objects pointing to the registered connections.
        self._clients = set()
        self.logger = logging.getLogger('sjrpc')
        if loop is None:
            self.loop = pyev.default_loop()
        else:
            self.loop = loop
        self._conn_args = conn_args
        # A fresh dict per instance instead of a shared mutable default.
        self._conn_kw = {} if conn_kw is None else conn_kw
        self._sock = sock
        self._sock_watcher = self.loop.io(self._sock, pyev.EV_READ,
                                          self._handle)
        self._sock_watcher.start()

    @classmethod
    def from_addr(cls, addr, port, *args, **kwargs):
        '''
        Create a server listening on the specified address and port.

        :param addr: the address to bind the listening socket to
        :param port: the port to bind the listening socket to
        :param \\*args, \\*\\*kwargs: extra arguments passed to the class
            constructor after the socket
        :return: a new instance of the class
        '''
        sock = socket.socket()
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((addr, port))
        sock.setblocking(False)
        sock.listen(5)
        return cls(sock, *args, **kwargs)

    def _wrap(self, sock):
        '''
        Wrap the socket into the :class:`RpcConnection` and return it.
        '''
        return RpcConnection(sock, self.loop, *self._conn_args,
                             **self._conn_kw)

    def _handle(self, watcher, revents):
        '''
        Watcher callback: accept every pending connection on the listening
        socket and register each of them on this server.
        '''
        while True:
            try:
                sock, address = self._sock.accept()
            except socket.error as err:
                if err.errno in self.NONBLOCKING_ERRORS:
                    # Nothing left to accept for now.
                    break
                else:
                    raise  # FIXME
            self.logger.info('New incoming connection from %s:%s', *address)
            conn = self._wrap(sock)
            self.register(conn)

    def _clean_conn(self, ref):
        '''
        Callback called by weakref when an object is about to be
        collected by garbage collector.
        '''
        # The reference may already have been removed by unregister().
        self._clients.discard(ref)

    def _find_ref(self, conn):
        '''
        Return the weak reference stored for the specified connection, or
        None if the connection is not registered on this server.
        '''
        # Iterate over a copy: _clean_conn may remove entries from the set
        # if a connection gets collected while we are scanning.
        for ref in self._clients.copy():
            if ref() is conn:
                return ref
        return None

    #
    # Public methods:
    #

    def register(self, conn):
        '''
        Register a new connection on this server.

        :param conn: the connection to register.
        '''
        self._clients.add(weakref.ref(conn, self._clean_conn))
        gc.collect()  # Force a manual garbage collection to avoid memory leak
                      # with RpcConnections. This is maybe not required but I
                      # need to read docs about python's gc and circular refs.

    def unregister(self, conn, shutdown=False):
        '''
        Unregister the specified client from this server. If shutdown is
        specified, client is shutdown before to be unregistered.

        Nothing happens if the client is not registered on this server.

        :param conn: the connection to unregister (the weak reference stored
            by the server for this connection is also accepted)
        :param shutdown: shutdown or not the connection before to unregister
        '''
        # The server only stores weak references, so a connection object can
        # never be found in the set directly: resolve both forms first.
        if isinstance(conn, weakref.ref):
            ref, conn = conn, conn()
        else:
            ref = self._find_ref(conn)
        if ref is None or ref not in self._clients:
            return
        # Drop the entry first so that a nested unregister() of the same
        # connection (e.g. triggered from its shutdown) becomes a no-op.
        self._clients.discard(ref)
        # conn is None when the referent has already been collected.
        if shutdown and conn is not None:
            conn.shutdown()

    def run(self):
        '''
        Run the :class:`RpcServer`.
        '''
        self.loop.start()

    def shutdown(self):
        '''
        Shutdown the :class:`RpcServer` instance.

        Stop accepting new connections, shutdown and unregister every
        registered client, then stop the loop.
        '''
        self.logger.info('Shutdown requested')
        self._sock_watcher.stop()
        # Work on a copy since unregister() mutates the set.
        for client in self._clients.copy():
            self.unregister(client, shutdown=True)
        self.loop.stop(pyev.EVBREAK_ALL)


class SSLRpcServer(RpcServer):

    '''
    SSL version of the RpcServer.

    :param sock: the listening socket to accept connections from
    :param certfile: path to the certificate file given to the ssl module
    :param keyfile: path to the private key file given to the ssl module
    :param loop: the pyev loop to use (the pyev default loop if ``None``)
    :param \\*args, \\*\\*kwargs: extra arguments passed to
        :class:`RpcServer` after the loop
    '''

    def __init__(self, sock, certfile, keyfile, loop=None, *args, **kwargs):
        super(SSLRpcServer, self).__init__(sock, loop, *args, **kwargs)
        self._certfile = certfile
        self._keyfile = keyfile

    def _wrap(self, sock):
        '''
        Wrap the socket into a server-side SSL socket, then into the
        :class:`RpcConnection`, and return it.
        '''
        # NOTE(review): ssl.wrap_socket() is deprecated since Python 3.7 and
        # removed in 3.12, and PROTOCOL_TLSv1 is an obsolete protocol
        # version. Kept as-is because the interpreter version targeted by
        # this file is not visible from here; consider ssl.SSLContext.
        sock = ssl.wrap_socket(sock, server_side=True, keyfile=self._keyfile,
                               certfile=self._certfile,
                               ssl_version=ssl.PROTOCOL_TLSv1,
                               do_handshake_on_connect=True)
        return super(SSLRpcServer, self)._wrap(sock)