#!/usr/bin/env python #coding=utf8 ''' The current living objects database. ''' from __future__ import absolute_import from datetime import datetime, timedelta import logging from threading import RLock from ccserver.tql import TqlObject from ccserver.orderedset import OrderedSet from ccserver.exceptions import AlreadyRegistered, UnknownObjectError DEFAULT_TTL = 0 TTL_SERVER_DELTA = 1 # Delta to apply to TTL for all tags class ObjectsDB(object): ''' The object database. This class abstract the access to the current living objects database. The database store a list of :class:`TqlObject` instances, each defining an object with its tags. Objects are registered by :meth:`register` method, and removed by :meth:`unregister` method. You can update the tags of registered objects with :meth:`update` method. A call to :meth:`all` method give you the complete list of objects with an automatic update before to return it. .. note: All calls to the :class:`ObjectDB` are thread-safe. ''' def __init__(self, server): # The server to use to complete database: self._server = server # The object database: self._objects = {} # Dict of oid -> TqlObject instances # The TTL database: self._ttls = {} # (obj_id, tag) -> time of death # The access lock of the object database: self._lock = RLock() # The list of received message while update: self._msgs = [] self._requested = {} # object id -> set of tags # Total amount of query updates: self._nb_queries = 0 def update(self, ids=None, tags=None, tags_novalue=None): ''' Update the database according to the TTL values. :param ids: list of ids to update (or None for all) :param tags: list of tags to update (or None for all) :param tags_noval: list of tags to update (only the name, not the attached value, None for all) ''' with self._lock: self._requested = {} self._nb_queries += 1 self._update(ids, tags, tags_novalue) self._process_responses(tags) self._update_user_defined(ids) def _update(self, ids=None, tags=None, tags_novalue=None): ''' First step of the database update process. This methods make essentially two things: * Calculate the final list of tags to get and to check * Send the request to each client involved in the update .. warning: The call to this method is not threadsafe and must be done only by :meth:`update` method. ''' self._msgs = [] # Cast input to the right type: if tags is not None: tags = set(tags) if tags_novalue is not None: tags_novalue = set(tags_novalue) if tags is not None: tags_novalue -= tags if ids is not None: ids = set(ids) else: ids = set(self._objects) # The main-loop, we will pop each object id from the set and # process it: while ids: oid = ids.pop() obj = self._objects.get(oid) if obj is None: raise UnknownObjectError('%r not found in database' % oid) if '__parent' in obj: # Current object is proxified by a parent: parent = obj['__parent'] # Parent is removed from the list if it present (because # it has been updated): if parent['id'] in ids: self._update_object(parent, tags, tags_novalue) ids.remove(parent['id']) else: self._update_object(parent, (), (), only_udt=True) # Search for sibling objects: sg = set((o for o in ids if o.startswith('%s.' % parent['id']))) sg &= ids ids -= sg sg.add(oid) for sibling in sg: sibling = self.get_by_id(sibling) self._update_object(sibling, tags, tags_novalue) else: # Current object is a standard node: self._update_object(obj, tags, tags_novalue) def _update_user_defined(self, ids): ''' Update the user defined tags (previously fetched by :meth:`_update_object`). ''' if ids is not None: ids = set(ids) else: ids = set(self._objects) for objid in ids: obj = self.get_by_id(objid) if '__parent' in obj: user_tags = obj['__parent']['__user_defined_tags'] else: user_tags = obj['__user_defined_tags'] obj.update(user_tags) # Delete ttls if present: for tag in user_tags: if (obj['id'], tag) in self._ttls: del self._ttls[(obj['id'], tag)] def _process_responses(self, tags): ''' Process the responses received from clients. ''' msgs = self._server.manager.wait(frozenset(self._msgs), timeout=4) now = datetime.now() # Avoid error if all tags are asked (not optimized case because we # can't known the real list of tags: if tags is None: tags = () received = set() for msg in msgs: obj = msg['data'] assert isinstance(obj, TqlObject), 'data obj is not TqlObject' # Update tags if received from the peer: if msg.get('error') is not None: logging.warning('Error from client while getting tags: %s', msg.get('error')) for name in tags: if name in obj: old = obj[name] obj[name] = '#ERR#OLD:%s' % old else: obj[name] = '#ERR#' self._ttls[(obj['id'], name)] = now + timedelta(0, 10) else: returned = msg['return'] assert isinstance(returned, dict), 'returned tags is not a dict' received.add(obj['id']) for name, attrs in returned.iteritems(): obj[name] = attrs.get('value') # Update the ttl ttl = attrs.get('ttl', DEFAULT_TTL) if ttl == -1: tod = datetime.max else: ttl += TTL_SERVER_DELTA dt = timedelta(seconds=ttl) tod = now + dt self._ttls[(obj['id'], name)] = tod for oid in set(self._requested) - received: obj = self.get_by_id(oid) o_tags = self._requested[oid] if o_tags is not None: for name in o_tags: obj[name] = '#ERR#' self._ttls[(obj['id'], name)] = now + timedelta(0, 10) def _update_object(self, obj, tags, tags_novalue, only_udt=False): ''' Calculate the list of tags to request for a client, and send the request to the client. :param obj: the object to update :param tags: the tags to update on the object :param tags_novalue: the tags to update (do not fetch values) on the object :param only_udt: only update the user defined tags ''' parent = obj.get('__parent') if parent is None: user_tags = self._server.conf.show(obj['id'])['tags'] obj['__user_defined_tags'] = user_tags client_id = obj['id'] else: user_tags = parent['__user_defined_tags'] client_id = parent['id'] if only_udt: return if tags is not None: tags = set(tags) tags -= set(user_tags) if tags_novalue is not None: tags_novalue = set(tags_novalue) tags_novalue -= set(user_tags) try: client = self._server.get_connection(client_id) except KeyError: # The client is not connected, we need to keep only unttlised tags: for name in obj.keys(): if (obj['id'], name) in self._ttls: del obj[name] else: # The client is connected: if parent is None: obj.update(client.get_tags()) # First, check if each tag is not already cached: now = datetime.now() if tags is not None: for tag in tags.copy(): if now < self._ttls.get((obj['id'], tag), now): tags.remove(tag) # Cast as jsonizable type: tags = tuple(tags) if tags_novalue is not None: for tag in tags_novalue.copy(): if now < self._ttls.get((obj['id'], tag), now): tags_novalue.remove(tag) # Cast as jsonizable type: tags_novalue = tuple(tags_novalue) # Ask tags to the node: if (tags is None or tags) or (tags_novalue is None or tags_novalue): self._request_tags(obj, client, tags, tags_novalue) def _request_tags(self, obj, client, tags, tags_novalue): ''' Send the request to the client. ''' if '__parent' in obj: subid = obj['id'].partition('.')[2] msg_id = client.connection.async_call('sub_tags', subid, tags, tags_novalue, _data=obj) else: msg_id = client.connection.async_call('get_tags', tags, tags_novalue, _data=obj) self._requested[obj['id']] = tags self._msgs.append(msg_id) def get_by_id(self, oid): ''' Get an registered object by its id. :param oid: the complete oid of the object to get :return: a :class:`TqlObject` instance ''' return self._objects[oid] def register(self, obj, cleanable=()): ''' Populate the database with the specified object. All the tags specified in the passed object are not declared in cache and will never be cleaned. :param obj: the object to register :param clenable: the optionnal list of tags to declare as cleanable when the cache is cleaned ''' with self._lock: if 'id' not in obj: raise ValueError('id tag must exists on the object') elif obj['id'] in self._objects: raise AlreadyRegistered('The object is already registered') else: self._objects[obj['id']] = obj for tag in cleanable: self._ttls[(obj['id'], tag)] = datetime.max def unregister(self, obj_id): ''' Remove an object from the manager by its id. ''' with self._lock: if obj_id in self._objects: self.unregister_children(obj_id) self.clean_ttls(obj_id) del self._objects[obj_id] else: raise ValueError('The object %r is not registered' % obj_id) def unregister_children(self, obj_id): ''' Remove all children of an object. ''' with self._lock: if obj_id in self._objects: parent = self._objects[obj_id] for key, obj in self._objects.items(): if obj.get('__parent') is parent: self.clean_ttls(obj['id']) del self._objects[key] else: raise ValueError('The object %r is not registered' % obj_id) def clean_ttls(self, obj_id): ''' Remove TTLs for all tags for the specified object. ''' with self._lock: obj = self.get_by_id(obj_id) for oid, tag in self._ttls.keys(): if oid == obj_id: del self._ttls[(oid, tag)] try: del obj[tag] except: pass def get_ids(self): ''' Get the set of all the first level ids. ''' ids = set() for obj in self._objects.values(): if not '__parent' in obj: ids.add(obj['id']) return ids def get(self, oid): ''' Get the specified object by it id or return None. ''' return self._objects.get(oid) def all(self, tags, to_check): ''' Get all the objects registered on the server with specified tags. ''' self.update(tags=tags, tags_novalue=to_check) for obj in self._objects.values(): yield obj def some(self, ids, tags=set(), to_check=set()): ''' Get specified objects registed on the server with specified tags. ''' self.update(ids, tags=tags, tags_novalue=to_check) for obj_id in ids: yield self._objects[obj_id] def stats(self): ''' Get stats about the current state of the object database. ''' stats = {} stats['count'] = len(self._objects) cached = dead = 0 now = datetime.now() for date in self._ttls.values(): if date > now: dead += 1 else: cached += 1 stats['cached'] = cached stats['dead'] = dead stats['total'] = cached + dead stats['queries'] = self._nb_queries return stats