Newer
Older
import datetime
from Queue import Queue, Empty
#TODO: implement iterable interface
#TODO: implement priority responses
class AsyncWatcher(object):
'''
Asynchronous call watcher -- Handle asynchronous calls and responses.
Usage example::
>>> watcher = AsyncWatcher()
>>> watcher.register(conn1, 'ping')
>>> watcher.register(conn2, 'ping')
>>> responses = watcher.wait() # Wait for all responses
'''
def __init__(self):
self._response_queue = Queue()
self._expected_responses = set()
def _get_in_queue(self, *args, **kwargs):
'''
Get an item in response queue and return it with the time waited to its
comming. Accept all arguments accepted by :meth:`Queue.get`
'''
start = datetime.datetime.now()
item = self._response_queue.get(*args, **kwargs)
dt = datetime.datetime.now() - start
dt = dt.seconds + dt.days * 86400 + dt.microseconds * 0.000001
return (dt, item)
#
# Public API:
#
@property
def remains(self):
'''
Remaining expected responses.
'''
return len(self._expected_responses)
def register(self, rpc, method, *args, **kwargs):
'''
Call specified method on specified rpc with specified arguments
and register the call on this object.
:param rpc: the rpc on which made the call
:param method: the rpc method to call
:param \*args, \*\*kwargs: the arguments to pass to the rpc method
'''
msg_id = rpc.async_call(self._response_queue, method, *args, **kwargs)
self._expected_responses.add(msg_id)
def wait(self, timeout=None, max_wait=None):
'''
Wait responses for calls registered on this :class:`AsyncWatcher`
instance and return them.
:param timeout: timeout value or None to disable timeout (default: None)
:param max_wait: maximum waited responses
:return: received messages in a list
.. note::
You can repeat call to this method on a single
:class:`AsyncWatcher`. For example, if you want to process the
first arrived message, then wait others for a minute, you can do::
>>> msgs = watcher.wait(max_wait=1)
>>> process_speedy(msgs[0])
>>> for msg in watcher.wait(timeout=60):
>>> process(msg)
return list(self.iter(timeout, max_wait))
def iter(self, timeout=None, max_wait=None):
'''
Work like :meth:`AsyncWatcher.wait` but return an iterable, instead of
a list object.
.. note::
Responses are yielded by the iterable when they are received, so you
can start the processing of response before receiving all.
'''
dt, response = self._get_in_queue(timeout=timeout)
if timeout is not None:
timeout -= dt
self._expected_responses.remove(response['id'])
# Check for max_wait:
if max_wait is not None:
max_wait -= 1
if not max_wait:
break