Newer
Older
import errno
import logging
import socket
import threading

import pyev

from sjrpc.core import RpcConnection
class RpcServer(object):
    '''
    Base class for all RpcServer classes.

    The server watches a listening socket on a pyev loop. Each time the
    socket becomes readable, every pending connection is accepted, wrapped
    in a :class:`RpcConnection` and registered on the server until it is
    unregistered.
    '''

    # errno values reported by accept() on a non-blocking socket when there
    # is no pending connection left to accept.
    NONBLOCKING_ERRORS = (errno.EAGAIN, errno.EWOULDBLOCK)

    def __init__(self, sock, loop=None, conn_args=(), conn_kw=None):
        '''
        Create the server and start watching the listening socket.

        :param sock: the listening socket to accept connections from; it is
            expected to be non-blocking since pending connections are
            accepted in a loop until accept() reports it would block
        :param loop: the pyev loop to use, the default loop if not specified
        :param conn_args: extra positional arguments passed to each
            :class:`RpcConnection` created by this server
        :param conn_kw: extra keyword arguments passed to each
            :class:`RpcConnection` created by this server
        '''
        self._clients = set()
        self.logger = logging.getLogger('sjrpc')
        if loop is None:
            self.loop = pyev.default_loop()
        else:
            self.loop = loop
        self._conn_args = conn_args
        # A fresh dict per instance instead of a shared mutable default.
        self._conn_kw = {} if conn_kw is None else conn_kw
        self._sock = sock
        self._sock_watcher = self.loop.io(self._sock, pyev.EV_READ,
                                          self._handle)
        # The watcher is started right away: connections are accepted as
        # soon as the loop runs.
        self._sock_watcher.start()

    @classmethod
    def from_addr(cls, addr, port, *args, **kwargs):
        '''
        Create a server listening on the specified address and port.

        :param addr: the address to bind the listening socket to
        :param port: the port to bind the listening socket to
        :param \\*args: extra positional arguments passed to the constructor
        :param \\*\\*kwargs: extra keyword arguments passed to the constructor
        :return: the new server instance
        '''
        sock = socket.socket()
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((addr, port))
            sock.setblocking(False)
            sock.listen(5)
        except Exception:
            # Do not leak the descriptor when the socket cannot be set up
            # (address already in use, permission denied...).
            sock.close()
            raise
        return cls(sock, *args, **kwargs)

    @staticmethod
    def _format_address(address):
        '''
        Return a printable form of a peer address returned by accept().

        Tuples are rendered as ``host:port`` (with the host in brackets when
        it contains a colon), anything else is rendered with :func:`str`.

        :param address: the peer address to format
        '''
        if isinstance(address, tuple) and len(address) >= 2:
            host, port = address[0], address[1]
            if ':' in str(host):
                return '[%s]:%s' % (host, port)
            return '%s:%s' % (host, port)
        return str(address) or '<unknown>'

    def _handle(self, watcher, revents):
        '''
        Callback of the listening socket watcher, accept all the pending
        connections and register them on the server.

        :param watcher: the watcher which triggered the callback (unused)
        :param revents: the received events (unused)
        '''
        while True:
            try:
                sock, address = self._sock.accept()
            except socket.error as err:
                if err.args[0] in self.NONBLOCKING_ERRORS:
                    # Nothing left to accept, wait for the next event.
                    break
                # FIXME: any other accept() error is propagated as-is to
                # the caller of this callback.
                raise
            self.logger.info('New incoming connection from %s',
                             self._format_address(address))
            try:
                conn = RpcConnection(sock, self.loop,
                                     *self._conn_args, **self._conn_kw)
            except Exception:
                # The connection object never took the socket over, close
                # it here so the descriptor is not leaked.
                sock.close()
                raise
            self.register(conn)

    #
    # Public methods:
    #

    def register(self, conn):
        '''
        Register a new connection on this server.

        :param conn: the connection to register.
        '''
        self._clients.add(conn)

    def unregister(self, conn, shutdown=False):
        '''
        Unregister the specified client from this server. If shutdown is
        specified, the client is shut down before being unregistered.
        Unregistering a connection unknown to this server does nothing.

        :param conn: the connection to unregister
        :param shutdown: whether to shut down the connection before
            unregistering it
        '''
        if conn not in self._clients:
            return
        if shutdown:
            conn.shutdown()
        # discard() instead of remove(): no KeyError in case the connection
        # already left the set while it was being shut down.
        self._clients.discard(conn)

    def run(self):
        '''
        Run the :class:`RpcServer`.
        '''
        self.loop.start()

    def shutdown(self):
        '''
        Shutdown the :class:`RpcServer` instance.

        Stop accepting new connections, shut down and unregister every
        registered client, then stop the loop.
        '''
        self.logger.info('Shutdown requested')
        self._sock_watcher.stop()
        # Iterate over a snapshot since unregister() mutates the set.
        for client in list(self._clients):
            self.unregister(client, shutdown=True)
        self.loop.stop(pyev.EVBREAK_ALL)