Skip to content
simple.py 3.9 KiB
Newer Older
Antoine Millet's avatar
Antoine Millet committed
import ssl
import time
Antoine Millet's avatar
Antoine Millet committed
import socket
import select
import threading
from sjrpc.core import RpcConnection

import pyev


class RpcServer(object):

    '''
    Base class for all RpcServer classes.
    '''

    NONBLOCKING_ERRORS = (errno.EAGAIN, errno.EWOULDBLOCK)

    def __init__(self, sock, loop=None, conn_args=(), conn_kw={}):
        self._clients = set()
        self.logger = logging.getLogger('sjrpc')
        if loop is None:
            self.loop = pyev.default_loop()
        else:
            self.loop = loop
        self._conn_args = conn_args
        self._conn_kw = conn_kw
        self._sock = sock
        self._sock_watcher = self.loop.io(self._sock, pyev.EV_READ, self._handle)
        self._sock_watcher.start()

    @classmethod
    def from_addr(cls, addr, port, *args, **kwargs):
        sock = socket.socket()
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((addr, port))
        sock.setblocking(False)
        sock.listen(5)
        return cls(sock, *args, **kwargs)

Antoine Millet's avatar
Antoine Millet committed
    def _wrap(self, sock):
        '''
        Wrap the socket into the :class:`RpcConnection` and return it.
        '''
        return RpcConnection(sock, self.loop, *self._conn_args, **self._conn_kw)

    def _handle(self, watcher, revents):
        while True:
            try:
                sock, address = self._sock.accept()
            except socket.error as err:
Antoine Millet's avatar
Antoine Millet committed
                if err.errno in self.NONBLOCKING_ERRORS:
                    break
                else:
                    raise #FIXME

            self.logger.info('New incoming connection from %s:%s', *address)
Antoine Millet's avatar
Antoine Millet committed
            conn = self._wrap(sock)
    def _clean_conn(self, ref):
        '''
        Callback called by weakref when an object is about to be collected by
        garbage collector.
        '''
        try:
            self._clients.remove(ref)
        except KeyError:
            pass


    def register(self, conn):
        '''
        Register a new connection on this server.

        :param conn: the connection to register.
        '''
        self._clients.add(weakref.ref(conn, self._clean_conn))
        gc.collect() # Force a manual garbage collection to avoid memory leak
                     # with RpcConnections. This is maybe not required but I
                     # need to read docs about python's gc and circular refs.

    def unregister(self, conn, shutdown=False):
        '''
        Unregister the specified client from this server. If shutdown is
        specified, client is shutdown before to be unregistered.

        :param conn: the connection to unregister
        :param shutdown: shutdown or not the connection before to register
        '''
        if conn in self._clients:
            if shutdown:
                conn.shutdown()

    def run(self):
        '''
        Run the :class:`RpcServer`.
        '''
        self.loop.start()

    def shutdown(self):
        '''
        Shutdown the :class:`RpcServer` instance.
        '''
        self.logger.info('Shutdown requested')
        for client in self._clients.copy():
            self.unregister(client, shutdown=True)
        self.loop.stop(pyev.EVBREAK_ALL)
Antoine Millet's avatar
Antoine Millet committed


class SSLRpcServer(RpcServer):

    '''
    SSL version of the RpcServer.
    '''

    def __init__(self, sock, certfile, keyfile, loop=None, *args, **kwargs):
        super(SSLRpcServer, self).__init__(sock, loop, *args, **kwargs)
        self._certfile = certfile
        self._keyfile = keyfile

    def _wrap(self, sock):
        sock = ssl.wrap_socket(sock, server_side=True,
                               keyfile=self._keyfile,
                               certfile=self._certfile,
                               ssl_version=ssl.PROTOCOL_TLSv1,
                               do_handshake_on_connect=True)
        return super(SSLRpcServer, self)._wrap(sock)