Skip to content
rpcconnection.py 14.5 KiB
Newer Older
Antoine Millet's avatar
Antoine Millet committed
#!/usr/bin/env python
#coding:utf8

'''
This module contains the RpcConnection class; more information about this
class can be found in its docstring.
'''

import ssl
import json
import struct
import socket
import logging
import threading
from uuid import uuid1
from Queue import Queue

from sjrpc.utils import BytesBuffer
from sjrpc.core.callers import RpcCaller, ThreadedRpcCaller
from sjrpc.core.exceptions import RpcError


class RpcConnection(object):
    '''
    This class manages a single peer connection.
    '''
    
    # Maximum number of bytes read from the socket by one receive() call and
    # pulled from the outbound buffer by one send() call, respectively.
    RECV_BUF_SIZE = 4096
    SEND_BUF_SIZE = 4096
Antoine Millet's avatar
Antoine Millet committed
    # Message templates: each one is copied and filled in before being sent,
    # and its set of keys is what dispatch() uses to tell a request from a
    # response.
    REQUEST_MESSAGE = {'id': None, 'method': None, 'args': [], 'kwargs': {}}
    RESPONSE_MESSAGE = {'id': None, 'return': None, 'error': None}
    
    def __init__(self, sock, manager, handler=None):
        # Collaborators: the socket of this connection, the manager of this
        # connection and the RPC handler. The handler may be None when no
        # handler is defined for the connection (for example, for a client
        # connection).
        self._sock = sock
        self._manager = manager
        self._handler = handler

        # Inbound buffer: bytes already read from the socket that belong to
        # a message which has not been completely received yet.
        self._inbound_buffer = BytesBuffer()

        # Outbound buffer: bytes waiting to be written on the socket.
        self._outbound_buffer = BytesBuffer()

        # Size of the message currently being read. None means that the size
        # of the next message still has to be read.
        self._cur_msg_size = None

        # Calls sent to the peer, indexed by call id. Each value holds the
        # state of the call (for a synchronous call, this includes the event
        # to set when the call is finished).
        self._calls = {}

    @classmethod
    def from_addr(cls, addr, port, manager, enable_ssl=False, timeout=30.0,
                  cert=None, handler=None):
        '''
        Construct an instance of :class:`RpcConnection` without providing
        the :class:`socket` object. The socket is automatically created,
        connected and passed to the standard constructor before the new
        instance is returned.

        :param addr: the target ip address
        :param port: the target port
        :param manager: manager of this connection
        :param enable_ssl: enable SSL
        :param timeout: number of seconds allowed for the connection to be
            established, or None to wait without limit
        :param cert: if SSL is enabled, provide the filename of certificate to
            check. If None, don't check certificate.
        :param handler: Handler to attach to this :class:`RpcConnection` object
        :return: the new connection, whose socket is in non-blocking mode
        :raise socket.error: if the connection can't be established
        '''

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            if enable_ssl:
                req = ssl.CERT_NONE if cert is None else ssl.CERT_REQUIRED
                # The certificate file must be handed over as ca_certs,
                # otherwise CERT_REQUIRED has nothing to validate the peer
                # against.
                # NOTE(review): ssl.wrap_socket and PROTOCOL_TLSv1 are
                # deprecated in recent Python releases -- revisit when the
                # module is ported.
                sock = ssl.wrap_socket(sock, certfile=None, cert_reqs=req,
                                       ca_certs=cert,
                                       ssl_version=ssl.PROTOCOL_TLSv1)
            # Bound the time spent in connect() (and in the SSL handshake it
            # triggers), then switch to non-blocking mode for normal use.
            sock.settimeout(timeout)
            sock.connect((addr, port))
            sock.setblocking(False)
        except Exception:
            # Don't leak the file descriptor when the connection fails.
            sock.close()
            raise
        return cls(sock, manager, handler)

    def __repr__(self):
Antoine Millet's avatar
Antoine Millet committed

    def __hash__(self):
        return self._sock.__hash__()

    def send(self):
        '''
        Flush the outbound buffer by sending all it content through socket. This 
        method is usually called by the :class:`ConnectionManager` of 
        this :class:`RpcConnection`.
        '''

        data = self._outbound_buffer.pull(RpcConnection.SEND_BUF_SIZE)

        while data:
            sent = self._sock.send(data)
            data = data[sent:]

        with self._outbound_buffer:
            if not len(self._outbound_buffer):
                self._manager.nothing_to_write(self)
Antoine Millet's avatar
Antoine Millet committed
    
    def receive(self):
        '''
        Receive data from socket into the inbound buffer. If data can be
        decoded, do it and pass result to :meth:`dispatch` method.
        '''
        
        buf = self._sock.recv(RpcConnection.RECV_BUF_SIZE)
        if not buf:
            raise socket.error()
        
        self._inbound_buffer.push(buf)
        # Read the message length:
        if self._cur_msg_size is None and len(self._inbound_buffer) >= 4:
            length = struct.unpack('!L', self._inbound_buffer.pull(4))[0]
            self._cur_msg_size = length
        
        # Read the message payload:
        if (self._cur_msg_size is not None 
                           and len(self._inbound_buffer) >= self._cur_msg_size):
            payload = self._inbound_buffer.pull(self._cur_msg_size)
            self._cur_msg_size = None
            message = json.loads(payload)
            logging.debug('Received: %s' % message)
            self.dispatch(message)
    
    def _send(self, message):
        '''
        Low level method to encode a message in json, calculate it size, and
        place result on outbound buffer.
        
        .. warning::
           Message must be a jsonisable structure.
        '''

        logging.debug('Sending: %s' % message)
        json_msg = json.dumps(message)
        size = struct.pack('!L', len(json_msg))
        with self._outbound_buffer:
            self._outbound_buffer.push(size + json_msg)
            self._manager.data_to_write(self)
Antoine Millet's avatar
Antoine Millet committed
    
    def _send_call(self, method_name, *args, **kwargs):
        '''
        Create the message for the call and push them to the outbound queue.
        
        :param method_name: the name of the method to call on the peer
        :param *args: arguments to pass to the remote method
        :param **kwargs: keyword arguments to pass to the remote method
        :return: the generated id for the request
        :rtype: :class:`str` object
        '''
        
        msg = RpcConnection.REQUEST_MESSAGE.copy()
        msg['method'] = method_name
        msg['args'] = args
        msg['kwargs'] = kwargs
        msg['id'] = str(uuid1())
        
        self._send(msg)
        
        return msg['id']
    
    def _send_response(self, msg_id, returned=None, error=None):
        '''
        Low level method to send a response message to the peer.

        :param msg_id: the id of the replied message
        :param returned: returned data
        :type returned: returned data or None if errors have been raised
        :param error: raised errors
        :type error: raised error or None if no error have been raised
        '''
        
        msg = RpcConnection.RESPONSE_MESSAGE.copy()
        msg['id'] = msg_id
        msg['return'] = returned
        msg['error'] = error
        
        self._send(msg)
    
    def response(self, msg_id, returned):
        '''
        Send an "return" response to the peer.
        
        :param msg_id: the id of the replied message
        :param returned: the value returned by the function
        
        .. warning::
           In case of raised error, use the :meth:`error` method instead of 
           this one.
        '''
        
        self._send_response(msg_id, returned=returned)
    
    def error(self, msg_id, error, message):
        '''
        Send an error response to the peer.
        
        :param msg_id: the id of the replied message
        :param error: the name of the raised exception
        :param message: human readable error for the exception
        '''
        
        err = {'exception': error, 'message': message}
        self._send_response(msg_id, error=err)
    
    def call(self, method_name, *args, **kwargs):
Antoine Millet's avatar
Antoine Millet committed
        '''
        Make a new remote call on the peer.
        
        :param method_name: the method to call on the peer
        :param \*args: the arguments for the call
        :param \*\*kwargs: the keyword arguments for the call
        :return: the data returned by the peer for the call
        
        .. note::
           This function will block until the peer response is received. You 
           can also specify a ``timeout`` argument to specify a number of 
           seconds before to raise an :exc:`CallTimeout` exception if the peer 
           didnt respond.
        '''

        if '_timeout' in kwargs:
            timeout = kwargs['_timeout']
            del kwargs['_timeout']
        else:
            timeout = None
Antoine Millet's avatar
Antoine Millet committed
        
        # Send the call to the peer:
        msg_id = self._send_call(method_name, *args, **kwargs)
        
        # Create an item in calls dict with reference to the event to raise:
        self._calls[msg_id] = {'return': None, 'error': None, 
                               'event': threading.Event()}
        
        # Wait for the response:
        self._calls[msg_id]['event'].wait(timeout)

        # Check if timeout occured:
        if not self._calls[msg_id]['event'].is_set():
            raise RpcError('TimeoutError', 'remote method timeout')
Antoine Millet's avatar
Antoine Millet committed
        
        # Check if error occured while execution:
        if self._calls[msg_id]['error'] is not None:
            raise RpcError(self._calls[msg_id]['error']['exception'],
                           self._calls[msg_id]['error']['message'])
        
        response = self._calls[msg_id]['return']
        del self._calls[msg_id]
        
        return response
    
    def async_call(self, method_name, *args, **kwargs):
        '''
        Make a new asynchronous call on the peer.
        
        :param method_name: the method to call on the peer
        :param \*args: the arguments for the call
        :param \*\*kwargs: the keyword arguments for the call
        :return: the message id of the call
        '''

        # Extract _data from argument:
        if '_data' in kwargs:
            data = kwargs['_data']
            del kwargs['_data']
        else:
            data = None

Antoine Millet's avatar
Antoine Millet committed
        # Send the call to the peer:
        msg_id = self._send_call(method_name, *args, **kwargs)

        self._calls[msg_id] = {'async': True, 'data': data}
Antoine Millet's avatar
Antoine Millet committed

        return msg_id

    def dispatch(self, message):
        '''
        Dispatch a received message according to it type.
        
        :param message: the received message to dispatch
        
        .. note::
           When dispatching a call, the responsability of response is delegated
           to the caller, except for the case where the method isn't found on 
           the handler.
        '''
        if set(RpcConnection.REQUEST_MESSAGE) <= set(message):
            # Handle call requests from the peer:
            if self._handler is not None:
                try:
                    func = self._handler[message['method']]
                except KeyError:
                    self.error(
                        message['id'],
                        'NameError', 
                        "remote name '%s' is not defined" % message['method'],
                    )
                else:
                    if getattr(func, '__threaded__', True):
                        ThreadedRpcCaller(message, self, func).start()
                    else:
                        RpcCaller(message, self, func).start()
Antoine Millet's avatar
Antoine Millet committed
                self.error(
                    message['id'],
                    'NameError',
                    "remote name '%s' is not defined" % message['method'],
                )
Antoine Millet's avatar
Antoine Millet committed
        elif set(RpcConnection.RESPONSE_MESSAGE) <= set(message):
            # Handle response message from the peer:
            call = self._calls.get(message['id'])
            if call is not None:
                # Call exists in call list
                if message['error'] is None:
                    call['return'] = message['return']
                else:
                    call['error'] = message['error']

                if 'event' in call:
                    # Release the call if its synchronous:
                    call['event'].set()
                else:
                    # Else, it's an asynchonous call, so, we need to notify
                    # manager that the response has been received:
                    message['data'] = call['data']
Antoine Millet's avatar
Antoine Millet committed
                    self._manager.signal_arrival(message)

                    # Message will be deleted from the call list by the
                    # manager while cleanup.

        else:
            pass # Malformed messages are note processed
    
Antoine Millet's avatar
Antoine Millet committed
        '''
        Shutdown this connection.
        
        :param callback: Name of the callback to call on the handler or a
            callable to call when the connection is shutdown.
        :type callback: :class:`str` or callable or None
Antoine Millet's avatar
Antoine Millet committed
        '''

        self._sock.close()
        # Release all running calls from this connection:
        for call in self._calls.values():
            if 'event' in call:
                call['error'] = {'exception': 'RpcError',
                                 'message': 'Connection reset by peer'}
                call['return'] = None
                call['event'].set()
        
        if callback is not None and not callable(callback):
            if self._handler is not None:
                try:
                    callback = self._handler[callback]
                except KeyError:
                    callback = None
Antoine Millet's avatar
Antoine Millet committed
            else:
                callback = None

        if callback is not None:
            try:
                callback(self)
            except Exception as err:
                logging.error('Error while execution of shutdown '
                              'callback: %s' % err)
Antoine Millet's avatar
Antoine Millet committed

    def clean_call(self, msg_id):
        '''
        Delete waiting call by it id in the call list.

        :param msg_id: the id of the call to delete
        :return: True if call have been deleted or False if it doesn't exists
        '''
        try:
            del self._calls[msg_id]
        except KeyError:
            return False
        else:
            return True

    def clean_all_calls(self, msg_ids):
        '''
        Delete all call by their ids in the call list.

        :param msg_id: list of call id to delete
        '''

        for msg_id in msg_ids:
            self.clean_call(msg_id)

    def get_handler(self):
        '''
        Return the handler binded to the :class:`RpcConnection`.

        :return: binded handler
        '''

        return self._handler

    def set_handler(self, handler):
        '''
        Define a new handler for this connection.
        
        :param handler: the new handler to define.
        '''
        
        self._handler = handler

    def get_fd(self):
        '''
        Get the file descriptor of the socket managed by this connection.
        
        :return: the file descriptor number of the socket
        '''
        
        return self._sock.fileno()

    def getpeername(self):
        '''
        Get the peer name.

        :return: string representing the peer name
        '''

        return '%s:%s' % self._sock.getpeername()