Skip to content
objectsdb.py 9.6 KiB
Newer Older
#!/usr/bin/env python
#coding=utf8

'''
The current living objects database.
'''

from datetime import datetime, timedelta
import logging
from threading import RLock

from tql import TqlObject
from exceptions import AlreadyRegistered

DEFAULT_TTL = 0
TTL_SERVER_DELTA = 2 # Delta to apply for all tags

class ObjectsDB(object):
    '''
    The object database.

    This class abstract the access to the current living objects database. The
    database store a list of :class:`TqlObject` instances, each defining an
    object with its tags.

    Objects are registered by :meth:`register` method, and removed by
    :meth:`unregister` method. You can update the tags of registered objects
    with :meth:`update` method. A call to :meth:`all` method give you the
    complete list of objects with an automatic update before to return it.

    .. note:
       All calls to the :class:`ObjectDB` are thread-safe.
    '''

    def __init__(self, server):

        # The server to use to complete database:
        self._server = server

        # The object database:
        self._objects = {} # Dict of oid -> TqlObject instances

        # The TTL database:
        self._ttls = {} # (obj_id, tag) -> time of death

        # The access lock of the object database:
        self._lock = RLock()

        # The list of received message while update:
        self._msgs = []

    def update(self, ids=None, tags=None, tags_novalue=None):
        '''
        Update the database according to the TTL values.

        :param ids: list of ids to update (or None for all)
        :param tags: list of tags to update (or None for all)
        :param tags_noval: list of tags to update (only the name, not the
            attached value, None for all)
        '''

        with self._lock:
            self._update(ids, tags, tags_novalue)
            self._process_responses()

    def _update(self, ids=None, tags=None, tags_novalue=None):
        '''
        First step of the database update process.

        This methods make essentially two things:

            * Calculate the final list of tags to get and to check
            * Send the request to each client involved in the update

        .. warning:
            The call to this method is not threadsafe and must be done only
            by :meth:`update` method.
        '''

        self._msgs = []
        
        # Cast input to the right type:
        if tags is not None:
            tags = set(tags)

        if tags_novalue is not None:
            tags_novalue = set(tags_novalue)
            if tags is not None:
                tags_novalue -= tags
        
        if ids is not None:
            ids = OrderedSet(ids)
            ids = OrderedSet(self._objects)

        # The main-loop, we will pop each object id from the set and
        # process it:
        while ids:
            oid = ids.pop()
            obj = self._objects.get(oid)

            assert obj is not None, 'object not found'

            if '__parent' in obj:
                # Current object is proxified by a parent:
                parent = obj['__parent']
                self._update_object(parent, tags, tags_novalue)

                # Parent is removed from the list if it present (because
                # it has been updated):
                if parent['id'] in ids:
                    ids.remove(parent['id'])

                # Search for sibling objects:
                sg = set((o for o in ids if o.startswith('%s.' % parent['id'])))
                ids -= sg

                for sibling in sg:
                    sibling = self.get_by_id(sibling)
                    self._update_object(sibling, tags, tags_novalue)
            else:
                # Current object is a standard node:
                self._update_object(obj, tags, tags_novalue)
    
    def _process_responses(self):
        '''
        Process the responses received from clients.
        '''
        
        msgs = self._server.manager.wait(frozenset(self._msgs), timeout=10)
        now = datetime.now()

        for msg in msgs:
            if msg.get('error') is not None:
                logging.error('Error from client while getting tags')
            else:
                returned = msg['return']
                obj = msg['data']

                assert isinstance(returned, dict), 'returned tags is not a dict'
                assert isinstance(obj, TqlObject), 'data obj is not TqlObject'

                if '__parent' in obj:
                    user_tags = obj['__parent']['__user_defined_tags']
                else:
                    user_tags = obj['__user_defined_tags']

                for name, attrs in returned.iteritems():
                    obj[name] = attrs.get('value')

                    # Update the ttl
                    ttl = attrs.get('ttl', DEFAULT_TTL)
                    if ttl == -1:
                        tod = datetime.max
                    else:
                        ttl += TTL_SERVER_DELTA
                        dt = timedelta(seconds=ttl)
                        tod = now + dt

                    self._ttls[(obj['id'], name)] = tod

                obj.update(user_tags)

    def _update_object(self, obj, tags, tags_novalue):
        '''
        Calculate the list of tags to request for a client, and send the
        request to the client.
        '''
        parent = obj.get('__parent')

        if parent is None:
            user_tags = self._server.conf.show(obj['id'])['tags']
            obj['__user_defined_tags'] = user_tags
            client_id = obj['id']
        else:
            user_tags = parent['__user_defined_tags']
            client_id = parent['id']

        if tags is not None:
            tags -= set(user_tags)

        if tags_novalue is not None:
            tags_novalue -= set(user_tags)

        try:
            client = self._server.get_connection(client_id)
        except KeyError:
            pass
        else:
            # The client is connected:
            if parent is None:
                obj.update(client.get_tags())

            # First, check if each tag is not already cached:
            now = datetime.now()
            if tags is not None:
                for tag in tags.copy():
                    if now < self._ttls.get((obj['id'], tag), now):
                        tags.remove(tag)
                # Cast as jsonizable type:
                tags = tuple(tags)

            if tags_novalue is not None:
                for tag in tags_novalue.copy():
                    if now < self._ttls.get((obj['id'], tag), now):
                        tags_novalue.remove(tag)
                # Cast as jsonizable type:
                tags_novalue = tuple(tags_novalue)

            # Ask tags to the node:
            if (tags is None or tags) or (tags_novalue is None or tags_novalue):
                self._request_tags(obj, client, tags, tags_novalue)

    def _request_tags(self, obj, client, tags, tags_novalue):
        '''
        Send the request to the client.
        '''
        
        if '__parent' in obj:
            subid = obj['id'].partition('.')[2]
            msg_id = client.connection.async_call('sub_tags', subid, tags,
                                                  tags_novalue, _data=obj)
        else:
            msg_id = client.connection.async_call('get_tags', tags,
                                                  tags_novalue, _data=obj)

        self._msgs.append(msg_id)

    def get_by_id(self, oid):
        '''
        Get an registered object by its id.

        :param oid: the complete oid of the object to get
        :return: a :class:`TqlObject` instance
        '''
        return self._objects[oid]

    def register(self, obj):
        '''
        Populate the database with the specified object. All the tags specified
        in the passed object will be declared as infite TTL.
        '''

        with self._lock:
            if 'id' not in obj:
                raise ValueError('id tag must exists on the object')
            elif obj['id'] in self._objects:
                raise AlreadyRegistered('The object is already registered')
            else:
                self._objects[obj['id']] = obj

    def unregister(self, obj_id):
        '''
        Remove an object from the manager by its id.
        '''

        with self._lock:
            if obj_id in self._objects:
                self.unregister_children(obj_id)
                del self._objects[obj_id]
            else:
                raise ValueError('The object is not registered')

    def unregister_children(self, obj_id):
        '''
        Remove all children of an object.
        '''

        with self._lock:
            if obj_id in self._objects:
                parent = self._objects[obj_id]

                for key, obj in self._objects.items():
                    if obj.get('__parent') is parent:
                        del self._objects[key]
            else:
                raise ValueError('The object %r is not registered' % obj_id)

    def get_ids(self):
        '''
        Get the set of all the first level ids.
        '''

        ids = set()
        for oid in self._objects:
            parid = oid.partition('.')[0]
            ids.add(parid)

        return ids

    def all(self, tags, to_check):
        '''
        Get all the objects registered on the server with specified tags.
        '''

        self.update(tags=tags, tags_novalue=to_check)
        for obj in self._objects.values():
            yield obj

    def some(self, ids, tags=set(), to_check=set()):
        '''
        Get specified objects registed on the server with specified tags.
        '''

        self.update(ids, tags=tags, tags_novalue=to_check)
        for obj_id in ids:
            yield self._objects[obj_id]