Skip to content
Snippets Groups Projects
Commit d63729bd authored by Antoine Millet's avatar Antoine Millet
Browse files

Refactored polling event management code.

Now use a callback system to handle event from poller.
parent 88d12c96
No related branches found
No related tags found
No related merge requests found
...@@ -22,7 +22,7 @@ class SimpleRpcClient(ConnectionManager): ...@@ -22,7 +22,7 @@ class SimpleRpcClient(ConnectionManager):
self._on_disconnect = on_disconnect self._on_disconnect = on_disconnect
self._connection = connection self._connection = connection
self._connection.set_handler(default_handler) self._connection.set_handler(default_handler)
self.register(self._connection) self.register(self._connection.get_fd(), self._handle_event)
@classmethod @classmethod
def from_addr(cls, addr, port, enable_ssl=False, cert=None, timeout=None, def from_addr(cls, addr, port, enable_ssl=False, cert=None, timeout=None,
...@@ -79,8 +79,8 @@ class SimpleRpcClient(ConnectionManager): ...@@ -79,8 +79,8 @@ class SimpleRpcClient(ConnectionManager):
super(SimpleRpcClient, self).shutdown() super(SimpleRpcClient, self).shutdown()
self._connection.shutdown(self._on_disconnect) self._connection.shutdown(self._on_disconnect)
def handle_event(self, fd, event): def _handle_event(self, fd, events):
if event & select.EPOLLIN: if events & select.EPOLLIN:
# Data are ready to be readed on socket # Data are ready to be readed on socket
try: try:
self._connection.receive() self._connection.receive()
...@@ -89,7 +89,7 @@ class SimpleRpcClient(ConnectionManager): ...@@ -89,7 +89,7 @@ class SimpleRpcClient(ConnectionManager):
'fd/%s: %s', fd, err) 'fd/%s: %s', fd, err)
self.shutdown() self.shutdown()
if event & select.EPOLLOUT: if events & select.EPOLLOUT:
# Data are ready to be written on socket # Data are ready to be written on socket
try: try:
self._connection.send() self._connection.send()
...@@ -98,7 +98,7 @@ class SimpleRpcClient(ConnectionManager): ...@@ -98,7 +98,7 @@ class SimpleRpcClient(ConnectionManager):
'fd/%s: %s', fd, err) 'fd/%s: %s', fd, err)
self.shutdown() self.shutdown()
if event & select.EPOLLHUP: if events & select.EPOLLHUP:
logging.debug('Socket HUP fd/%s', fd) logging.debug('Socket HUP fd/%s', fd)
self.shutdown() self.shutdown()
......
...@@ -22,16 +22,35 @@ class ConnectionManager(object): ...@@ -22,16 +22,35 @@ class ConnectionManager(object):
self._running = True self._running = True
self._received_msg = {} self._received_msg = {}
self._wait_groups = {} self._wait_groups = {}
self._poll_callbacks = {}
def register(self, connection):
def register(self, fd, callback, *args, **kwargs):
''' '''
Register a :class:`RpcConnection` object on this manager. Register an fd on the poll object with the specified callback. The
callback will be called each time poller drop an event for the specified
:param connection: the instance of :class:`RpcConnection` to register fd. Extra args will be passed to the callback after fd and events.
:type param: instance of :class:`RpcConnection`
:param fd: the fd to register
:param callback: the callable to use on event
:param *args, **kwargs: extra arguments passed to the callback
''' '''
self._poll.register(connection.get_fd(), ConnectionManager.MASK_NORMAL) if hasattr(fd, 'fileno'):
fd = fd.fileno()
self._poll_callbacks[fd] = {'func': callback,
'extra': args,
'kwextra': kwargs}
self._poll.register(fd, ConnectionManager.MASK_NORMAL)
def unregister(self, fd):
'''
Unregister the specified fd from the manager.
:param fd: the fd to unregister.
'''
self._poll.unregister(fd)
del self._poll_callbacks[fd]
def is_running(self): def is_running(self):
return self._running return self._running
...@@ -49,7 +68,9 @@ class ConnectionManager(object): ...@@ -49,7 +68,9 @@ class ConnectionManager(object):
pass pass
else: else:
for fd, event in events: for fd, event in events:
self.handle_event(fd, event) if fd in self._poll_callbacks:
cb = self._poll_callbacks[fd]
cb['func'](fd, event, *cb['extra'], **cb['kwextra'])
def start(self, daemonize=False): def start(self, daemonize=False):
''' '''
...@@ -167,29 +188,25 @@ class ConnectionManager(object): ...@@ -167,29 +188,25 @@ class ConnectionManager(object):
self._running = False self._running = False
def data_to_write(self, connection): def data_to_write(self, fd):
''' '''
Method called by a connection to inform the manager that it have data Method called by a connection to inform the manager that it have data
to send. to send.
:param connection: the :class:`RpcConnection` object which inform the :param connection: the fd which have data to write
manager
''' '''
fd = connection.get_fd()
if fd is not None: if fd is not None:
self._poll.modify(fd, ConnectionManager.MASK_WRITABLE) self._poll.modify(fd, ConnectionManager.MASK_WRITABLE)
def nothing_to_write(self, connection): def nothing_to_write(self, fd):
''' '''
Method called by a connection to inform the manager that it have no Method called by a connection to inform the manager that it have no
more data to send. more data to send.
:param connection: the :class:`RpcConnection` object which inform the :param fd: the fd which have no more data to write
manager
''' '''
fd = connection.get_fd()
if fd is not None: if fd is not None:
self._poll.modify(fd, ConnectionManager.MASK_NORMAL) self._poll.modify(fd, ConnectionManager.MASK_NORMAL)
......
...@@ -115,7 +115,7 @@ class RpcConnection(object): ...@@ -115,7 +115,7 @@ class RpcConnection(object):
with self._outbound_buffer: with self._outbound_buffer:
if not len(self._outbound_buffer): if not len(self._outbound_buffer):
self._manager.nothing_to_write(self) self._manager.nothing_to_write(self.get_fd())
def receive(self): def receive(self):
''' '''
...@@ -183,7 +183,7 @@ class RpcConnection(object): ...@@ -183,7 +183,7 @@ class RpcConnection(object):
size = struct.pack('!L', len(json_msg)) size = struct.pack('!L', len(json_msg))
with self._outbound_buffer: with self._outbound_buffer:
self._outbound_buffer.push(size + json_msg) self._outbound_buffer.push(size + json_msg)
self._manager.data_to_write(self) self._manager.data_to_write(self.get_fd())
def _send_call(self, method_name, *args, **kwargs): def _send_call(self, method_name, *args, **kwargs):
''' '''
......
...@@ -22,18 +22,14 @@ class SimpleRpcServer(ConnectionManager): ...@@ -22,18 +22,14 @@ class SimpleRpcServer(ConnectionManager):
super(SimpleRpcServer, self).__init__() super(SimpleRpcServer, self).__init__()
sock.setblocking(False) sock.setblocking(False)
self._listening_sock = sock self._listening_sock = sock
self._poll.register(sock) self.register(sock, self._handle_master_event)
self._clients = {} self._clients = {}
self._default_handler = default_handler self._default_handler = default_handler
self._on_disconnect = on_disconnect self._on_disconnect = on_disconnect
def _accept_connection(self): def _accept_connection(self):
return self._listening_sock.accept() return self._listening_sock.accept()
def register(self, connection):
super(SimpleRpcServer, self).register(connection)
self._clients[connection.get_fd()] = connection
def shutdown(self): def shutdown(self):
super(SimpleRpcServer, self).shutdown() super(SimpleRpcServer, self).shutdown()
time.sleep(ConnectionManager.POLL_TIMEOUT) time.sleep(ConnectionManager.POLL_TIMEOUT)
...@@ -44,7 +40,7 @@ class SimpleRpcServer(ConnectionManager): ...@@ -44,7 +40,7 @@ class SimpleRpcServer(ConnectionManager):
def shutdown_client(self, fd): def shutdown_client(self, fd):
conn = self._clients.get(fd) conn = self._clients.get(fd)
try: try:
self._poll.unregister(fd) self.unregister(fd)
except IOError: except IOError:
pass pass
if fd is not None: if fd is not None:
...@@ -58,51 +54,51 @@ class SimpleRpcServer(ConnectionManager): ...@@ -58,51 +54,51 @@ class SimpleRpcServer(ConnectionManager):
def all_connections(self): def all_connections(self):
return set(self._clients.values()) return set(self._clients.values())
def handle_event(self, fd, event): def _handle_master_event(self, fd, events):
if fd == self._listening_sock.fileno(): # Event concerns the listening socket:
# Event concerns the listening socket: if events & select.EPOLLIN:
if event & select.EPOLLIN: accepted = self._accept_connection()
accepted = self._accept_connection() if accepted is not None:
if accepted is not None: sock, address = accepted
sock, address = accepted sock.setblocking(False)
sock.setblocking(False) connection = RpcConnection(sock, self,
connection = RpcConnection(sock, self, handler=self._default_handler)
handler=self._default_handler) self.register(connection.get_fd(), self._handle_client_event)
self.register(connection) self._clients[connection.get_fd()] = connection
else:
# Event concerns a client socket:
connection = self._clients[fd]
if event & select.EPOLLIN:
# Data are ready to be readed on socket
try:
connection.receive()
except socket.error as err:
logging.debug('Socket error while receiving from client '
'fd/%s: %s', fd, err)
self.shutdown_client(fd)
except Exception as err:
logging.debug('Unknown error while receiving from client '
'fd/%s: %s', fd, err)
self.shutdown_client(fd)
if event & select.EPOLLOUT:
# Data are ready to be written on socket
try:
connection.send()
except socket.error as err:
logging.debug('Socket error while sending to the client '
'fd/%s: %s', fd, err)
self.shutdown_client(fd)
except Exception as err:
logging.debug('Unknown error while sending to the client '
'fd/%s: %s', fd, err)
self.shutdown_client(fd)
if event & select.EPOLLHUP: def _handle_client_event(self, fd, events):
logging.debug('Socket HUP fd/%s', fd) connection = self._clients[fd]
if events & select.EPOLLIN:
# Data are ready to be readed on socket
try:
connection.receive()
except socket.error as err:
logging.debug('Socket error while receiving from client '
'fd/%s: %s', fd, err)
self.shutdown_client(fd)
except Exception as err:
logging.debug('Unknown error while receiving from client '
'fd/%s: %s', fd, err)
self.shutdown_client(fd) self.shutdown_client(fd)
if events & select.EPOLLOUT:
# Data are ready to be written on socket
try:
connection.send()
except socket.error as err:
logging.debug('Socket error while sending to the client '
'fd/%s: %s', fd, err)
self.shutdown_client(fd)
except Exception as err:
logging.debug('Unknown error while sending to the client '
'fd/%s: %s', fd, err)
self.shutdown_client(fd)
if events & select.EPOLLHUP:
logging.debug('Socket HUP fd/%s', fd)
self.shutdown_client(fd)
class SimpleSslRpcServer(SimpleRpcServer): class SimpleSslRpcServer(SimpleRpcServer):
''' '''
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment