import ssl import time import errno import socket import select import logging import threading from sjrpc.core import RpcConnection import pyev class RpcServer(object): ''' Base class for all RpcServer classes. ''' NONBLOCKING_ERRORS = (errno.EAGAIN, errno.EWOULDBLOCK) def __init__(self, sock, loop=None, conn_args=(), conn_kw={}): self._clients = set() self.logger = logging.getLogger('sjrpc') if loop is None: self.loop = pyev.default_loop() else: self.loop = loop self._conn_args = conn_args self._conn_kw = conn_kw self._sock = sock self._sock_watcher = self.loop.io(self._sock, pyev.EV_READ, self._handle) self._sock_watcher.start() @classmethod def from_addr(cls, addr, port, *args, **kwargs): sock = socket.socket() sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((addr, port)) sock.setblocking(False) sock.listen(5) return cls(sock, *args, **kwargs) def _handle(self, watcher, revents): while True: try: sock, address = self._sock.accept() except socket.error as err: if err.args[0] in self.NONBLOCKING_ERRORS: break else: raise #FIXME self.logger.info('New incoming connection from %s:%s', *address) conn = RpcConnection(sock, self.loop, *self._conn_args, **self._conn_kw) self.register(conn) # # Public methods: # def register(self, conn): ''' Register a new connection on this server. :param conn: the connection to register. ''' self._clients.add(conn) def unregister(self, conn, shutdown=False): ''' Unregister the specified client from this server. If shutdown is specified, client is shutdown before to be unregistered. :param conn: the connection to unregister :param shutdown: shutdown or not the connection before to register ''' if conn in self._clients: if shutdown: conn.shutdown() self._clients.remove(conn) def run(self): ''' Run the :class:`RpcServer`. ''' self.loop.start() def shutdown(self): ''' Shutdown the :class:`RpcServer` instance. ''' self.logger.info('Shutdown requested') self._sock_watcher.stop() for client in self._clients.copy(): self.unregister(client, shutdown=True) self.loop.stop(pyev.EVBREAK_ALL)