Commit 4f5e7141 authored by Antoine Millet's avatar Antoine Millet
Browse files

Added raise_timeout option in wait and iter methods of AsyncWatcher

This option allow to raise a TimeoutError for each expected response
not arrived on time.
parent ecdadf15
Loading
Loading
Loading
Loading
+15 −6
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ class AsyncWatcher(object):

    def __init__(self):
        self._response_queue = Queue()
        self._expected_responses = set()
        self._expected_responses = {}

    def _get_in_queue(self, *args, **kwargs):
        '''
@@ -54,9 +54,10 @@ class AsyncWatcher(object):
        :param \*args, \*\*kwargs: the arguments to pass to the rpc method
        '''
        msg_id = rpc.async_call(self._response_queue, method, *args, **kwargs)
        self._expected_responses.add(msg_id)
        self._expected_responses[msg_id] = kwargs.pop('_data', None)
        # Data are stored directly on the AsyncWatcher instead of RpcProtocol.

    def wait(self, timeout=None, max_wait=None):
    def wait(self, timeout=None, max_wait=None, raise_timeout=False):
        '''
        Wait responses for calls registered on this :class:`AsyncWatcher`
        instance and return them.
@@ -76,9 +77,9 @@ class AsyncWatcher(object):
             >>> for msg in watcher.wait(timeout=60):
             >>>     process(msg)
        '''
        return list(self.iter(timeout, max_wait))
        return list(self.iter(timeout, max_wait, raise_timeout))

    def iter(self, timeout=None, max_wait=None):
    def iter(self, timeout=None, max_wait=None, raise_timeout=False):
        '''
        Work like :meth:`AsyncWatcher.wait` but return an iterable, instead of
        a list object.
@@ -96,10 +97,18 @@ class AsyncWatcher(object):
                break
            if timeout is not None:
                timeout -= dt
            self._expected_responses.remove(response['id'])
            response['data'] = self._expected_responses[response['id']]
            del self._expected_responses[response['id']]
            yield response
            # Check for max_wait:
            if max_wait is not None:
                max_wait -= 1
                if not max_wait:
                    break
        if timeout is not None and raise_timeout:
            for expected, data in self._expected_responses.iteritems():
                yield {'error': {'exception': 'TimeoutError',
                                 'message': 'Response not received on time'},
                       'data': data,
                       'id': expected}
            self._expected_responses = {}