Skip to content
db.py 5.51 KiB
Newer Older
""" This module contains some cc-server specific helpers for the CloudControl
    tags database provided in cc-commons.
"""

Antoine Millet's avatar
Antoine Millet committed
from abc import abstractproperty
from datetime import datetime
Antoine Millet's avatar
Antoine Millet committed
from collections import defaultdict

from sjrpc.core import AsyncWatcher

Antoine Millet's avatar
Antoine Millet committed
from cloudcontrol.common.tql.db.tag import BaseTag, BaseTagInterface
from cloudcontrol.common.tql.db.object import TqlObject
from cloudcontrol.common.tql.db.requestor import StaticRequestor, fetcher


class SObject(TqlObject):

    """ A TQL object with features specific to cc-server.
    """

Antoine Millet's avatar
Antoine Millet committed
    def __init__(self, *args, **kwargs):
        super(SObject, self).__init__(*args, **kwargs)
        self._overridden = defaultdict(lambda: None)
    def register(self, tag, override=False):
        """ Register a tag on this object (or override).
        """

Antoine Millet's avatar
Antoine Millet committed
        if override:
            # The tag to register must override an eventual existing tag.
            # Overridden tag is moved in the overridden tags dict:
Antoine Millet's avatar
Antoine Millet committed
            if tag.name in self._tags:
                self._overridden[tag.name] = self._tags[tag.name]
Antoine Millet's avatar
Antoine Millet committed
                del self._tags[tag.name]
        elif tag.name in self._tags:
            # The tag to register is already overridden, we place it directly
            # on the overridden tags dict:
            if tag.name in self._overridden:
                raise KeyError('A tag with this name is already registered on this object')
            self._overridden[tag.name] = tag
            return
Antoine Millet's avatar
Antoine Millet committed
        return super(SObject, self).register(tag)

    def unregister(self, name, override=False):
        """ Unregister a tag on this object (or remove override).
        """
Antoine Millet's avatar
Antoine Millet committed
        super(SObject, self).unregister(name)
        # If a tag is overriden, replace it on the tag list:
        if override and name in self._overridden:
            self._tags[name] = self._overridden[name]
            del self._overridden[name]

    def is_overriding(self, name):
        """ Return True if a tag is overriding another one for the name.

        If the tag is not found, False is returned.
        """
        return self._overridden[name] is not None
Antoine Millet's avatar
Antoine Millet committed
class CCSAsyncTagInterface(BaseTagInterface):
Antoine Millet's avatar
Antoine Millet committed
    @abstractproperty
    def client(self):
        """ The client on which do the fetch.
        """
Antoine Millet's avatar
Antoine Millet committed
    @abstractproperty
    def cached(self):
        """ The cached value stored in the tag. Can raise an OutdatedCacheError
            if the cache is out of date.
        """
Antoine Millet's avatar
Antoine Millet committed
    @cached.setter
    def cached(self):
        """ Write the new cached value.
        """
    """ A tag which is available remotely on a client.
    """

    def __init__(self, name, callback, ttl=None):
        super(RemoteTag, self).__init__(name)
        self._callback = callback
        self._cache_last_update = None
        self._cache_value = u''

    @property
    def callback(self):
        return self._callback

    @property
    def ttl(self):
        if self._cache_last_update is None:
            return 0
        elif self._ttl is None:
            return float('inf')
        else:
            dt = datetime.now() - self._cache_last_update
            age = dt.seconds + dt.days * 60 * 60 * 24
            return self._ttl - age
    @ttl.setter
    def ttl(self, value):
        self._ttl = value

    @property
    def cached(self):
        """ Get the cached value.
        """
        if self.ttl > 0:
            return self._cache_value
        else:
            raise self.OutdatedCacheError('Cache is out of date')

    @cached.setter
    def cached(self, value):
        """ Set the cached value.
        """
        self._cache_value = value
        self._cache_last_update = datetime.now()

    def invalidate(self):
        self._cache_last_update = None


    class OutdatedCacheError(Exception):
        pass


Antoine Millet's avatar
Antoine Millet committed
CCSAsyncTagInterface.register(RemoteTag)


class SRequestor(StaticRequestor):

Antoine Millet's avatar
Antoine Millet committed
    @fetcher(CCSAsyncTagInterface)
    def fetcher_ccs_async(self, map):
        """ Fetching method using asynchronous call from sjrpc to get values
            from the remote client.
        """
        watcher = AsyncWatcher()
        # Do the requests to remote clients:
        for obj, tags in map.iteritems():
            to_update = set()
            for tag in tags:
                try:
                    obj.set(tag.name, tag.cached)
                except tag.OutdatedCacheError:
                    to_update.add(tag)
            if to_update:
                # All remote tags of an object are always bound to the same
                # client. Request for tag value is made in a single call to
                # avoid multiple query/response, so we take the callback from
                # the first tag to do the update on the whole:
                cb = tuple(to_update)[0].callback
                cb(watcher, obj, [t.name for t in to_update])
        # Get and process the results:
        for update in watcher.iter(timeout=4, raise_timeout=True):  #TODO: adaptative timeout
            requested_tags, obj = update['data']
            if 'return' not in update:
                for tag_name in requested_tags:
                    obj.set(tag_name, '#ERR#')
            else:
                tags = update['return']
                for tag_name in requested_tags:
                    tag_value = tags.get(tag_name)
                    if tag_value is None:
                        obj.set(tag_name, '#ERR')
                    else:
                        obj.set(tag_name, tag_value)
                        obj[tag_name].cached = tag_value  # Set the tag cache value