Skip to content
rpcconnection.py 11.5 KiB
Newer Older
Antoine Millet's avatar
Antoine Millet committed

'''
This module contains the RpcConnection class, more informations about this
class are located in it docstring.
'''

from __future__ import absolute_import

Antoine Millet's avatar
Antoine Millet committed
import ssl
import struct
import socket
import logging

from sjrpc.core.protocols.rpc import RpcProtocol
from sjrpc.core.exceptions import RpcError, SocketRpcError
Antoine Millet's avatar
Antoine Millet committed


class RpcConnection(object):
    '''
    This class manage a single peer connection.

    :param sock: the socket object of this newly created :class:`RpcConnection`
    :param \*args,\*\*kwargs: arguments to pass to the default rpc protocol
        automatically registered on label 0.
Antoine Millet's avatar
Antoine Millet committed
    '''
Antoine Millet's avatar
Antoine Millet committed

    MESSAGE_HEADER_FALLBACK = '!L'
    SHORTCUTS_MAINRPC = ('call', 'async_call')
Antoine Millet's avatar
Antoine Millet committed

    def __init__(self, sock, *args, **kwargs):
Antoine Millet's avatar
Antoine Millet committed
        # Sock of this connection:
        self._sock = sock

        # Activate TCP keepalive on the connection:
        #self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
Antoine Millet's avatar
Antoine Millet committed

        # Is the RpcConnection connected to its peer:
        self._connected = True

        # Setup self.logger.facility:
        self.logger = logging.getLogger('sjrpc.%s' % self.getpeername())

        # Protocols registered on this connection:
        self._protocols = {}
        self.register_protocol(0, RpcProtocol, *args, **kwargs)

        # Create shortcuts to the main rpc (protocol 0) for convenience:
        for name in RpcConnection.SHORTCUTS_MAINRPC:
            setattr(self, name, getattr(self.get_protocol(0), name))
        # By default, enter in fallback mode, no label, all frames are
        # redirected on Rpc0:
        self.fallback = True
        # Send our capabilities to the peer
        self._remote_capabilities = None
        self._send_capabilities()

Antoine Millet's avatar
Antoine Millet committed
    @classmethod
    def from_addr(cls, addr, port, conn_timeout=30.0, *args, **kwargs):
Antoine Millet's avatar
Antoine Millet committed
        '''
        Construct the instance of :class:`RpcConnection` without providing
        the :class:`socket` object. Socket is automatically created and passed
        to the standard constructor before to return the new instance.

        :param addr: the target ip address
        :param port: the target port
        :param conn_timeout: the connection operation timeout
        :param \*args,\*\*kwargs: extra argument to pass to the constructor (see
Antoine Millet's avatar
Antoine Millet committed
        '''
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(conn_timeout)
Antoine Millet's avatar
Antoine Millet committed
        sock.connect((addr, port))
        return cls(sock, *args, **kwargs)

    @classmethod
    def from_addr_ssl(cls, addr, port, cert=None,
                      conn_timeout=30, *args, **kwargs):
        '''
        Construct :class:`RpcConnection` instance like :meth:`from_addr`, but
        enable ssl on socket.

        :param cert: ssl client certificate or None for ssl without certificat
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(conn_timeout)
        sock.connect((addr, port))
        sock.setblocking(True)
        req = ssl.CERT_NONE if cert is None else ssl.CERT_REQUIRED
        sock = ssl.wrap_socket(sock, certfile=cert, cert_reqs=req,
                               ssl_version=ssl.PROTOCOL_TLSv1)
        return cls(sock, *args, **kwargs)
Antoine Millet's avatar
Antoine Millet committed

    def __repr__(self):
Antoine Millet's avatar
Antoine Millet committed

    def __hash__(self):
        return self._sock.__hash__()

Antoine Millet's avatar
Antoine Millet committed
        '''
Antoine Millet's avatar
Antoine Millet committed
        '''
        while self._connected:
            try:
                self._dispatch()
            except SocketRpcError:
                # If SocketRpcError occurs while dispatching, shutdown the
                # connection if it not already shutdown:
                if self._connected:
                    self.shutdown()
    def _enable_fallback(self):
        pass

    def _disable_fallback(self):
        pass

    def _send_capabilities(self):
        '''
        Send capabilities to the peer, only work in fallback mode for
        compatibility with old sjRpc.

        Send a special message through the Rpc0 with these fields:
        - special: 'capabilities'
        - capabilities: {'version': REMOTE_VERSION, 'capabilities': []}
        '''
        from sjrpc import __version__
        cap = {'version': __version__, 'capabilities':['rpc', 'tunnel']}
        rpc0 = self.get_protocol(0)
        rpc0.send_special('capabilities', capabilities=cap)

        Read next message from socket and dispatch it to accoding protocol
        handler.
        if self.fallback:
            buf = self.recv_until(struct.calcsize(RpcConnection.MESSAGE_HEADER_FALLBACK))
            pl_size = struct.unpack(RpcConnection.MESSAGE_HEADER_FALLBACK, buf)[0]
            label = 0
        else:
            buf = self.recv_until(struct.calcsize(RpcConnection.MESSAGE_HEADER))
            label, pl_size = struct.unpack(RpcConnection.MESSAGE_HEADER, buf)
        # Get the registered protocol for the specified label
        proto = self._protocols.get(label)
        if proto is not None:
            proto.handle(label, pl_size)
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        '''
        Low level method to send a message through the socket, generally
        used by protocols.
Antoine Millet's avatar
Antoine Millet committed
        '''
            raise RpcError('RpcError', 'Not connected to the peer')
        size = len(payload)
        if self.fallback:
            header = struct.pack(RpcConnection.MESSAGE_HEADER_FALLBACK, size)
        else:
            header = struct.pack(RpcConnection.MESSAGE_HEADER, label, size)
            if self.fallback:
                data = header + payload
                while data:
                    self._sock.sendall(data[:4096])
                    data = data[4096:]
            else:
                self._sock.sendall(header + payload)
        except socket.error as err:
            errmsg = 'Fatal error while sending through socket: %s' % err
            self.logger.error(errmsg)
            raise RpcError('SocketError', errmsg)
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed

    @property
    def rpc(self):
        return self.get_protocol(0)

    def set_capabilities(self, capabilities):
        '''
        Set capabilities of remote host (and disable fallback mode).

        Should be called by Rpc0 when the peer send its capabilities message.
        '''
        self._remote_capabilities = capabilities
        self.fallback = False

    def register_protocol(self, label, protocol_class, *args, **kwargs):
Antoine Millet's avatar
Antoine Millet committed
        '''
        Register a new protocol for the specified label.
Antoine Millet's avatar
Antoine Millet committed
        '''
Antoine Millet's avatar
Antoine Millet committed

        if label in self._protocols:
            raise KeyError('A protocol is already registered for this label')
        elif not isinstance(label, int):
            raise ValueError('Label must be an integer')
        self._protocols[label] = protocol_class(self, label, *args, **kwargs)
        return self._protocols[label]
Antoine Millet's avatar
Antoine Millet committed

    def unregister_protocol(self, label):
Antoine Millet's avatar
Antoine Millet committed
        '''
        Unregister the specified protocol label for this connection.
Antoine Millet's avatar
Antoine Millet committed
        '''
        if label in self._protocols and label != 0:
            del self._protocols[label]
            raise KeyError('No protocol registered for this label')
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        '''
        Get the protocol registered for specified label.
Antoine Millet's avatar
Antoine Millet committed
        '''
        proto = self._protocols.get(label)
        if proto is None:
            raise KeyError('No protocol registered for this label')
        return proto
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        '''
        Shutdown this connection.
        '''

        # Shutdown each registered protocols:
        for proto in self._protocols.itervalues():
            proto.shutdown()
        # Close the connection socket:
        self._connected = False
            self._sock.shutdown(socket.SHUT_RDWR)
            self._sock.close()
        except socket.error as err:
            #self.logger.warn('Error while socket close: %s', err)
            pass
Antoine Millet's avatar
Antoine Millet committed

    def get_handler(self):
        '''
        Return the handler binded to the :class:`RpcConnection`.

        :return: binded handler
        '''
        return self._handler

    def set_handler(self, handler):
        '''
        Define a new handler for this connection.
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        :param handler: the new handler to define.
        '''
        self._handler = handler

    def get_fd(self):
        '''
        Get the file descriptor of the socket managed by this connection.
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        :return: the file descriptor number of the socket
        '''
        try:
            return self._sock.fileno()
        except socket.error:
            return None

    def getpeername(self):
        '''
        Get the peer name.

        :return: string representing the peer name
        '''
        return '%s:%s' % self._sock.getpeername()

    def recv_until(self, bufsize, flags=None):
        '''
        Read socket until bufsize is received.
        '''
        buf = ''
        while len(buf) < bufsize:
            remains = bufsize - len(buf)
            try:
                received = self._sock.recv(remains)
            except socket.error as err:
                if not self._connected:
                    raise SocketRpcError('Not connected to the peer')
                errmsg = 'Fatal error while receiving from socket: %s' % err
                self.logger.error(errmsg)
                raise SocketRpcError(errmsg)

            # Handle peer disconnection:
            if not received:
                self.logger.info('Connection reset by peer')
                self.shutdown()

            buf += received
        return buf


class GreenRpcConnection(RpcConnection):

    '''
    Cooperative RpcConnection to use with Gevent.
    '''

    def __init__(self, *args, **kwargs):
        super(GreenRpcConnection, self).__init__(*args, **kwargs)
        self._greenlet = None

    @classmethod
    def from_addr(cls, addr, port, conn_timeout=30.0, *args, **kwargs):
        '''
        Construct the instance of :class:`RpcConnection` without providing
        the :class:`socket` object. Socket is automatically created and passed
        to the standard constructor before to return the new instance.

        :param addr: the target ip address
        :param port: the target port
        :param conn_timeout: the connection operation timeout
        :param *args, **kwargs: extra argument to pass to the constructor (see
            constructor doctring)
        '''
        import gevent.socket
        sock = gevent.socket.create_connection((addr, port), conn_timeout)
        return cls(sock, *args, **kwargs)

    @classmethod
    def from_addr_ssl(cls, addr, port, cert, conn_timeout=30, *args, **kwargs):
        '''
        Construct :class:`RpcConnection` instance like :meth:`from_addr`, but
        enable ssl on socket.

        :param cert: ssl certificate or None for ssl without certificat
        '''
        import gevent.socket
        sock = gevent.socket.create_connection((addr, port), conn_timeout)
        req = ssl.CERT_NONE if cert is None else ssl.CERT_REQUIRED
        sock = gevent.ssl.SSLSocket(sock, certfile=None, cert_reqs=req,
                                    ssl_version=ssl.PROTOCOL_TLSv1)
        return cls(sock, *args, **kwargs)

    def run(self):
        import gevent
        self._greenlet = gevent.spawn(self.run)
        self._greenlet.join()

    def shutdown(self):
        super(GreenRpcConnection, self).shutdown()
        if self._greenlet is not None:
            self._greenlet.kill()