#!/usr/bin/env python #coding=utf8 import ssl import time import socket import select import logging from sjrpc.core import RpcConnection, ConnectionManager class SimpleRpcServer(ConnectionManager): ''' A simple RPC Server that wait for new connections and dispatch messages from thoses are already established. :param sock: the :class:`socket` object to bind to the server connection :param default_handler: the default handler to bind to the new client connections ''' def __init__(self, sock, default_handler=None, on_disconnect=None): super(SimpleRpcServer, self).__init__() sock.setblocking(False) self._listening_sock = sock self._poll.register(sock) self._clients = {} self._default_handler = default_handler self._on_disconnect = on_disconnect def _accept_connection(self): return self._listening_sock.accept() def register(self, connection): super(SimpleRpcServer, self).register(connection) self._clients[connection.get_fd()] = connection def shutdown(self): super(SimpleRpcServer, self).shutdown() time.sleep(ConnectionManager.POLL_TIMEOUT) for connection in self._clients.values(): connection.shutdown(self._on_disconnect) self._listening_sock.close() def shutdown_client(self, fd): conn = self._clients.get(fd) try: self._poll.unregister(fd) except IOError: pass if fd is not None: try: del self._clients[fd] except KeyError: pass if conn is not None: conn.shutdown(callback=self._on_disconnect) def all_connections(self): return set(self._clients.values()) def handle_event(self, fd, event): if fd == self._listening_sock.fileno(): # Event concerns the listening socket: if event & select.EPOLLIN: accepted = self._accept_connection() if accepted is not None: sock, address = accepted sock.setblocking(False) connection = RpcConnection(sock, self, handler=self._default_handler) self.register(connection) else: # Event concerns a client socket: connection = self._clients[fd] if event & select.EPOLLIN: # Data are ready to be readed on socket try: connection.receive() except socket.error as err: logging.error('Socket error while receiving from client ' 'fd/%s: %s' % (fd, err)) self.shutdown_client(fd) except Exception as err: logging.error('Unknown error while receiving from client ' 'fd/%s: %s' % (fd, err)) self.shutdown_client(fd) if event & select.EPOLLOUT: # Data are ready to be written on socket try: connection.send() except socket.error as err: logging.error('Socket error while sending to the client ' 'fd/%s: %s' % (fd, err)) self.shutdown_client(fd) except Exception as err: logging.error('Unknown error while sending to the client ' 'fd/%s: %s' % (fd, err)) self.shutdown_client(fd) if event & select.EPOLLHUP: logging.error('Socket HUP fd/%s: %s' % (fd, err)) self.shutdown_client(fd) class SimpleSslRpcServer(SimpleRpcServer): ''' A simple RPC Server that wait for new connections and dispatch messages from thoses are already established. This server version enable SSL on client sockets. :param sock: the :class:`socket` object to bind to the server connection :param default_handler: the default handler to bind to the new client connections ''' def __init__(self, sock, certfile=None, keyfile=None, **kwargs): self._certfile = certfile self._keyfile = keyfile super(SimpleSslRpcServer, self).__init__(sock, **kwargs) def _accept_connection(self): sock, address = self._listening_sock.accept() try: sslsock = ssl.wrap_socket(sock, server_side=True, keyfile=self._keyfile, certfile=self._certfile, ssl_version=ssl.PROTOCOL_TLSv1, do_handshake_on_connect=True) except ssl.SSLError as err: logging.error('Error when accepting ssl connection: %s' % err) else: return sslsock, address