Skip to content
objectsdb.py 9.63 KiB
Newer Older
#!/usr/bin/env python
#coding=utf8

'''
The current living objects database.
'''

from datetime import datetime, timedelta
import logging
from threading import RLock

from tql import TqlObject
Antoine Millet's avatar
Antoine Millet committed
from orderedset import OrderedSet
from exceptions import AlreadyRegistered

DEFAULT_TTL = 0
TTL_SERVER_DELTA = 2 # Delta to apply for all tags

class ObjectsDB(object):
    '''
    The object database.

    This class abstract the access to the current living objects database. The
    database store a list of :class:`TqlObject` instances, each defining an
    object with its tags.

    Objects are registered by :meth:`register` method, and removed by
    :meth:`unregister` method. You can update the tags of registered objects
    with :meth:`update` method. A call to :meth:`all` method give you the
    complete list of objects with an automatic update before to return it.

    .. note:
       All calls to the :class:`ObjectDB` are thread-safe.
    '''

    def __init__(self, server):

        # The server to use to complete database:
        self._server = server

        # The object database:
        self._objects = {} # Dict of oid -> TqlObject instances

        # The TTL database:
        self._ttls = {} # (obj_id, tag) -> time of death

        # The access lock of the object database:
        self._lock = RLock()

        # The list of received message while update:
        self._msgs = []

    def update(self, ids=None, tags=None, tags_novalue=None):
        '''
        Update the database according to the TTL values.

        :param ids: list of ids to update (or None for all)
        :param tags: list of tags to update (or None for all)
        :param tags_noval: list of tags to update (only the name, not the
            attached value, None for all)
        '''

        with self._lock:
            self._update(ids, tags, tags_novalue)
            self._process_responses()

    def _update(self, ids=None, tags=None, tags_novalue=None):
        '''
        First step of the database update process.

        This methods make essentially two things:

            * Calculate the final list of tags to get and to check
            * Send the request to each client involved in the update

        .. warning:
            The call to this method is not threadsafe and must be done only
            by :meth:`update` method.
        '''

        self._msgs = []
        
        # Cast input to the right type:
        if tags is not None:
            tags = set(tags)

        if tags_novalue is not None:
            tags_novalue = set(tags_novalue)
            if tags is not None:
                tags_novalue -= tags
        
        if ids is not None:
            ids = OrderedSet(ids)
            ids = OrderedSet(self._objects)

        # The main-loop, we will pop each object id from the set and
        # process it:
        while ids:
            oid = ids.pop()
            obj = self._objects.get(oid)

            assert obj is not None, 'object not found'

            if '__parent' in obj:
                # Current object is proxified by a parent:
                parent = obj['__parent']
                self._update_object(parent, tags, tags_novalue)

                # Parent is removed from the list if it present (because
                # it has been updated):
                if parent['id'] in ids:
                    ids.remove(parent['id'])

                # Search for sibling objects:
                sg = set((o for o in ids if o.startswith('%s.' % parent['id'])))
                ids -= sg

                for sibling in sg:
                    sibling = self.get_by_id(sibling)
                    self._update_object(sibling, tags, tags_novalue)
            else:
                # Current object is a standard node:
                self._update_object(obj, tags, tags_novalue)
    
    def _process_responses(self):
        '''
        Process the responses received from clients.
        '''
        
        msgs = self._server.manager.wait(frozenset(self._msgs), timeout=10)
        now = datetime.now()

        for msg in msgs:
            if msg.get('error') is not None:
                logging.error('Error from client while getting tags')
            else:
                returned = msg['return']
                obj = msg['data']

                assert isinstance(returned, dict), 'returned tags is not a dict'
                assert isinstance(obj, TqlObject), 'data obj is not TqlObject'

                if '__parent' in obj:
                    user_tags = obj['__parent']['__user_defined_tags']
                else:
                    user_tags = obj['__user_defined_tags']

                for name, attrs in returned.iteritems():
                    obj[name] = attrs.get('value')

                    # Update the ttl
                    ttl = attrs.get('ttl', DEFAULT_TTL)
                    if ttl == -1:
                        tod = datetime.max
                    else:
                        ttl += TTL_SERVER_DELTA
                        dt = timedelta(seconds=ttl)
                        tod = now + dt

                    self._ttls[(obj['id'], name)] = tod

                obj.update(user_tags)

    def _update_object(self, obj, tags, tags_novalue):
        '''
        Calculate the list of tags to request for a client, and send the
        request to the client.
        '''
        parent = obj.get('__parent')

        if parent is None:
            user_tags = self._server.conf.show(obj['id'])['tags']
            obj['__user_defined_tags'] = user_tags
            client_id = obj['id']
        else:
            user_tags = parent['__user_defined_tags']
            client_id = parent['id']

        if tags is not None:
            tags -= set(user_tags)

        if tags_novalue is not None:
            tags_novalue -= set(user_tags)

        try:
            client = self._server.get_connection(client_id)
        except KeyError:
            pass
        else:
            # The client is connected:
            if parent is None:
                obj.update(client.get_tags())

            # First, check if each tag is not already cached:
            now = datetime.now()
            if tags is not None:
                for tag in tags.copy():
                    if now < self._ttls.get((obj['id'], tag), now):
                        tags.remove(tag)
                # Cast as jsonizable type:
                tags = tuple(tags)

            if tags_novalue is not None:
                for tag in tags_novalue.copy():
                    if now < self._ttls.get((obj['id'], tag), now):
                        tags_novalue.remove(tag)
                # Cast as jsonizable type:
                tags_novalue = tuple(tags_novalue)

            # Ask tags to the node:
            if (tags is None or tags) or (tags_novalue is None or tags_novalue):
                self._request_tags(obj, client, tags, tags_novalue)

    def _request_tags(self, obj, client, tags, tags_novalue):
        '''
        Send the request to the client.
        '''
        
        if '__parent' in obj:
            subid = obj['id'].partition('.')[2]
            msg_id = client.connection.async_call('sub_tags', subid, tags,
                                                  tags_novalue, _data=obj)
        else:
            msg_id = client.connection.async_call('get_tags', tags,
                                                  tags_novalue, _data=obj)

        self._msgs.append(msg_id)

    def get_by_id(self, oid):
        '''
        Get an registered object by its id.

        :param oid: the complete oid of the object to get
        :return: a :class:`TqlObject` instance
        '''
        return self._objects[oid]

    def register(self, obj):
        '''
        Populate the database with the specified object. All the tags specified
        in the passed object will be declared as infite TTL.
        '''

        with self._lock:
            if 'id' not in obj:
                raise ValueError('id tag must exists on the object')
            elif obj['id'] in self._objects:
                raise AlreadyRegistered('The object is already registered')
            else:
                self._objects[obj['id']] = obj

    def unregister(self, obj_id):
        '''
        Remove an object from the manager by its id.
        '''

        with self._lock:
            if obj_id in self._objects:
                self.unregister_children(obj_id)
                del self._objects[obj_id]
            else:
                raise ValueError('The object is not registered')

    def unregister_children(self, obj_id):
        '''
        Remove all children of an object.
        '''

        with self._lock:
            if obj_id in self._objects:
                parent = self._objects[obj_id]

                for key, obj in self._objects.items():
                    if obj.get('__parent') is parent:
                        del self._objects[key]
            else:
                raise ValueError('The object %r is not registered' % obj_id)

    def get_ids(self):
        '''
        Get the set of all the first level ids.
        '''

        ids = set()
        for oid in self._objects:
            parid = oid.partition('.')[0]
            ids.add(parid)

        return ids

    def all(self, tags, to_check):
        '''
        Get all the objects registered on the server with specified tags.
        '''

        self.update(tags=tags, tags_novalue=to_check)
        for obj in self._objects.values():
            yield obj

    def some(self, ids, tags=set(), to_check=set()):
        '''
        Get specified objects registed on the server with specified tags.
        '''

        self.update(ids, tags=tags, tags_novalue=to_check)
        for obj_id in ids:
            yield self._objects[obj_id]