Skip to content
Snippets Groups Projects
Commit d4e2a4b6 authored by Antoine Millet's avatar Antoine Millet
Browse files

Added AsyncWatcher class to handle async calls.

parent 3cde7284
No related branches found
No related tags found
No related merge requests found
import datetime
from Queue import Queue, Empty
#TODO: implement iterable interface
#TODO: implement priority responses
class AsyncWatcher(object):
'''
Asynchronous call watcher -- Handle reception of asynchrone-request
responses.
Usage example:
>>> watcher = AsyncWatcher()
>>> watcher.register(conn1, 'ping')
>>> watcher.register(conn2, 'ping')
>>> responses = watcher.wait() # Wait for all responses
'''
def __init__(self):
self._response_queue = Queue()
self._expected_responses = set()
def _get_in_queue(self, *args, **kwargs):
'''
Get an item in response queue and return it with the time waited to its
comming. Accept all arguments accepted by :meth:`Queue.get`
'''
start = datetime.datetime.now()
item = self._response_queue.get(*args, **kwargs)
dt = datetime.datetime.now() - start
dt = dt.seconds + dt.days * 86400 + dt.microseconds * 0.000001
return (dt, item)
#
# Public API:
#
def register(self, rpc, method, *args, **kwargs):
'''
Call specified method on specified rpc with specified arguments
and register the call on this object.
:param rpc: the rpc on which made the call
:param method: the rpc method to call
:param \*args, \*\*kwargs: the arguments to pass to the rpc method
'''
msg_id = rpc.async_call(self._response_queue, method, *args, **kwargs)
self._expected_responses.add(msg_id)
def wait(self, timeout=None, max_wait=None):
'''
Wait messages registered on this :class:`AsyncWatcher` instance and
return them.
:param timeout: timeout value or None to disable timeout (default: None)
:param max_wait: maximum waited messages
:return: received messages in a list
.. note::
You can repeat call to this method on a single
:class:`AsyncWatcher`, for example, if you want to process the
first arrived message, then wait others for a minute, you can do:
>>> msgs = watcher.wait(max_wait=1)
>>> process_speedy(msgs[0])
>>> for msg in watcher.wait(timeout=60):
>>> process(msg)
'''
responses = []
time_remains = timeout
while self._expected_responses:
try:
dt, response = self._get_in_queue(timeout=time_remains)
except Empty:
break
time_remains -= dt
responses.append(response)
self._expected_responses.remove(response['id'])
# Check for max_wait:
if max_wait is not None:
max_wait -= 1
if not max_wait:
break
return responses
@property
def remains(self):
'''
Remaining expected responses.
'''
return len(self._expected_responses)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment