Skip to content
rpcconnection.py 14.1 KiB
Newer Older
Antoine Millet's avatar
Antoine Millet committed

'''
This module contains the RpcConnection class, more informations about this
class are located in it docstring.
'''

from __future__ import absolute_import

Antoine Millet's avatar
Antoine Millet committed
import ssl
import errno
Antoine Millet's avatar
Antoine Millet committed
import struct
import socket
import logging

from sjrpc.core.protocols import Protocol, RpcProtocol, TunnelProtocol
from sjrpc.core.exceptions import (RpcError, NoFreeLabelError,
                                   FallbackModeEnabledError)
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed

class RpcConnection(object):
    '''
    This class manage a single peer connection.
Antoine Millet's avatar
Antoine Millet committed
    You can wrap an existing socket with the default constructor::

      >>> conn = RpcConnection(mysocket)

    Or create a new socket automatically with from_addr constructor::

      >>> conn = RpcConnection.from_addr(host, port)

    If you prefer SSL connection, you can use the from_addr_ssl constructor::

      >>> conn = RpcConnection.from_addr_ssl(host, port)

    By default, an :class:`RpcProtocol` is created on label 0, you can access
    to this rpc through the `conn.rpc` shortcut::

      >>> conn.rpc.call('ping')

    Also, the connection object expose :meth:`call` and :meth:`async_call`
    method from default rpc, so you can use it directly on connection::

     >>> conn.call('ping') # Equivalent to the exemple before

    .. seealso::
       You can read the :ref:`Default rpc, aka Rpc0` section to know more about
       the default rpc

    :param sock: the socket object of this newly created :class:`RpcConnection`
    :param \*args,\*\*kwargs: arguments to pass to the default rpc protocol
        automatically registered on label 0.
Antoine Millet's avatar
Antoine Millet committed
    '''
Antoine Millet's avatar
Antoine Millet committed

    NONBLOCKING_ERRORS = (errno.EAGAIN, errno.EWOULDBLOCK)
    NONBLOCKING_SSL_ERRORS = (ssl.SSL_ERROR_WANT_READ)

    MESSAGE_HEADER_FALLBACK = '!L'
    SHORTCUTS_MAINRPC = ('call', 'async_call')
Antoine Millet's avatar
Antoine Millet committed

    def __init__(self, sock, loop=None, enable_tcp_keepalive=False,
                 *args, **kwargs):
Antoine Millet's avatar
Antoine Millet committed
        # Sock of this connection:
        self._sock = sock
        # Initialization requires fallback mode disabled:
        self.fallback = False

        # Get the pyev loop:
        if loop is None:
            self.loop = pyev.default_loop()
        else:
            self.loop = loop

        # Activate TCP keepalive on the connection:
        if enable_tcp_keepalive:
            self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
Antoine Millet's avatar
Antoine Millet committed

        # Watcher list:
        self._watchers = set()

        # Socket inbound/outbound buffers:
        self._inbound_buffer = ''
        self._outbound_buffer = ''
        self._remains = struct.calcsize(RpcConnection.MESSAGE_HEADER_FALLBACK)
        self._proto_receiving = None

        # Initialize main read/write watchers:
        self._sock_reader = self.create_watcher(pyev.Io,
                                                fd=self._sock,
                                                events=pyev.EV_READ,
                                                callback=self._dispatch)
        self._sock_writer = self.create_watcher(pyev.Io,
                                                fd=self._sock,
                                                events=pyev.EV_WRITE,
                                                callback=self._writer)
        # Is the RpcConnection connected to its peer:
        self._connected = True

        # "Need to send" loop signal:
        self._need_to_send = self.create_watcher(pyev.Async,
                                                 callback=self._cb_need_to_send)
        self._need_to_send.start()

Antoine Millet's avatar
Antoine Millet committed
        # Setup logging facility:
        self.logger = logging.getLogger('sjrpc.%s' % self.getpeername())

        # Protocols registered on this connection:
        self._protocols = {}
        self.register_protocol(0, RpcProtocol, *args, **kwargs)

        # Create shortcuts to the main rpc (protocol 0) for convenience:
        for name in RpcConnection.SHORTCUTS_MAINRPC:
            setattr(self, name, getattr(self.get_protocol(0), name))
        # By default, enter in fallback mode, no label, all frames are
        # redirected on Rpc0:
        self.fallback = True
Antoine Millet's avatar
Antoine Millet committed

        # Send our capabilities to the peer:
        self._remote_capabilities = None
        self._send_capabilities()

Antoine Millet's avatar
Antoine Millet committed
    @classmethod
    def from_addr(cls, addr, port, conn_timeout=30.0, *args, **kwargs):
Antoine Millet's avatar
Antoine Millet committed
        '''
        Construct the instance of :class:`RpcConnection` without providing
        the :class:`socket` object. Socket is automatically created and passed
        to the standard constructor before to return the new instance.

        :param addr: the target ip address
        :param port: the target port
        :param conn_timeout: the connection operation timeout
        :param \*args,\*\*kwargs: extra argument to pass to the constructor (see
Antoine Millet's avatar
Antoine Millet committed
        '''
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(conn_timeout)
Antoine Millet's avatar
Antoine Millet committed
        sock.connect((addr, port))
        return cls(sock, *args, **kwargs)

    @classmethod
    def from_addr_ssl(cls, addr, port, cert=None,
                      conn_timeout=30, *args, **kwargs):
        '''
        Construct :class:`RpcConnection` instance like :meth:`from_addr`, but
        enable ssl on socket.

        :param cert: ssl client certificate or None for ssl without certificat
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(conn_timeout)
        sock.connect((addr, port))
        sock.setblocking(True)
        req = ssl.CERT_NONE if cert is None else ssl.CERT_REQUIRED
        sock = ssl.wrap_socket(sock, certfile=cert, cert_reqs=req,
                               ssl_version=ssl.PROTOCOL_TLSv1)
        return cls(sock, *args, **kwargs)
Antoine Millet's avatar
Antoine Millet committed

    def __repr__(self):
Antoine Millet's avatar
Antoine Millet committed

    def __hash__(self):
        return self._sock.__hash__()

    def __nonzero__(self):
        return self._connected

    def _send_capabilities(self):
        '''
        Send capabilities to the peer, only work in fallback mode for
        compatibility with old sjRpc.

        Send a special message through the Rpc0 with these fields:
        - special: 'capabilities'
        - capabilities: {'version': REMOTE_VERSION, 'capabilities': []}
        '''
        from sjrpc import __version__
        cap = {'version': __version__, 'capabilities':['rpc', 'tunnel']}
        rpc0 = self.get_protocol(0)
        rpc0.send_special('capabilities', capabilities=cap)

    def _dispatch(self, watcher, revents):
        Read next message from socket and dispatch it to accoding protocol
        handler.
        # Try to received remaining data from the socket:
        try:
            buf = self._sock.recv(self._remains)
        except socket.error as err:
            if (isinstance(err, socket.error) and err.errno
                in RpcConnection.NONBLOCKING_ERRORS):
                return
            elif (isinstance(err, ssl.SSLError) and err.errno
                  in RpcConnection.NONBLOCKING_SSL_ERRORS):
                return
            else:
                raise
Antoine Millet's avatar
Antoine Millet committed
        if not buf:
            # Empty data on non-blocking socket means that the connection
            # has been closed:
Antoine Millet's avatar
Antoine Millet committed
        self._remains -= len(buf)
        if self._proto_receiving is None:
            self._inbound_buffer += buf
            if self._remains == 0:
                if self.fallback:
                    pl_size = struct.unpack(RpcConnection.MESSAGE_HEADER_FALLBACK, self._inbound_buffer)[0]
                    label = 0
                else:
                    label, pl_size = struct.unpack(RpcConnection.MESSAGE_HEADER, self._inbound_buffer)

Antoine Millet's avatar
Antoine Millet committed
                # Get the registered protocol for the specified label:
                self._proto_receiving = self._protocols.get(label)

                # If frame's label is not binded to a protocol, we create a
                # dummy protocol to consume the payload:
                if self._proto_receiving is None:
                    self._proto_receiving = Protocol(self, -1)

                self._proto_receiving.start_message(pl_size)
                self._inbound_buffer = ''
                self._remains = pl_size
            self._proto_receiving.feed(buf)
            if self._remains == 0:
                self._proto_receiving.end_of_message()
                if self.fallback:
                    self._remains = struct.calcsize(RpcConnection.MESSAGE_HEADER_FALLBACK)
                else:
                    self._remains = struct.calcsize(RpcConnection.MESSAGE_HEADER)
                self._inbound_buffer = ''
                self._proto_receiving = None

    def _writer(self, watcher, revent):
        '''
        Write data on the socket.
        '''
        if self._outbound_buffer:
            try:
                if self.fallback:
                    sent = self._sock.send(self._outbound_buffer[:4096])
                else:
                    sent = self._sock.send(self._outbound_buffer)
            except socket.error as err:
                errmsg = 'Fatal error while sending through socket: %s' % err
                self.logger.error(errmsg)
                raise RpcError('SocketError', errmsg)
            self._outbound_buffer = self._outbound_buffer[sent:]

        if not self._outbound_buffer:
            watcher.stop()

    def _cb_need_to_send(self, watcher, revents):
        self._sock_writer.start()

#
# Public API
#

    @property
    def rpc(self):
        return self.get_protocol(0)

    def run(self):
        '''
        Main loop execution.
        '''
        self.loop.start()

    def create_watcher(self, watcher_class, **kwargs):
        '''
        Create a new pyev watcher and return it.
        '''
        kwargs['loop'] = self.loop
        watcher = watcher_class(**kwargs)
        self._watchers.add(watcher)
        return watcher
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        '''
        Low level method to send a message through the socket, generally
        used by protocols.
Antoine Millet's avatar
Antoine Millet committed
        '''
            raise RpcError('RpcError', 'Not connected to the peer')
        size = len(payload)
        if self.fallback:
            header = struct.pack(RpcConnection.MESSAGE_HEADER_FALLBACK, size)
        else:
            header = struct.pack(RpcConnection.MESSAGE_HEADER, label, size)
        self._outbound_buffer += header + payload
        self._need_to_send.send()
    def set_capabilities(self, capabilities):
        '''
        Set capabilities of remote host (and disable fallback mode).

        Should be called by Rpc0 when the peer send its capabilities message.
        '''
        self._remote_capabilities = capabilities
        self.fallback = False

    def register_protocol(self, label, protocol_class, *args, **kwargs):
Antoine Millet's avatar
Antoine Millet committed
        '''
        Register a new protocol for the specified label.
Antoine Millet's avatar
Antoine Millet committed
        '''
        if self.fallback:
            raise FallbackModeEnabledError('Fallback mode is not compatible '
                                           'with protocols')
        if label is None:
            for label in xrange(0, RpcConnection.MAX_LABEL):
                if label not in self._protocols:
                    break
            else:
                raise NoFreeLabelError('No more label number are availables')
        if label in self._protocols:
            raise KeyError('A protocol is already registered for this label')
        elif not isinstance(label, int):
            raise ValueError('Label must be an integer')
        self._protocols[label] = protocol_class(self, label, *args, **kwargs)
        return self._protocols[label]
Antoine Millet's avatar
Antoine Millet committed

    def unregister_protocol(self, label):
Antoine Millet's avatar
Antoine Millet committed
        '''
        Unregister the specified protocol label for this connection.
Antoine Millet's avatar
Antoine Millet committed
        '''
        if self.fallback:
            raise FallbackModeEnabledError('Fallback mode is not compatible '
                                           'with protocols')
        if label in self._protocols and label != 0:
            del self._protocols[label]
            raise KeyError('No protocol registered for this label')
Antoine Millet's avatar
Antoine Millet committed

    def create_rpc(self, label=None, *args, **kwargs):
        '''
        Shortcut which can be used to create rpc protocols.
        '''
        return self.register_protocol(label, RpcProtocol, *args, **kwargs)

    def create_tunnel(self, label=None, *args, **kwargs):
        '''
        Shortcut which can be used to create tunnels protocols.
        '''
        return self.register_protocol(label, TunnelProtocol, *args, **kwargs)

Antoine Millet's avatar
Antoine Millet committed
        '''
        Get the protocol registered for specified label.
Antoine Millet's avatar
Antoine Millet committed
        '''
        proto = self._protocols.get(label)
        if proto is None:
            raise KeyError('No protocol registered for this label')
        return proto
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        '''
        Shutdown this connection.
        '''
        # Shutdown each registered watcher:
        for watcher in self._watchers:
            watcher.stop()
Antoine Millet's avatar
Antoine Millet committed

        # Shutdown each registered protocols:
        for proto in self._protocols.itervalues():
            proto.shutdown()
        # Close the connection socket:
        self._connected = False
            self._sock.shutdown(socket.SHUT_RDWR)
            self._sock.close()
        except socket.error as err:
            #self.logger.warn('Error while socket close: %s', err)
            pass
Antoine Millet's avatar
Antoine Millet committed

    def get_handler(self):
        '''
        Return the handler binded to the :class:`RpcConnection`.

        :return: binded handler
        '''
        return self._handler

    def set_handler(self, handler):
        '''
        Define a new handler for this connection.
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        :param handler: the new handler to define.
        '''
        self._handler = handler

    def get_fd(self):
        '''
        Get the file descriptor of the socket managed by this connection.
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        :return: the file descriptor number of the socket
        '''
        try:
            return self._sock.fileno()
        except socket.error:
            return None

    def getpeername(self):
        '''
        Get the peer name.

        :return: string representing the peer name
        '''
        return '%s:%s' % self._sock.getpeername()