Skip to content 12.4 KiB
Newer Older
Antoine Millet's avatar
Antoine Millet committed

"""
This module contains the RpcConnection class; more information about this
class can be found in its docstring.
"""

from __future__ import absolute_import

import logging
import socket
import ssl
import struct

import pyev

from sjrpc.core.protocols.rpc import RpcProtocol
from sjrpc.core.protocols import Protocol
from sjrpc.core.exceptions import RpcError
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed

class RpcConnection(object):
    """
    This class manages a single peer connection.

    You can wrap an existing socket with the default constructor::

      >>> conn = RpcConnection(mysocket)

    Or create a new socket automatically with the from_addr constructor::

      >>> conn = RpcConnection.from_addr(host, port)

    If you prefer an SSL connection, you can use the from_addr_ssl
    constructor::

      >>> conn = RpcConnection.from_addr_ssl(host, port)

    By default, an :class:`RpcProtocol` is created on label 0; you can access
    this rpc through the `conn.rpc` shortcut::

      >>> conn.rpc.call('ping')

    Also, the connection object exposes the :meth:`call` and
    :meth:`async_call` methods of the default rpc, so you can use them
    directly on the connection::

      >>> conn.call('ping')  # Equivalent to the example before

    .. seealso::
       You can read the :ref:`Default rpc, aka Rpc0` section to know more about
       the default rpc

    :param sock: the socket object of this newly created :class:`RpcConnection`
    :param \*args,\*\*kwargs: arguments to pass to the default rpc protocol
        automatically registered on label 0.
    """

    # Names of the methods of the default rpc (protocol 0) that __init__
    # re-exports as attributes of the connection object.
    # NOTE(review): MESSAGE_HEADER and MESSAGE_HEADER_FALLBACK are referenced
    # by methods of this class, but their definitions are not present in this
    # copy of the file -- confirm they are defined before use.
    SHORTCUTS_MAINRPC = ('call', 'async_call')
Antoine Millet's avatar
Antoine Millet committed

    def __init__(self, sock, loop=None, enable_tcp_keepalive=False,
                 *args, **kwargs):
        """
        Wrap ``sock`` and set up the buffers, watchers and default rpc.

        :param sock: the socket object managed by this connection
        :param loop: the pyev loop the watchers are attached to, or None to
            use the pyev default loop
        :param enable_tcp_keepalive: if true, SO_KEEPALIVE is set on ``sock``
        :param args, kwargs: arguments passed to the :class:`RpcProtocol`
            registered on label 0
        """
        # Sock of this connection:
        self._sock = sock

        # Get the pyev loop; only fall back on the default loop when the
        # caller did not provide one (without the ``else`` the caller's
        # value, possibly None, always replaced the default loop):
        if loop is None:
            self.loop = pyev.default_loop()
        else:
            self.loop = loop

        # Activate TCP keepalive on the connection:
        if enable_tcp_keepalive:
            self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

        # Watcher list:
        self._watchers = set()

        # Socket inbound/outbound buffers:
        self._inbound_buffer = ''
        self._outbound_buffer = ''
        self._remains = struct.calcsize(RpcConnection.MESSAGE_HEADER_FALLBACK)
        self._proto_receiving = None

        # Initialize main read/write watchers.
        # NOTE(review): the arguments of the three create_watcher() calls in
        # this method were truncated in this copy of the file. They are
        # reconstructed from the pyev watcher signatures and from the
        # callbacks defined on this class -- confirm against upstream, and
        # confirm which of these watchers must be started here (no start()
        # call is visible in this copy).
        self._sock_reader = self.create_watcher(pyev.Io,
                                                fd=self._sock,
                                                events=pyev.EV_READ,
                                                callback=self._dispatch)
        self._sock_writer = self.create_watcher(pyev.Io,
                                                fd=self._sock,
                                                events=pyev.EV_WRITE,
                                                callback=self._writer)

        # Is the RpcConnection connected to its peer:
        self._connected = True

        # "Need to send" loop signal:
        self._need_to_send = self.create_watcher(
            pyev.Async, callback=self._cb_need_to_send)

        # Setup logging facility:
        self.logger = logging.getLogger('sjrpc.%s' % self.getpeername())

        # Protocols registered on this connection:
        self._protocols = {}
        self.register_protocol(0, RpcProtocol, *args, **kwargs)

        # Create shortcuts to the main rpc (protocol 0) for convenience:
        for name in RpcConnection.SHORTCUTS_MAINRPC:
            setattr(self, name, getattr(self.get_protocol(0), name))

        # By default, enter in fallback mode, no label, all frames are
        # redirected on Rpc0:
        self.fallback = True

        # Capabilities announced by the peer (none received yet).
        # NOTE(review): the original comment at this place reads "Send our
        # capabilities to the peer", but no call to _send_capabilities() is
        # visible in this copy -- confirm whether one belongs here.
        self._remote_capabilities = None

Antoine Millet's avatar
Antoine Millet committed
    @classmethod
    def from_addr(cls, addr, port, conn_timeout=30.0, *args, **kwargs):
        """
        Construct an instance of :class:`RpcConnection` without providing
        the :class:`socket` object. The socket is created automatically and
        passed to the standard constructor before the new instance is
        returned.

        :param addr: the target ip address
        :param port: the target port
        :param conn_timeout: the connection operation timeout
        :param args, kwargs: extra arguments to pass to the constructor
        :return: the new :class:`RpcConnection` instance
        :raise socket.error: if the connection cannot be established (the
            socket is closed before the error is propagated)
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # Apply the timeout to the connect operation only, then put the
            # socket back in the mode it was created with.
            previous_timeout = sock.gettimeout()
            sock.settimeout(conn_timeout)
            sock.connect((addr, port))
            sock.settimeout(previous_timeout)
        except Exception:
            # Do not leak the file descriptor when the connection fails.
            sock.close()
            raise
        return cls(sock, *args, **kwargs)

    @classmethod
    def from_addr_ssl(cls, addr, port, cert=None,
                      conn_timeout=30, *args, **kwargs):
        """
        Construct a :class:`RpcConnection` instance like :meth:`from_addr`,
        but enable ssl on the socket.

        :param addr: the target ip address
        :param port: the target port
        :param cert: ssl client certificate, or None for ssl without
            certificate
        :param conn_timeout: the connection operation timeout
        :param args, kwargs: extra arguments to pass to the constructor
        :return: the new :class:`RpcConnection` instance
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # The timeout covers both the TCP connect and the SSL handshake
            # performed by wrap_socket; the socket is then put back in the
            # mode it was created with.
            previous_timeout = sock.gettimeout()
            sock.settimeout(conn_timeout)
            sock.connect((addr, port))
            req = ssl.CERT_NONE if cert is None else ssl.CERT_REQUIRED
            # NOTE(review): the end of this call was truncated in this copy
            # of the file; any trailing argument (for example an ssl version
            # or CA file) is lost -- confirm against upstream.
            sock = ssl.wrap_socket(sock, certfile=cert, cert_reqs=req)
            sock.settimeout(previous_timeout)
        except Exception:
            # Do not leak the file descriptor when connect/handshake fails.
            sock.close()
            raise
        return cls(sock, *args, **kwargs)
Antoine Millet's avatar
Antoine Millet committed

    def __repr__(self):
        # NOTE(review): the body of this method is not present in this copy
        # of the file; restore it from the upstream source.
Antoine Millet's avatar
Antoine Millet committed

    def __hash__(self):
        return self._sock.__hash__()

    def __nonzero__(self):
        return self._connected

    def _send_capabilities(self):
        """
        Send capabilities to the peer, only work in fallback mode for
        compatibility with old sjRpc.

        Send a special message through the Rpc0 with these fields:

        - special: 'capabilities'
        - capabilities: {'version': REMOTE_VERSION, 'capabilities': []}
        """
        # Imported here rather than at module level, as in the original.
        from sjrpc import __version__
        cap = {'version': __version__, 'capabilities': ['rpc', 'tunnel']}
        rpc0 = self.get_protocol(0)
        rpc0.send_special('capabilities', capabilities=cap)

    def _dispatch(self, watcher, revents):
        """
        Read the next chunk of data from the socket and dispatch it to the
        according protocol.

        :param watcher: first argument of the callback (not used by the
            visible code)
        :param revents: second argument of the callback (not used by the
            visible code)
        """
        # Try to receive remaining data from the socket:
        buf = self._sock.recv(self._remains)
        # NOTE(review): the body of this ``if`` is missing from this copy of
        # the file (an empty read presumably means the peer closed the
        # connection -- confirm the intended handling).
        if buf == '':
        self._remains -= len(buf)

        if self._proto_receiving is None:
            # No protocol selected yet: the data belongs to a frame header.
            self._inbound_buffer += buf
            if self._remains == 0:
                # NOTE(review): an ``else:`` appears to be missing between
                # the fallback unpack and the labelled unpack below; as
                # written, the second unpack always runs.
                if self.fallback:
                    pl_size = struct.unpack(RpcConnection.MESSAGE_HEADER_FALLBACK, self._inbound_buffer)[0]
                    label = 0
                    label, pl_size = struct.unpack(RpcConnection.MESSAGE_HEADER, self._inbound_buffer)

Antoine Millet's avatar
Antoine Millet committed
                # Get the registered protocol for the specified label:
                self._proto_receiving = self._protocols.get(label)

                # If frame's label is not binded to a protocol, we create a
                # dummy protocol to consume the payload:
                if self._proto_receiving is None:
                    self._proto_receiving = Protocol(self, -1)

                # Header consumed: the next bytes to read are the payload.
                self._inbound_buffer = ''
                self._remains = pl_size
            # NOTE(review): the indentation of the block below suggests that
            # an enclosing ``else:`` branch (payload handling for the
            # selected protocol) is missing from this copy, as well as the
            # ``else:`` between the two calcsize() assignments.
            if self._remains == 0:
                if self.fallback:
                    self._remains = struct.calcsize(RpcConnection.MESSAGE_HEADER_FALLBACK)
                    self._remains = struct.calcsize(RpcConnection.MESSAGE_HEADER)
                # Reset the state to wait for the next frame header.
                self._inbound_buffer = ''
                self._proto_receiving = None

    def _writer(self, watcher, revent):
        Write data on the socket.
        if self._outbound_buffer:
                if self.fallback:
                    sent = self._sock.send(self._outbound_buffer[:4096])
                    sent = self._sock.send(self._outbound_buffer)
            except socket.error as err:
                errmsg = 'Fatal error while sending through socket: %s' % err
                raise RpcError('SocketError', errmsg)
            self._outbound_buffer = self._outbound_buffer[sent:]

        if not self._outbound_buffer:

    def _cb_need_to_send(self, watcher, revents):
        # NOTE(review): the body of this callback is not present in this copy
        # of the file; restore it from the upstream source.

# Public API

    def rpc(self):
        return self.get_protocol(0)

    def run(self):
        """
        Main loop execution.
        """
        # NOTE(review): the statements of this method are not present in this
        # copy of the file; restore them from the upstream source.

    def create_watcher(self, watcher_class, **kwargs):
        Create a new pyev watcher and return it.
        kwargs['loop'] = self.loop
        watcher = watcher_class(**kwargs)
        return watcher
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        Low level method to send a message through the socket, generally
        used by protocols.
Antoine Millet's avatar
Antoine Millet committed
            raise RpcError('RpcError', 'Not connected to the peer')
        size = len(payload)
        if self.fallback:
            header = struct.pack(RpcConnection.MESSAGE_HEADER_FALLBACK, size)
            header = struct.pack(RpcConnection.MESSAGE_HEADER, label, size)
        self._outbound_buffer += header + payload
    def set_capabilities(self, capabilities):
        Set capabilities of remote host (and disable fallback mode).

        Should be called by Rpc0 when the peer send its capabilities message.
        self._remote_capabilities = capabilities
        self.fallback = False

    def register_protocol(self, label, protocol_class, *args, **kwargs):
Antoine Millet's avatar
Antoine Millet committed
        Register a new protocol for the specified label.
Antoine Millet's avatar
Antoine Millet committed
Antoine Millet's avatar
Antoine Millet committed

        if label in self._protocols:
            raise KeyError('A protocol is already registered for this label')
        elif not isinstance(label, int):
            raise ValueError('Label must be an integer')
        self._protocols[label] = protocol_class(self, label, *args, **kwargs)
        return self._protocols[label]
Antoine Millet's avatar
Antoine Millet committed

    def unregister_protocol(self, label):
Antoine Millet's avatar
Antoine Millet committed
        Unregister the specified protocol label for this connection.
Antoine Millet's avatar
Antoine Millet committed
        if label in self._protocols and label != 0:
            del self._protocols[label]
            raise KeyError('No protocol registered for this label')
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        Get the protocol registered for specified label.
Antoine Millet's avatar
Antoine Millet committed
        proto = self._protocols.get(label)
        if proto is None:
            raise KeyError('No protocol registered for this label')
        return proto
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        Shutdown this connection.
        # Shutdown each registered watcher:
        for watcher in self._watchers:
Antoine Millet's avatar
Antoine Millet committed

        # Shutdown each registered protocols:
        for proto in self._protocols.itervalues():
        # Close the connection socket:
        self._connected = False
        except socket.error as err:
            #self.logger.warn('Error while socket close: %s', err)
Antoine Millet's avatar
Antoine Millet committed

    def get_handler(self):
        Return the handler binded to the :class:`RpcConnection`.

        :return: binded handler
        return self._handler

    def set_handler(self, handler):
        Define a new handler for this connection.
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        :param handler: the new handler to define.
        self._handler = handler

    def get_fd(self):
        Get the file descriptor of the socket managed by this connection.
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        :return: the file descriptor number of the socket
            return self._sock.fileno()
        except socket.error:
            return None

    def getpeername(self):
        Get the peer name.

        :return: string representing the peer name
        return '%s:%s' % self._sock.getpeername()