Commit 956349a1 authored by Anael Beutot's avatar Anael Beutot Committed by Antoine Millet
Browse files

Added tag utils for clients

parent c4fb1590
Loading
Loading
Loading
Loading
+0 −0

Empty file added.

+517 −0
Original line number Diff line number Diff line
import inspect
import logging
import weakref
from functools import partial
from itertools import chain
from collections import defaultdict


logger = logging.getLogger(__name__)


class Tag(object):
    """``class`` that abstract tags and act as a simple container."""
    def __init__(self, name, valuable, ttl=-1, refresh=None, parent=None):
        """
        :param string name: tag name
        :param valuable: something that gives tag value, string or callable
        :param None,int ttl: Time to live for caching the tags, possible values
            are:
                * None (means no ttl)
                * -1 (means infinite ttl)
                * positive integer in seconds

        :param None,int refresh: period used to refresh tag value on the node
        :param object obj: parent object the tag is attached to, it may be
            needed for the tag callable to process the tag
        """
        self.name = name
        self.value = None
        self.is_function = False
        self.ttl = ttl
        self.refresh = refresh
        self.parent = parent if parent is None else weakref.proxy(parent)

        if inspect.isfunction(valuable):
            self.is_function = True
            args_count = len(inspect.getargspec(valuable).args)
            if args_count > 1:
                raise TypeError('Tag function take at most 1 argument')
            elif args_count == 1:
                self._calculate_value = partial(valuable, self.parent)
            else:
                self._calculate_value = valuable
        else:
            self.value = valuable

        self.watcher = None

        # special arguments for tag db
        self.db = None
        self.sub_id = None

    def calculate_value(self):
        try:
            self.value = self._calculate_value()
        except Exception:
            logger.exception('Cannot calculate tag value for %s', self.name)
            self.value = None
        # logger.debug('Calculate Tag(%s) = %s', self.name, self.value)

    def update_value(self):
        """Called when the tag value may change."""
        prev_value = self.value
        self.calculate_value()

        if self.db is None:
            # when we calculate the tag, the latter could raise an exception
            # and could provoke a deconnection to the libvirt for example,
            # that will also provoke a deregister of some tags which might
            # include the current (in case this happens, just return)
            return

        if prev_value == self.value:
            return
        elif prev_value is None:
            # we need to register tag again
            if self.sub_id == '__main__':
                logger.debug('Register tag %s', self.name)
                self.db.rpc_register_tag(self)
            else:
                logger.debug('Register sub tag %s.%s', self.sub_id,
                             self.name)
                self.db.rpc_register_sub_tag(self.sub_id, self)
        elif self.value is None:
            # we drop the tag
            if self.sub_id == '__main__':
                logger.debug('Unregister tag %s', self.name)
                self.db.rpc_unregister_tag(self.name)
            else:
                logger.debug(
                    'Unregister sub tag %s.%s', self.sub_id, self.name)
                self.db.rpc_unregister_sub_tag(self.sub_id, self.name)
        # if tag is not pushed
        elif self.ttl != -1:
            return
        else:
            # update the tag value
            logger.debug('Update tag value %s', self.name)
            self.db.rpc_update_tag(self.sub_id, self)

    def start(self, loop):
        """
        :param loop: pyev loop
        """
        if not self.is_function:
            return

        if self.refresh is None:
            self.calculate_value()
            return

        # TODO more sophisticated calculation with event propagation
        self.watcher = loop.timer(.0, float(self.refresh), lambda *args:
                                  self.update_value())
        self.watcher.start()

    def stop(self):
        if self.watcher is not None:
            self.watcher.stop()
            self.watcher = None


def tag_inspector(mod, parent=None):
    """Inspect module to find tags.

    :param module mod: module to inspect

    Currently there are two ways to define a tag inside a module:

        * affect a string to a variable, the variable name will be the tag
          name
        * define a function that returns a value as a string or None (meaning
          the tag doesn't exist on the host), as you guessed the function name
          will define the tag name
    """
    tags = []
    for n, m in inspect.getmembers(mod):  # (name, member)
        # only keep strings or functions as tags
        if getattr(m, '__module__', None) != mod.__name__ or (
            n.startswith('_')):
            continue
        elif isinstance(m, (str, unicode)):
            # if string, it means it is constant, then ttl = -1
            ttl = -1
            refresh = None
        elif inspect.isfunction(m):
            # if function take function ttl argument or set -1 by default
            ttl = getattr(m, 'ttl', -1)
            refresh = getattr(m, 'refresh', None)
        else:
            # whatever it is we don't care...
            continue

        logger.debug('Introspected %s with ttl %s, refresh %s for object %s',
                     n, ttl, refresh, parent)

        # finally add the tag
        tags.append(Tag(n, m, ttl, refresh, parent))

    return tags


# decorators for tag inspector
def ttl(value):
    def decorator(func):
        func.ttl = value
        return func
    return decorator


def refresh(value):
    def decorator(func):
        func.refresh = value
        return func
    return decorator


def get_tags(tags_dict, tags=None):
    """Helper to get tags.

    :param tags_dict: dict containing :class:`Tag` objects
    :param tags: list of tags to get (None mean all tags)
    """
    # logger.debug('Tags request: %s', unicode(tags))

    if tags is None:
        tags = tags_dict.iterkeys()
    else:
        tags = set(tags) & set(tags_dict)

    result = dict((
        t,  # tag name
        tags_dict[t].value,
    ) for t in tags)

    # logger.debug('Returning: %s', unicode(result))
    return result


def rpc_simple_cb(log_msg):
    """Log a message in case of error of the rpc call.

    It is a factory that is used in TagDB

    :param str log_msg: format logging message with 3 %s arguments
        (opaque, error class, error message)
    """
    def cb(self, call_id, response, error):
        opaque = self.async_calls.pop(call_id)
        if error:
            logger.error(log_msg, opaque, error['exception'],
                         error['message'])
            return

        logger.debug('Simple cb for %s succeed', opaque)

    return cb


class TagDB(object):
    """Tag database. FIXME comment

    Handles common operations such as registering tag on the cc-server,
    updating its values, etc.

    TagDB can have a parent TagDB, in this case, the latter could handle tag
    registration on the cc-server.

    """
    def __init__(self, parent_db=None, tags=None, sub_tags=None):
        """
        :param TagDB parent_db: TagDB parent object
        :param iterable tags: initial tags
        :param dict sub_tags: initial subtags
        """
        self._parent = parent_db

        if tags is None:
            tags = tuple()
        if sub_tags is None:
            sub_tags = {}

        self.db = defaultdict(
            dict,
            __main__=dict(),  # tags for main object
            # others objects
        )

        for tag in tags:
            self.add_tag(tag)

        for sub_id, tags in sub_tags.iteritems():
            for tag in tags:
                self.add_sub_tag(sub_id, tag)

    def set_parent(self, parent):
        """Set parent tag database."""
        # check if previous parent
        if self._parent is not None:
            # we must remove tags from old parent
            self._parent.remove_tags(self.db['__main__'])
            for sub_id in self.db:
                if sub_id == '__main__':
                    continue
                self._parent.remove_sub_object(sub_id)
        # set new parent
        self._parent = parent
        if self._parent is not None:
            # add tags in new parent
            self._parent.add_tags(self.db['__main__'].itervalues())

            for sub_id, db in self.db.iteritems():
                if sub_id == '__main__':
                    continue
                self._parent.add_sub_object(sub_id, db.itervalues())

    # tag handling part, used by plugins
    def add_tags(self, tags):
        """
        :param iterable tags: list of tags to add
        """
        for tag in tags:
            self.add_tag(tag)

    def add_sub_tags(self, sub_id, tags):
        for tag in tags:
            self.add_sub_tag(sub_id, tag)

    def remove_tags(self, tag_names):
        """
        :param iterable tag_names: list of tag names to remove
        """
        for name in tag_names:
            self.remove_tag(name)

    def add_tag(self, tag):
        # set special attributes on tag instance
        if self._parent is not None:
            self._parent.add_tag(tag)
        self.db['__main__'][tag.name] = tag

    def remove_tag(self, tag_name):
        self.db['__main__'].pop(tag_name)
        if self._parent is not None:
            self._parent.remove_tag(tag_name)

    def add_sub_tag(self, sub_id, tag):
        if self._parent is not None:
            self._parent.add_sub_tag(sub_id, tag)
        self.db[sub_id][tag.name] = tag

    def remove_sub_tag(self, sub_id, tag_name):
        self.db[sub_id].pop(tag_name)
        if self._parent is not None:
            self._parent.remove_sub_tag(sub_id, tag_name)

    def add_sub_object(self, sub_id, tags):
        if self._parent is not None:
            # tags will be added after
            self._parent.add_sub_object(sub_id, tuple())
        # add sub object tags
        for t in tags:
            self.add_sub_tag(sub_id, t)

    def remove_sub_object(self, sub_id):
        self.db.pop(sub_id)
        if self._parent is not None:
            self._parent.remove_sub_object(sub_id)

    # dict like
    def get(self, key, default=None):
        return self.db.get(key, default)

    def __getitem__(self, key):
        return self.db[key]

    def keys(self):
        return self.db.keys()

    def iteritems(self):
        return self.db.iteritems()

    def itervalues(self):
        return self.db.itervalues()
    # end dict like


class RootTagDB(TagDB):
    """Root tag database.

    It takes care of tag registration with cc-server. It has no parent.
    """
    def __init__(self, main, tags=None, sub_tags=None):
        """
        :param main: MainLoop instance
        :param tags: initial tags
        :param sub_tags: initial sub tags
        """
        self.main = main

        #: dict for async call storage, keep a part of log message
        self.async_calls = dict()

        TagDB.__init__(self, tags=tags, sub_tags=sub_tags)

    # RPC part
    def rpc_call(self, opaque, callback, remote_name, *args, **kwargs):
        """Local helper for all rpc calls.

        :param opaque: data to associate with the async call
        :param callable callback: callback when call is done, signature is the
            same as for :py:meth:`RpcConnection.async_call_cb`
        :param remote_name: remote method name
        :param \*args: arguments for the call
        :param \*\*kwargs: keyword arguments for the call
        """
        # call only if connected and authenticated to the cc-server
        if self.main.rpc_authenticated:
            # logger.debug('RPC call %s %s', remote_name, args)
            self.async_calls[self.main.rpc_con.rpc.async_call_cb(
                callback, remote_name, *args, **kwargs)] = opaque

    def rpc_register(self):
        """Register all objects and tags on the cc-server.

        This is used just after a (re)connection to the cc-server is made.
        """
        for sub_id in self.db:
            if sub_id == '__main__':
                continue
            self.rpc_register_sub_object(sub_id)

        # register tags on the cc-server
        for tag in self.db['__main__'].itervalues():
            if tag.value is not None:
                self.rpc_register_tag(tag)

        # register sub tags on the cc-server
        for tag in chain.from_iterable(
            db.itervalues() for name, db in self.db.iteritems()
            if name != '__main__'):
            if tag.value is not None:
                self.rpc_register_sub_tag(tag.sub_id, tag)

    def rpc_register_sub_object(self, sub_id):
        # register object on the cc-server
        self.rpc_call(sub_id, self.rpc_object_register_cb, 'register', sub_id,
                      'vm')  # FIXME generalization

    def rpc_unregister_sub_object(self, sub_id):
        self.rpc_call(
            sub_id, self.rpc_object_register_cb, 'unregister', sub_id)

    #: this is a method
    rpc_object_register_cb = rpc_simple_cb(
        'Error while trying to register the object %s, %s("%s")')

    rpc_object_unregister_cb = rpc_simple_cb(
        'Error while trying to unregister the object %s, %s("%s")')

    def rpc_register_tag(self, tag):
        logger.debug('RPC register tag %s', tag.name)
        ttl = None if tag.ttl == -1 else tag.ttl
        self.rpc_call(tag.name, self.rpc_tag_register_cb, 'tags_register',
                      tag.name, ttl, tag.value)

    def rpc_unregister_tag(self, tag_name):
        logger.debug('RPC unregister tag %s', tag_name)
        self.rpc_call(tag_name, self.rpc_tag_unregister_cb, 'tags_unregister',
                      tag_name)

    def rpc_update_tag(self, sub_id, tag):
        """Update tag value on cc-server."""
        logger.debug('RPC update tag %s(%s)', tag.name, sub_id)
        if sub_id == '__main__':
            self.rpc_call(sub_id + tag.name, self.rpc_update_tag_cb,
                          'tags_update', tag.name, tag.value)
        else:
            self.rpc_call(sub_id + tag.name, self.rpc_update_tag_cb,
                          'sub_tags_update', sub_id, tag.name, tag.value)

    def rpc_register_sub_tag(self, sub_id, tag):
        logger.debug('RPC register tag %s(%s)', tag.name, sub_id)
        ttl = None if tag.ttl == -1 else tag.ttl
        self.rpc_call(tag.name, self.rpc_sub_tag_register_cb,
                      'sub_tags_register', sub_id, tag.name, ttl, tag.value)

    def rpc_unregister_sub_tag(self, sub_id, tag_name):
        logger.debug('RPC unregister tag %s(%s)', tag_name, sub_id)
        self.rpc_call(tag_name, self.rpc_sub_tag_unregister_cb,
                      'sub_tags_unregister', sub_id, tag_name)

    #: this is a method
    rpc_tag_register_cb = rpc_simple_cb(
        'Error while trying to register tag %s, %s("%s")')
    rpc_tag_unregister_cb = rpc_simple_cb(
        'Error while trying to unregister tag %s, %s("%s")')
    rpc_sub_tag_register_cb = rpc_simple_cb(
        'Error while registering sub tag %s, %s("%s")')
    rpc_sub_tag_unregister_cb = rpc_simple_cb(
        'Error while unregistering sub tag %s, %s("%s")')
    rpc_update_tag_cb = rpc_simple_cb(
        'Error while trying to update tag %s, %s("%s")')
    # end RPC part

    def add_tag(self, tag):
        # set special attributes on tag instance
        tag.db = self
        tag.sub_id = '__main__'
        tag.start(self.main.evloop)
        # register tag on the cc-server
        if tag.value is not None:
            self.rpc_register_tag(tag)
        self.db['__main__'][tag.name] = tag

    def remove_tag(self, tag_name):
        tag = self.db['__main__'].pop(tag_name)
        tag.db = None
        tag.sub_id = None
        tag.stop()
        # unregister tag on the cc-server
        if tag.value is not None:
            self.rpc_unregister_tag(tag_name)

    def add_sub_tag(self, sub_id, tag):
        tag.db = self
        tag.sub_id = sub_id
        tag.start(self.main.evloop)
        # register tag to the cc-server
        if tag.value is not None:
            self.rpc_register_sub_tag(sub_id, tag)
        self.db[sub_id][tag.name] = tag

    def remove_sub_tag(self, sub_id, tag_name):
        tag = self.db[sub_id].pop(tag_name)
        tag.db = None
        tag.sub_id = None
        tag.stop()
        # unregister tag to the cc-server
        if tag.value is not None:
            self.rpc_unregister_sub_tag(sub_id, tag_name)

    def add_sub_object(self, sub_id, tags):
        self.rpc_register_sub_object(sub_id)
        # add sub object tags
        for t in tags:
            self.add_sub_tag(sub_id, t)

    def remove_sub_object(self, sub_id):
        for tag in self.db[sub_id].itervalues():
            tag.stop()
            tag.db = None
            tag.sub_id = None
            # we don't need to unregister each sub tag on the cc-server because
            # it will be done when we unregister the object
        del self.db[sub_id]
        self.rpc_unregister_sub_object(sub_id)