Skip to content
async.py 3.23 KiB
Newer Older
import datetime
from Queue import Queue, Empty

#TODO: implement iterable interface
#TODO: implement priority responses

class AsyncWatcher(object):

    '''
    Asynchronous call watcher -- Handle asynchronous calls and responses.
    Usage example::

      >>> watcher = AsyncWatcher()
      >>> watcher.register(conn1, 'ping')
      >>> watcher.register(conn2, 'ping')
      >>> responses = watcher.wait() # Wait for all responses

    '''

    def __init__(self):
        self._response_queue = Queue()
        self._expected_responses = set()

    def _get_in_queue(self, *args, **kwargs):
        '''
        Get an item in response queue and return it with the time waited to its
        comming. Accept all arguments accepted by :meth:`Queue.get`
        '''
        start = datetime.datetime.now()
        item = self._response_queue.get(*args, **kwargs)
        dt = datetime.datetime.now() - start
        dt = dt.seconds + dt.days * 86400 + dt.microseconds * 0.000001
        return (dt, item)

#
# Public API:
#

    @property
    def remains(self):
        '''
        Remaining expected responses.
        '''
        return len(self._expected_responses)

    def register(self, rpc, method, *args, **kwargs):
        '''
        Call specified method on specified rpc with specified arguments
        and register the call on this object.

        :param rpc: the rpc on which made the call
        :param method: the rpc method to call
        :param \*args, \*\*kwargs: the arguments to pass to the rpc method
        '''
        msg_id = rpc.async_call(self._response_queue, method, *args, **kwargs)
        self._expected_responses.add(msg_id)

    def wait(self, timeout=None, max_wait=None):
        '''
        Wait responses for calls registered on this :class:`AsyncWatcher`
        instance and return them.

        :param timeout: timeout value or None to disable timeout (default: None)
        :param max_wait: maximum waited responses
        :return: received messages in a list

        .. note::
           You can repeat call to this method on a single
           :class:`AsyncWatcher`. For example, if you want to process the
           first arrived message, then wait others for a minute, you can do::
             >>> msgs = watcher.wait(max_wait=1)
             >>> process_speedy(msgs[0])
             >>> for msg in watcher.wait(timeout=60):
             >>>     process(msg)
        return list(self.iter(timeout, max_wait))

    def iter(self, timeout=None, max_wait=None):
        '''
        Work like :meth:`AsyncWatcher.wait` but return an iterable, instead of
        a list object.

        .. note::

           Responses are yielded by the iterable when they are received, so you
           can start the processing of response before receiving all.
        '''

        while self.remains:
                dt, response = self._get_in_queue(timeout=timeout)
            if timeout is not None:
                timeout -= dt
            self._expected_responses.remove(response['id'])
            yield response
            # Check for max_wait:
            if max_wait is not None:
                max_wait -= 1
                if not max_wait:
                    break