Skip to content
simple.py 2.64 KiB
Newer Older
Antoine Millet's avatar
Antoine Millet committed

import ssl
import time
Antoine Millet's avatar
Antoine Millet committed
import errno
import socket
import select
import logging
import threading

from sjrpc.core import RpcConnection

import pyev


class RpcServer(object):

    '''
    Base class for all RpcServer classes.

    The server watches a listening socket for readability on a pyev loop,
    wraps every accepted socket in an :class:`RpcConnection` and keeps the
    resulting connections in a set of registered clients.
    '''

    # errno values meaning "nothing left to accept for now" on a non-blocking
    # listening socket.
    NONBLOCKING_ERRORS = (errno.EAGAIN, errno.EWOULDBLOCK)

    # errno values after which accept() is simply retried: the call was
    # interrupted by a signal, or the peer went away before being accepted.
    _RETRY_ERRORS = (errno.EINTR, errno.ECONNABORTED)

    def __init__(self, sock, loop=None, conn_args=(), conn_kw=None):
        '''
        Start watching a listening socket for incoming connections.

        :param sock: the listening socket; it has to be non-blocking since
            the accept loop only stops on a "would block" error
        :param loop: the pyev loop to attach to (the pyev default loop is
            used when None)
        :param conn_args: extra positional arguments given to each
            :class:`RpcConnection`, after the socket and the loop
        :param conn_kw: extra keyword arguments given to each
            :class:`RpcConnection`
        '''
        self._clients = set()
        self.logger = logging.getLogger('sjrpc')
        self.loop = pyev.default_loop() if loop is None else loop
        # Keep private copies so that no state is shared between servers (or
        # with the caller) through these containers.
        self._conn_args = tuple(conn_args)
        self._conn_kw = dict(conn_kw) if conn_kw else {}
        self._sock = sock
        self._sock_watcher = self.loop.io(self._sock, pyev.EV_READ,
                                          self._handle)
        self._sock_watcher.start()

    @classmethod
    def from_addr(cls, addr, port, *args, **kwargs):
        '''
        Create a server listening on the specified address and port.

        :param addr: the address to bind to
        :param port: the port to bind to
        :param \*args: extra positional arguments for the class constructor
        :param \*\*kwargs: extra keyword arguments for the class constructor
        :return: the new server instance
        '''
        sock = socket.socket()
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((addr, port))
            sock.setblocking(False)
            sock.listen(5)
            return cls(sock, *args, **kwargs)
        except Exception:
            # Do not leak the descriptor when the setup fails half-way.
            sock.close()
            raise

    def _log_incoming(self, address):
        '''
        Log a new incoming connection.

        :param address: the peer address as returned by accept(); only
            tuples with at least two items are logged as "host:port", any
            other value (longer tuples keep their two first items) is logged
            as is.
        '''
        if isinstance(address, tuple) and len(address) >= 2:
            self.logger.info('New incoming connection from %s:%s',
                             address[0], address[1])
        else:
            self.logger.info('New incoming connection from %s', address)

    def _handle(self, watcher, revents):
        '''
        Callback of the listening socket watcher: accept every pending
        connection and register it on this server.

        :param watcher: the watcher which triggered the callback (unused)
        :param revents: the events which triggered the callback (unused)
        '''
        while True:
            try:
                sock, address = self._sock.accept()
            except socket.error as err:
                code = err.args[0] if err.args else None
                if code in self.NONBLOCKING_ERRORS:
                    # Every pending connection has been accepted.
                    break
                if code in self._RETRY_ERRORS:
                    continue
                self.logger.error('Error while accepting connection: %s', err)
                raise

            self._log_incoming(address)
            try:
                conn = RpcConnection(sock, self.loop,
                                     *self._conn_args, **self._conn_kw)
            except Exception:
                # The connection object never took ownership of the socket.
                sock.close()
                raise
            self.register(conn)

#
# Public methods:
#

    def register(self, conn):
        '''
        Register a new connection on this server.

        :param conn: the connection to register.
        '''
        self._clients.add(conn)

    def unregister(self, conn, shutdown=False):
        '''
        Unregister the specified client from this server. If shutdown is
        specified, the client is shut down before being unregistered.
        Unknown connections are ignored.

        :param conn: the connection to unregister
        :param shutdown: shutdown or not the connection before unregistering
        '''
        if conn in self._clients:
            if shutdown:
                conn.shutdown()
            # discard() and not remove(): the shutdown above may already have
            # unregistered the connection.
            self._clients.discard(conn)

    def run(self):
        '''
        Run the :class:`RpcServer` by starting its event loop.
        '''
        self.loop.start()

    def shutdown(self):
        '''
        Shutdown the :class:`RpcServer` instance: every registered client is
        shut down and unregistered, then the event loop is stopped.
        '''
        self.logger.info('Shutdown requested')
        # Iterate over a copy since unregister() mutates the set.
        for client in self._clients.copy():
            self.unregister(client, shutdown=True)
        self.loop.stop(pyev.EVBREAK_ALL)