diff --git a/sjrpc/core/async.py b/sjrpc/core/async.py new file mode 100644 index 0000000000000000000000000000000000000000..894bd100794c940ffb1ae508702dd3c0bd28f0eb --- /dev/null +++ b/sjrpc/core/async.py @@ -0,0 +1,92 @@ +import datetime +from Queue import Queue, Empty + +#TODO: implement iterable interface +#TODO: implement priority responses + +class AsyncWatcher(object): + + ''' + Asynchronous call watcher -- Handle reception of asynchrone-request + responses. + + Usage example: + >>> watcher = AsyncWatcher() + >>> watcher.register(conn1, 'ping') + >>> watcher.register(conn2, 'ping') + >>> responses = watcher.wait() # Wait for all responses + ''' + + def __init__(self): + self._response_queue = Queue() + self._expected_responses = set() + + def _get_in_queue(self, *args, **kwargs): + ''' + Get an item in response queue and return it with the time waited to its + comming. Accept all arguments accepted by :meth:`Queue.get` + ''' + start = datetime.datetime.now() + item = self._response_queue.get(*args, **kwargs) + dt = datetime.datetime.now() - start + dt = dt.seconds + dt.days * 86400 + dt.microseconds * 0.000001 + return (dt, item) + +# +# Public API: +# + + def register(self, rpc, method, *args, **kwargs): + ''' + Call specified method on specified rpc with specified arguments + and register the call on this object. + + :param rpc: the rpc on which made the call + :param method: the rpc method to call + :param \*args, \*\*kwargs: the arguments to pass to the rpc method + ''' + msg_id = rpc.async_call(self._response_queue, method, *args, **kwargs) + self._expected_responses.add(msg_id) + + def wait(self, timeout=None, max_wait=None): + ''' + Wait messages registered on this :class:`AsyncWatcher` instance and + return them. + + :param timeout: timeout value or None to disable timeout (default: None) + :param max_wait: maximum waited messages + :return: received messages in a list + + .. note:: + You can repeat call to this method on a single + :class:`AsyncWatcher`, for example, if you want to process the + first arrived message, then wait others for a minute, you can do: + + >>> msgs = watcher.wait(max_wait=1) + >>> process_speedy(msgs[0]) + >>> for msg in watcher.wait(timeout=60): + >>> process(msg) + ''' + responses = [] + time_remains = timeout + while self._expected_responses: + try: + dt, response = self._get_in_queue(timeout=time_remains) + except Empty: + break + time_remains -= dt + responses.append(response) + self._expected_responses.remove(response['id']) + # Check for max_wait: + if max_wait is not None: + max_wait -= 1 + if not max_wait: + break + return responses + + @property + def remains(self): + ''' + Remaining expected responses. + ''' + return len(self._expected_responses)