import datetime

try:
    from queue import Queue, Empty  # Python 3 module name
except ImportError:
    from Queue import Queue, Empty  # Python 2 module name

#TODO: implement iterable interface
#TODO: implement priority responses


class AsyncWatcher(object):
    ''' Asynchronous call watcher -- Handle asynchronous calls and responses.

    Calls are issued through :meth:`register`; the id returned by each call
    is remembered, and responses are later collected from an internal queue
    with :meth:`wait` or :meth:`iter`.

    Usage example::

        >>> watcher = AsyncWatcher()
        >>> watcher.register(conn1, 'ping')
        >>> watcher.register(conn2, 'ping')
        >>> responses = watcher.wait() # Wait for all responses
    '''

    def __init__(self):
        # Queue handed to every ``rpc.async_call`` made through this watcher.
        self._response_queue = Queue()
        # Ids returned by ``rpc.async_call`` whose response was not yet
        # consumed by :meth:`iter`.
        self._expected_responses = set()

    def _get_in_queue(self, *args, **kwargs):
        ''' Get an item from the response queue and return it together with
        the time spent waiting for it.

        Accept all arguments accepted by :meth:`Queue.get`, and let its
        exceptions (for example :exc:`Empty`) propagate.

        :return: a ``(waited_seconds, item)`` tuple, ``waited_seconds`` being
            a float
        '''
        start = datetime.datetime.now()
        item = self._response_queue.get(*args, **kwargs)
        elapsed = datetime.datetime.now() - start
        # Manual conversion of the timedelta to (float) seconds.
        waited = (elapsed.seconds + elapsed.days * 86400
                  + elapsed.microseconds * 0.000001)
        return (waited, item)

    #
    # Public API:
    #

    @property
    def remains(self):
        ''' Remaining expected responses.
        '''
        return len(self._expected_responses)

    def register(self, rpc, method, *args, **kwargs):
        ''' Call specified method on specified rpc with specified arguments
        and register the call on this object.

        :param rpc: the rpc on which to make the call; its ``async_call``
            method is invoked with the response queue of this watcher
        :param method: the rpc method to call
        :param args, kwargs: the positional (``*args``) and keyword
            (``**kwargs``) arguments to pass to the rpc method
        '''
        msg_id = rpc.async_call(self._response_queue, method, *args, **kwargs)
        self._expected_responses.add(msg_id)

    def wait(self, timeout=None, max_wait=None):
        ''' Wait responses for calls registered on this :class:`AsyncWatcher`
        instance and return them.

        :param timeout: timeout value or None to disable timeout
            (default: None)
        :param max_wait: maximum waited responses
        :return: received messages in a list

        .. note::
           You can repeat call to this method on a single
           :class:`AsyncWatcher`. For example, if you want to process the
           first arrived message, then wait others for a minute, you can do::

               >>> msgs = watcher.wait(max_wait=1)
               >>> process_speedy(msgs[0])
               >>> for msg in watcher.wait(timeout=60):
               >>>     process(msg)
        '''
        return list(self.iter(timeout, max_wait))

    def iter(self, timeout=None, max_wait=None):
        ''' Work like :meth:`AsyncWatcher.wait` but return an iterable,
        instead of a list object.

        :param timeout: time budget shared by all the responses of this
            call (the time waited for each response is subtracted from it),
            or None to disable timeout (default: None)
        :param max_wait: stop after this many responses, or None for no
            limit (default: None); a value of 0 or less never triggers the
            limit
        :raise KeyError: if a received response carries an ``'id'`` which is
            not currently expected by this watcher

        .. note::
           Responses are yielded by the iterable when they are received, so
           you can start the processing of response before receiving all.
        '''
        while self.remains:
            try:
                dt, response = self._get_in_queue(timeout=timeout)
            except Empty:
                # Time budget exhausted before the next response arrived.
                break

            if timeout is not None:
                # Clamp at zero: the measured wait can exceed what was left
                # of the budget, and Queue.get() raises ValueError when it
                # is given a negative timeout. With a zero timeout, an
                # already queued response is still returned, otherwise
                # Empty ends the iteration.
                timeout = max(0.0, timeout - dt)

            self._expected_responses.remove(response['id'])

            yield response

            # Check for max_wait:
            if max_wait is not None:
                max_wait -= 1
                if not max_wait:
                    break