#!/usr/bin/env python
#coding=utf8

'''
The current living objects database.
'''

from __future__ import absolute_import

from datetime import datetime, timedelta
import logging
from threading import RLock

from ccserver.tql import TqlObject
from ccserver.orderedset import OrderedSet
from ccserver.exceptions import AlreadyRegistered, UnknownObjectError

# TTL (in seconds) used for a tag when the client does not provide one:
DEFAULT_TTL = 0
# Delta (in seconds) added to the TTL of every tag received from a client:
TTL_SERVER_DELTA = 1

class ObjectsDB(object):
    '''
    The object database.

    This class abstracts the access to the database of currently living
    objects. The database stores :class:`TqlObject` instances, each one
    describing an object with its tags.

    Objects are added with the :meth:`register` method and removed with the
    :meth:`unregister` method. The tags of registered objects are refreshed
    with the :meth:`update` method. The :meth:`all` method yields every
    registered object after an automatic update.

    .. note::
       The methods which modify the database (:meth:`update`,
       :meth:`register`, :meth:`unregister`, :meth:`unregister_children` and
       :meth:`clean_ttls`) are serialized by an internal re-entrant lock.
    '''

    # Seconds to wait for the answers of the clients during an update:
    RESPONSE_TIMEOUT = 4

    # Seconds during which a tag in error is kept before being asked again:
    ERROR_TTL = 10

    def __init__(self, server):
        '''
        :param server: the server used to complete the database (it provides
            the configuration, the client connections and the RPC manager)
        '''

        # The server to use to complete database:
        self._server = server

        # The object database:
        self._objects = {} # Dict of oid -> TqlObject instances

        # The TTL database:
        self._ttls = {} # (obj_id, tag) -> time of death

        # The access lock of the object database:
        self._lock = RLock()

        # The list of ids of the requests sent during the current update:
        self._msgs = []
        self._requested = {} # object id -> tags requested (None for all)
        # Total amount of query updates:
        self._nb_queries = 0

    def update(self, ids=None, tags=None, tags_novalue=None):
        '''
        Update the database according to the TTL values.

        :param ids: list of ids to update (or None for all)
        :param tags: list of tags to update (or None for all)
        :param tags_novalue: list of tags to update (only the name, not the
            attached value, None for all)
        '''

        # ids and tags are walked several times below, one-shot iterables
        # must be materialized first:
        if ids is not None:
            ids = tuple(ids)
        if tags is not None:
            tags = tuple(tags)

        with self._lock:
            self._requested = {}
            self._nb_queries += 1
            self._update(ids, tags, tags_novalue)
            self._process_responses(tags)
            self._update_user_defined(ids)

    def _update(self, ids=None, tags=None, tags_novalue=None):
        '''
        First step of the database update process.

        This method does essentially two things:

            * Calculate the final list of tags to get and to check
            * Send the request to each client involved in the update

        :param ids: ids of the objects to update (or None for all)
        :param tags: tags to update (or None for all)
        :param tags_novalue: tags to update without their value (or None
            for all)
        :raise UnknownObjectError: if one of the ids is not registered

        .. warning::
            The call to this method is not threadsafe and must be done only
            by :meth:`update` method.
        '''

        self._msgs = []

        # Cast input to the right type:
        if tags is not None:
            tags = set(tags)

        if tags_novalue is not None:
            tags_novalue = set(tags_novalue)
            if tags is not None:
                tags_novalue -= tags

        if ids is not None:
            ids = set(ids)
        else:
            ids = set(self._objects)

        # The main-loop, we will pop each object id from the set and
        # process it:
        while ids:
            oid = ids.pop()
            obj = self._objects.get(oid)

            if obj is None:
                raise UnknownObjectError('%r not found in database' % oid)

            if '__parent' not in obj:
                # Current object is a standard node:
                self._update_object(obj, tags, tags_novalue)
                continue

            # Current object is proxified by a parent:
            parent = obj['__parent']

            # Parent is removed from the list if it is present (because it
            # is updated right now), else only its user defined tags are
            # refreshed:
            if parent['id'] in ids:
                self._update_object(parent, tags, tags_novalue)
                ids.remove(parent['id'])
            else:
                self._update_object(parent, (), (), only_udt=True)

            # Search for sibling objects, they are updated in the same pass
            # and removed from the list to not be requested twice:
            prefix = '%s.' % parent['id']
            siblings = set(o for o in ids if o.startswith(prefix))
            ids -= siblings
            siblings.add(oid)

            for sibling_id in siblings:
                sibling = self.get_by_id(sibling_id)
                self._update_object(sibling, tags, tags_novalue)

    def _update_user_defined(self, ids):
        '''
        Update the user defined tags (previously fetched
        by :meth:`_update_object`).

        :param ids: ids of the objects to update (or None for all)
        '''

        if ids is not None:
            ids = set(ids)
        else:
            ids = set(self._objects)

        for objid in ids:
            obj = self.get_by_id(objid)

            # Sub-objects share the user defined tags of their parent:
            if '__parent' in obj:
                user_tags = obj['__parent']['__user_defined_tags']
            else:
                user_tags = obj['__user_defined_tags']

            obj.update(user_tags)

            # Delete ttls if present (user defined tags are not cached):
            for tag in user_tags:
                self._ttls.pop((obj['id'], tag), None)

    def _process_responses(self, tags):
        '''
        Process the responses received from clients.

        :param tags: the tags asked to :meth:`update` (or None for all)
        '''

        msgs = self._server.manager.wait(frozenset(self._msgs),
                                         timeout=self.RESPONSE_TIMEOUT)
        now = datetime.now()
        error_tod = now + timedelta(seconds=self.ERROR_TTL)

        # Avoid error if all tags are asked (not optimized case because we
        # can't know the real list of tags):
        if tags is None:
            tags = ()

        received = set()

        for msg in msgs:
            obj = msg['data']
            assert isinstance(obj, TqlObject), 'data obj is not TqlObject'

            if msg.get('error') is not None:
                # The peer answered with an error, flag the asked tags:
                # NOTE(review): the object is not added to `received`, so
                # the loop at the end of this method overwrites the
                # '#ERR#OLD:' values of the requested tags with '#ERR#'.
                # Confirm this is intended.
                logging.warning('Error from client while getting tags: %s',
                                msg.get('error'))
                for name in tags:
                    if name in obj:
                        old = obj[name]
                        obj[name] = '#ERR#OLD:%s' % old
                    else:
                        obj[name] = '#ERR#'
                    self._ttls[(obj['id'], name)] = error_tod
            else:
                # Update tags with the values received from the peer:
                returned = msg['return']
                assert isinstance(returned, dict), 'returned tags is not a dict'
                received.add(obj['id'])

                for name, attrs in returned.items():
                    obj[name] = attrs.get('value')

                    # Update the ttl (-1 means the tag never expires):
                    ttl = attrs.get('ttl', DEFAULT_TTL)
                    if ttl == -1:
                        tod = datetime.max
                    else:
                        tod = now + timedelta(seconds=ttl + TTL_SERVER_DELTA)

                    self._ttls[(obj['id'], name)] = tod

        # Flag the tags of the objects for which no valid answer came back:
        for oid in set(self._requested) - received:
            obj = self.get_by_id(oid)
            o_tags = self._requested[oid]
            if o_tags is not None:
                for name in o_tags:
                    obj[name] = '#ERR#'
                    self._ttls[(obj['id'], name)] = error_tod

    def _is_cached(self, oid, tag, now):
        '''
        Tell if the TTL of the tag of the object is not reached at `now`.
        '''
        return now < self._ttls.get((oid, tag), now)

    def _update_object(self, obj, tags, tags_novalue, only_udt=False):
        '''
        Calculate the list of tags to request for a client, and send the
        request to the client.

        :param obj: the object to update
        :param tags: the tags to update on the object
        :param tags_novalue: the tags to update (do not fetch values) on the
            object
        :param only_udt: only update the user defined tags
        '''

        parent = obj.get('__parent')

        if parent is None:
            user_tags = self._server.conf.show(obj['id'])['tags']
            obj['__user_defined_tags'] = user_tags
            client_id = obj['id']
        else:
            user_tags = parent['__user_defined_tags']
            client_id = parent['id']

        if only_udt:
            return

        # User defined tags are never asked to the client. Work on copies:
        # the passed sets are shared between all the objects of an update
        # and must not be modified here.
        user_tag_names = set(user_tags)

        if tags is not None:
            tags = set(tags) - user_tag_names

        if tags_novalue is not None:
            tags_novalue = set(tags_novalue) - user_tag_names

        try:
            client = self._server.get_connection(client_id)
        except KeyError:
            # The client is not connected, we need to keep only unttlised
            # tags (iterate on a snapshot since the object is modified):
            oid = obj['id']
            for name in list(obj.keys()):
                if (oid, name) in self._ttls:
                    del obj[name]
        else:
            # The client is connected:
            if parent is None:
                obj.update(client.get_tags())

            # First, drop the tags which are still cached, and cast the
            # result as jsonizable type:
            oid = obj['id']
            now = datetime.now()

            if tags is not None:
                tags = tuple(t for t in tags
                             if not self._is_cached(oid, t, now))

            if tags_novalue is not None:
                tags_novalue = tuple(t for t in tags_novalue
                                     if not self._is_cached(oid, t, now))

            # Ask tags to the node (None means "all tags"):
            if (tags is None or tags) or (tags_novalue is None or tags_novalue):
                self._request_tags(obj, client, tags, tags_novalue)

    def _request_tags(self, obj, client, tags, tags_novalue):
        '''
        Send the request to the client.

        :param obj: the object to update, attached to the request
        :param client: the client to which the request is sent
        :param tags: the tags to ask (or None for all)
        :param tags_novalue: the tags to ask without value (or None for all)
        '''

        if '__parent' in obj:
            # Sub-objects are addressed by the part of their id located
            # after the first dot:
            subid = obj['id'].partition('.')[2]
            msg_id = client.connection.async_call('sub_tags', subid, tags,
                                                  tags_novalue, _data=obj)
        else:
            msg_id = client.connection.async_call('get_tags', tags,
                                                  tags_novalue, _data=obj)

        self._requested[obj['id']] = tags
        self._msgs.append(msg_id)

    def get_by_id(self, oid):
        '''
        Get a registered object by its id.

        :param oid: the complete oid of the object to get
        :return: a :class:`TqlObject` instance
        :raise KeyError: if no object is registered with this id
        '''
        return self._objects[oid]

    def register(self, obj, cleanable=()):
        '''
        Populate the database with the specified object. All the tags specified
        in the passed object are not declared in cache and will never be
        cleaned.

        :param obj: the object to register
        :param cleanable: the optional list of tags to declare as cleanable
            when the cache is cleaned
        :raise ValueError: if the object has no id tag
        :raise AlreadyRegistered: if an object with the same id is registered
        '''

        with self._lock:
            if 'id' not in obj:
                raise ValueError('id tag must exists on the object')
            if obj['id'] in self._objects:
                raise AlreadyRegistered('The object is already registered')

            self._objects[obj['id']] = obj
            for tag in cleanable or ():
                self._ttls[(obj['id'], tag)] = datetime.max

    def unregister(self, obj_id):
        '''
        Remove an object (and all its children) from the manager by its id.

        :param obj_id: the id of the object to remove
        :raise ValueError: if the object is not registered
        '''

        with self._lock:
            if obj_id not in self._objects:
                raise ValueError('The object %r is not registered' % obj_id)

            self.unregister_children(obj_id)
            del self._objects[obj_id]

    def unregister_children(self, obj_id):
        '''
        Remove all children of an object.

        :param obj_id: the id of the parent object
        :raise ValueError: if the object is not registered
        '''

        with self._lock:
            if obj_id not in self._objects:
                raise ValueError('The object %r is not registered' % obj_id)

            parent = self._objects[obj_id]

            # Iterate on a snapshot since the database is modified:
            for key, obj in list(self._objects.items()):
                if obj.get('__parent') is parent:
                    self.clean_ttls(obj['id'])
                    del self._objects[key]

    def clean_ttls(self, obj_id):
        '''
        Remove TTLs for all tags for the specified object.

        :param obj_id: the id of the object
        :raise KeyError: if the object is not registered
        '''

        with self._lock:
            # Only used to raise KeyError on unknown objects:
            self.get_by_id(obj_id)

            # Iterate on a snapshot since the TTL database is modified:
            for key in [k for k in self._ttls if k[0] == obj_id]:
                del self._ttls[key]

    def get_ids(self):
        '''
        Get the set of all the first level ids.

        :return: the set of ids of the objects which have no parent
        '''

        return set(obj['id'] for obj in list(self._objects.values())
                   if '__parent' not in obj)

    def get(self, oid):
        '''
        Get the specified object by its id or return None.
        '''
        return self._objects.get(oid)

    def all(self, tags, to_check):
        '''
        Get all the objects registered on the server with specified tags.

        :param tags: the tags to update (or None for all)
        :param to_check: the tags to update without their value (or None
            for all)
        '''

        self.update(tags=tags, tags_novalue=to_check)
        # Iterate on a snapshot, the database can change between two yields:
        for obj in list(self._objects.values()):
            yield obj

    def some(self, ids, tags=frozenset(), to_check=frozenset()):
        '''
        Get specified objects registered on the server with specified tags.

        :param ids: the ids of the objects to get
        :param tags: the tags to update (or None for all)
        :param to_check: the tags to update without their value (or None
            for all)
        '''

        # The ids are walked twice (update then lookup):
        ids = list(ids)
        self.update(ids, tags=tags, tags_novalue=to_check)
        for obj_id in ids:
            yield self._objects[obj_id]

    def stats(self):
        '''
        Get stats about the current state of the object database.

        :return: a dict with the following keys: ``count`` (number of
            objects), ``cached`` (TTL entries not expired yet), ``dead``
            (expired TTL entries), ``total`` (all TTL entries) and
            ``queries`` (number of calls to :meth:`update`)
        '''

        now = datetime.now()
        cached = dead = 0

        # An entry is still cached while its time of death is in the future:
        for tod in list(self._ttls.values()):
            if tod > now:
                cached += 1
            else:
                dead += 1

        return {
            'count': len(self._objects),
            'cached': cached,
            'dead': dead,
            'total': cached + dead,
            'queries': self._nb_queries,
        }