Loading ccserver/objectsdb.py +43 −7 Original line number Diff line number Diff line Loading @@ -51,6 +51,7 @@ class ObjectsDB(object): # The list of received message while update: self._msgs = [] self._requested = {} # object id -> set of tags # Total amount of query updates: self._nb_queries = 0 Loading @@ -66,9 +67,10 @@ class ObjectsDB(object): ''' with self._lock: self._requested = {} self._nb_queries += 1 self._update(ids, tags, tags_novalue) self._process_responses() self._process_responses(tags) self._update_user_defined(ids) def _update(self, ids=None, tags=None, tags_novalue=None): Loading Loading @@ -113,12 +115,14 @@ class ObjectsDB(object): if '__parent' in obj: # Current object is proxified by a parent: parent = obj['__parent'] self._update_object(parent, tags, tags_novalue) # Parent is removed from the list if it present (because # it has been updated): if parent['id'] in ids: self._update_object(parent, tags, tags_novalue) ids.remove(parent['id']) else: self._update_object(parent, (), (), only_udt=True) # Search for sibling objects: sg = set((o for o in ids if o.startswith('%s.' % parent['id']))) Loading Loading @@ -154,7 +158,12 @@ class ObjectsDB(object): obj.update(user_tags) def _process_responses(self): # Delete ttls if present: for tag in user_tags: if (obj['id'], tag) in self._ttls: del self._ttls[(obj['id'], tag)] def _process_responses(self, tags): ''' Process the responses received from clients. ''' Loading @@ -162,17 +171,32 @@ class ObjectsDB(object): msgs = self._server.manager.wait(frozenset(self._msgs), timeout=4) now = datetime.now() # Avoid error if all tags are asked (not optimized case because we # can't known the real list of tags: if tags is None: tags = () received = set() for msg in msgs: obj = msg['data'] assert isinstance(obj, TqlObject), 'data obj is not TqlObject' # Update tags if received from the peer: if msg.get('error') is not None: logging.warning('Error from client while getting tags: %s', msg.get('error')) for name in tags: if name in obj: old = obj[name] obj[name] = '#ERR#OLD:%s' % old else: obj[name] = '#ERR#' self._ttls[(obj['id'], name)] = now + timedelta(0, 10) else: returned = msg['return'] obj = msg['data'] assert isinstance(returned, dict), 'returned tags is not a dict' assert isinstance(obj, TqlObject), 'data obj is not TqlObject' received.add(obj['id']) for name, attrs in returned.iteritems(): obj[name] = attrs.get('value') Loading @@ -188,7 +212,15 @@ class ObjectsDB(object): self._ttls[(obj['id'], name)] = tod def _update_object(self, obj, tags, tags_novalue): for oid in set(self._requested) - received: obj = self.get_by_id(oid) o_tags = self._requested[oid] if o_tags is not None: for name in o_tags: obj[name] = '#ERR#' self._ttls[(obj['id'], name)] = now + timedelta(0, 10) def _update_object(self, obj, tags, tags_novalue, only_udt=False): ''' Calculate the list of tags to request for a client, and send the request to the client. Loading @@ -204,6 +236,9 @@ class ObjectsDB(object): user_tags = parent['__user_defined_tags'] client_id = parent['id'] if only_udt: return if tags is not None: tags = set(tags) tags -= set(user_tags) Loading Loading @@ -257,6 +292,7 @@ class ObjectsDB(object): msg_id = client.connection.async_call('get_tags', tags, tags_novalue, _data=obj) self._requested[obj['id']] = tags self._msgs.append(msg_id) def get_by_id(self, oid): Loading Loading
ccserver/objectsdb.py +43 −7 Original line number Diff line number Diff line Loading @@ -51,6 +51,7 @@ class ObjectsDB(object): # The list of received message while update: self._msgs = [] self._requested = {} # object id -> set of tags # Total amount of query updates: self._nb_queries = 0 Loading @@ -66,9 +67,10 @@ class ObjectsDB(object): ''' with self._lock: self._requested = {} self._nb_queries += 1 self._update(ids, tags, tags_novalue) self._process_responses() self._process_responses(tags) self._update_user_defined(ids) def _update(self, ids=None, tags=None, tags_novalue=None): Loading Loading @@ -113,12 +115,14 @@ class ObjectsDB(object): if '__parent' in obj: # Current object is proxified by a parent: parent = obj['__parent'] self._update_object(parent, tags, tags_novalue) # Parent is removed from the list if it present (because # it has been updated): if parent['id'] in ids: self._update_object(parent, tags, tags_novalue) ids.remove(parent['id']) else: self._update_object(parent, (), (), only_udt=True) # Search for sibling objects: sg = set((o for o in ids if o.startswith('%s.' % parent['id']))) Loading Loading @@ -154,7 +158,12 @@ class ObjectsDB(object): obj.update(user_tags) def _process_responses(self): # Delete ttls if present: for tag in user_tags: if (obj['id'], tag) in self._ttls: del self._ttls[(obj['id'], tag)] def _process_responses(self, tags): ''' Process the responses received from clients. ''' Loading @@ -162,17 +171,32 @@ class ObjectsDB(object): msgs = self._server.manager.wait(frozenset(self._msgs), timeout=4) now = datetime.now() # Avoid error if all tags are asked (not optimized case because we # can't known the real list of tags: if tags is None: tags = () received = set() for msg in msgs: obj = msg['data'] assert isinstance(obj, TqlObject), 'data obj is not TqlObject' # Update tags if received from the peer: if msg.get('error') is not None: logging.warning('Error from client while getting tags: %s', msg.get('error')) for name in tags: if name in obj: old = obj[name] obj[name] = '#ERR#OLD:%s' % old else: obj[name] = '#ERR#' self._ttls[(obj['id'], name)] = now + timedelta(0, 10) else: returned = msg['return'] obj = msg['data'] assert isinstance(returned, dict), 'returned tags is not a dict' assert isinstance(obj, TqlObject), 'data obj is not TqlObject' received.add(obj['id']) for name, attrs in returned.iteritems(): obj[name] = attrs.get('value') Loading @@ -188,7 +212,15 @@ class ObjectsDB(object): self._ttls[(obj['id'], name)] = tod def _update_object(self, obj, tags, tags_novalue): for oid in set(self._requested) - received: obj = self.get_by_id(oid) o_tags = self._requested[oid] if o_tags is not None: for name in o_tags: obj[name] = '#ERR#' self._ttls[(obj['id'], name)] = now + timedelta(0, 10) def _update_object(self, obj, tags, tags_novalue, only_udt=False): ''' Calculate the list of tags to request for a client, and send the request to the client. Loading @@ -204,6 +236,9 @@ class ObjectsDB(object): user_tags = parent['__user_defined_tags'] client_id = parent['id'] if only_udt: return if tags is not None: tags = set(tags) tags -= set(user_tags) Loading Loading @@ -257,6 +292,7 @@ class ObjectsDB(object): msg_id = client.connection.async_call('get_tags', tags, tags_novalue, _data=obj) self._requested[obj['id']] = tags self._msgs.append(msg_id) def get_by_id(self, oid): Loading