Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import datetime
from Queue import Queue, Empty
#TODO: implement iterable interface
#TODO: implement priority responses
class AsyncWatcher(object):
'''
Asynchronous call watcher -- Handle reception of asynchronous-request
responses.
Usage example:
>>> watcher = AsyncWatcher()
>>> watcher.register(conn1, 'ping')
>>> watcher.register(conn2, 'ping')
>>> responses = watcher.wait() # Wait for all responses
'''
def __init__(self):
self._response_queue = Queue()
self._expected_responses = set()
def _get_in_queue(self, *args, **kwargs):
'''
Get an item in response queue and return it with the time waited to its
comming. Accept all arguments accepted by :meth:`Queue.get`
'''
start = datetime.datetime.now()
item = self._response_queue.get(*args, **kwargs)
dt = datetime.datetime.now() - start
dt = dt.seconds + dt.days * 86400 + dt.microseconds * 0.000001
return (dt, item)
#
# Public API:
#
@property
def remains(self):
'''
Remaining expected responses.
'''
return len(self._expected_responses)
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def register(self, rpc, method, *args, **kwargs):
'''
Call specified method on specified rpc with specified arguments
and register the call on this object.
:param rpc: the rpc on which made the call
:param method: the rpc method to call
:param \*args, \*\*kwargs: the arguments to pass to the rpc method
'''
msg_id = rpc.async_call(self._response_queue, method, *args, **kwargs)
self._expected_responses.add(msg_id)
def wait(self, timeout=None, max_wait=None):
'''
Wait messages registered on this :class:`AsyncWatcher` instance and
return them.
:param timeout: timeout value or None to disable timeout (default: None)
:param max_wait: maximum waited messages
:return: received messages in a list
.. note::
You can repeat call to this method on a single
:class:`AsyncWatcher`, for example, if you want to process the
first arrived message, then wait others for a minute, you can do:
>>> msgs = watcher.wait(max_wait=1)
>>> process_speedy(msgs[0])
>>> for msg in watcher.wait(timeout=60):
>>> process(msg)
'''
responses = []
while self._expected_responses:
try:
dt, response = self._get_in_queue(timeout=timeout)
if timeout is not None:
timeout -= dt