diff --git a/ccserver/objectsdb.py b/ccserver/objectsdb.py index efab3f7144f36b8cfb9be5f645e6c83a37efd657..0bb8b2dcefb9774ab8b9277598857a119c078150 100644 --- a/ccserver/objectsdb.py +++ b/ccserver/objectsdb.py @@ -51,6 +51,7 @@ class ObjectsDB(object): # The list of received message while update: self._msgs = [] + self._requested = {} # object id -> set of tags # Total amount of query updates: self._nb_queries = 0 @@ -66,9 +67,10 @@ class ObjectsDB(object): ''' with self._lock: + self._requested = {} self._nb_queries += 1 self._update(ids, tags, tags_novalue) - self._process_responses() + self._process_responses(tags) self._update_user_defined(ids) def _update(self, ids=None, tags=None, tags_novalue=None): @@ -113,12 +115,14 @@ class ObjectsDB(object): if '__parent' in obj: # Current object is proxified by a parent: parent = obj['__parent'] - self._update_object(parent, tags, tags_novalue) # Parent is removed from the list if it present (because # it has been updated): if parent['id'] in ids: + self._update_object(parent, tags, tags_novalue) ids.remove(parent['id']) + else: + self._update_object(parent, (), (), only_udt=True) # Search for sibling objects: sg = set((o for o in ids if o.startswith('%s.' % parent['id']))) @@ -153,8 +157,13 @@ class ObjectsDB(object): user_tags = obj['__user_defined_tags'] obj.update(user_tags) + + # Delete ttls if present: + for tag in user_tags: + if (obj['id'], tag) in self._ttls: + del self._ttls[(obj['id'], tag)] - def _process_responses(self): + def _process_responses(self, tags): ''' Process the responses received from clients. ''' @@ -162,17 +171,32 @@ class ObjectsDB(object): msgs = self._server.manager.wait(frozenset(self._msgs), timeout=4) now = datetime.now() + # Avoid error if all tags are asked (not optimized case because we + # can't known the real list of tags: + if tags is None: + tags = () + + received = set() + for msg in msgs: + obj = msg['data'] + assert isinstance(obj, TqlObject), 'data obj is not TqlObject' + # Update tags if received from the peer: if msg.get('error') is not None: logging.warning('Error from client while getting tags: %s', msg.get('error')) + for name in tags: + if name in obj: + old = obj[name] + obj[name] = '#ERR#OLD:%s' % old + else: + obj[name] = '#ERR#' + self._ttls[(obj['id'], name)] = now + timedelta(0, 10) else: returned = msg['return'] - obj = msg['data'] - assert isinstance(returned, dict), 'returned tags is not a dict' - assert isinstance(obj, TqlObject), 'data obj is not TqlObject' + received.add(obj['id']) for name, attrs in returned.iteritems(): obj[name] = attrs.get('value') @@ -188,7 +212,15 @@ class ObjectsDB(object): self._ttls[(obj['id'], name)] = tod - def _update_object(self, obj, tags, tags_novalue): + for oid in set(self._requested) - received: + obj = self.get_by_id(oid) + o_tags = self._requested[oid] + if o_tags is not None: + for name in o_tags: + obj[name] = '#ERR#' + self._ttls[(obj['id'], name)] = now + timedelta(0, 10) + + def _update_object(self, obj, tags, tags_novalue, only_udt=False): ''' Calculate the list of tags to request for a client, and send the request to the client. @@ -204,6 +236,9 @@ class ObjectsDB(object): user_tags = parent['__user_defined_tags'] client_id = parent['id'] + if only_udt: + return + if tags is not None: tags = set(tags) tags -= set(user_tags) @@ -257,6 +292,7 @@ class ObjectsDB(object): msg_id = client.connection.async_call('get_tags', tags, tags_novalue, _data=obj) + self._requested[obj['id']] = tags self._msgs.append(msg_id) def get_by_id(self, oid):