# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl.  If not, see <http://www.gnu.org/licenses/>.

import inspect
import logging
import threading
import weakref
from functools import partial
from itertools import chain
from collections import defaultdict

from cloudcontrol.common.client.utils import main_thread
from cloudcontrol.common.client.exc import TagConflict


#: module-level logger used by the tag classes and helpers of this module
logger = logging.getLogger(__name__)


class Tag(object):
    """``class`` that abstracts tags and acts as a simple container.

    A tag is either constant (``valuable`` is a plain value stored as is) or
    computed (``valuable`` is a function called to produce the value).
    Computed tags can be refreshed periodically by a timer, optionally in a
    background thread.
    """
    def __init__(self, name, valuable, ttl=-1, refresh=None, parent=None,
                 background=False):
        """
        :param string name: tag name
        :param valuable: something that gives tag value, string or callable
        :param None,int ttl: Time to live for caching the tags, possible values
            are:
                * None (means no ttl)
                * -1 (means infinite ttl)
                * positive integer in seconds

        :param None,int refresh: period used to refresh tag value on the node
        :param object parent: parent object the tag is attached to, it may be
            needed for the tag callable to process the tag
        :param bool background: calculate the tag value in a background thread
            (might be useful to not block the event loop)
        :raises TypeError: if ``valuable`` is a function that takes more than
            one argument
        """
        self.name = name
        self.value = None
        self.is_function = False
        self.ttl = ttl
        self.refresh = refresh
        # only a weak proxy is kept so the tag does not keep its parent alive
        self.parent = parent if parent is None else weakref.proxy(parent)
        self.background = bool(background)

        if inspect.isfunction(valuable):
            self.is_function = True
            # getargspec() no longer exists on recent Python 3 releases, use
            # getfullargspec() whenever the interpreter provides it
            get_spec = (getattr(inspect, 'getfullargspec', None) or
                        inspect.getargspec)
            args_count = len(get_spec(valuable).args)
            if args_count > 1:
                raise TypeError('Tag function take at most 1 argument')
            elif args_count == 1:
                # the single argument is the (proxied) parent object
                self._calculate_value = partial(valuable, self.parent)
            else:
                self._calculate_value = valuable
        else:
            self.value = valuable

        #: timer instance
        self.watcher = None
        #: async watcher in case of background is True
        # the attribute is named ``async``, a reserved word since Python 3.7,
        # hence the setattr()/getattr() access throughout this class
        setattr(self, 'async', None)

        # special arguments for tag db
        self.db = None
        self.sub_id = None

    def calculate_value(self):
        """Run the tag function and store its result in ``self.value``.

        Any exception raised by the tag function is logged and the value is
        reset to None.
        """
        try:
            self.value = self._calculate_value()
        except Exception:
            logger.exception('Cannot calculate tag value for %s', self.name)
            self.value = None
        # logger.debug('Calculate Tag(%s) = %s', self.name, self.value)

    def update_value(self):
        """Called when the tag value may change."""
        prev_value = self.value
        if not self.background:
            self.calculate_value()
            if self.value != prev_value:
                self._handle_registration(prev_value)
            return

        # else
        # create a thread that will run the calculate_value method
        def thread_run():
            self.calculate_value()
            # signal the async watcher, :meth:`async_cb` handles the rest
            getattr(self, 'async').send()

        # keep previous value in watcher's opaque
        async_watcher = getattr(self, 'async')
        async_watcher.data = prev_value
        async_watcher.start()
        th = threading.Thread(target=thread_run)
        th.daemon = True
        th.start()

    def async_cb(self, watcher, revents):
        """Callback of the async watcher, fired once the background
        calculation is done.

        :param watcher: async watcher, its ``data`` holds the previous value
        :param revents: unused
        """
        if watcher.data != self.value:
            self._handle_registration(watcher.data)

        watcher.data = None
        watcher.stop()

    def _handle_registration(self, prev_value):
        """Propagate a value change to the tag database.

        :param prev_value: tag value before the last calculation

        Depending on the transition, the tag is registered (previous value
        was None), unregistered (new value is None) or updated.
        """
        if self.db is None:
            # when we calculate the tag, the latter could raise an exception
            # and could provoke a deconnection to the libvirt for example,
            # that will also provoke a deregister of some tags which might
            # include the current (in case this happens, just return)
            return

        if prev_value is None:
            # we need to register tag again
            if self.sub_id == '__main__':
                logger.debug('Register tag %s', self.name)
                self.db.rpc_register_tag(self)
            else:
                logger.debug('Register sub tag %s.%s', self.sub_id,
                             self.name)
                self.db.rpc_register_sub_tag(self.sub_id, self)
        elif self.value is None:
            # we drop the tag
            if self.sub_id == '__main__':
                logger.debug('Unregister tag %s', self.name)
                self.db.rpc_unregister_tag(self.name)
            else:
                logger.debug(
                    'Unregister sub tag %s.%s', self.sub_id, self.name)
                self.db.rpc_unregister_sub_tag(self.sub_id, self.name)
        # if tag is not pushed
        elif self.ttl != -1:
            return
        else:
            # update the tag value
            logger.debug('Update tag value %s', self.name)
            self.db.rpc_update_tag(self.sub_id, self)

    def start(self, loop):
        """Attach the tag to the event loop.

        :param loop: pyev loop

        Constant tags need nothing more. Function tags without ``refresh``
        are calculated once, the others get a repeating timer that calls
        :meth:`update_value`.
        """
        if self.background:
            setattr(self, 'async', getattr(loop, 'async')(self.async_cb))

        if not self.is_function:
            return

        if self.refresh is None:
            self.calculate_value()
            return

        # TODO more sophisticated calculation with event propagation
        self.watcher = loop.timer(.0, float(self.refresh), lambda *args:
                                  self.update_value())
        self.watcher.start()

    def stop(self):
        """Stop and forget the refresh timer, if there is one."""
        if self.watcher is not None:
            self.watcher.stop()
            self.watcher = None


def tag_inspector(mod, parent=None):
    """Inspect module to find tags.

    :param module mod: module to inspect
    :param object parent: parent object given to every created :class:`Tag`
    :return: list of :class:`Tag` instances found in the module

    Currently there are two ways to define a tag inside a module:

        * affect a string to a variable, the variable name will be the tag
          name
        * define a function that returns a value as a string or None (meaning
          the tag doesn't exist on the host), as you guessed the function name
          will define the tag name

    Members whose name starts with an underscore, or whose ``__module__`` is
    not the inspected module, are ignored.
    """
    tags = []
    for n, m in inspect.getmembers(mod):  # (name, member)
        # only keep strings or functions as tags
        # NOTE(review): plain strings have no ``__module__`` attribute, so this
        # check appears to discard string constants before the branch below
        # is ever reached -- confirm whether constant tags are still expected
        if getattr(m, '__module__', None) != mod.__name__ or (
                n.startswith('_')):
            continue
        elif isinstance(m, (str, unicode)):
            # if string, it means it is constant, then ttl = -1
            tag_ttl = -1
            tag_refresh = None
            # a constant is never calculated, so never in the background
            tag_background = False
        elif inspect.isfunction(m):
            # if function take function ttl argument or set -1 by default
            tag_ttl = getattr(m, 'ttl', -1)
            tag_refresh = getattr(m, 'refresh', None)
            tag_background = getattr(m, 'background', False)
        else:
            # whatever it is we don't care...
            continue

        logger.debug('Introspected %s with ttl %s, refresh %s, background %s for object %s',
                     n, tag_ttl, tag_refresh, tag_background, parent)

        # finally add the tag
        tags.append(Tag(n, m, tag_ttl, tag_refresh, parent, tag_background))

    return tags


# decorators for tag inspector
def ttl(value):
    """Build a decorator that stores ``value`` in the ``ttl`` attribute of
    the decorated function (read back by :func:`tag_inspector`).

    :param value: time to live to attach to the function
    """
    def _mark(tag_func):
        setattr(tag_func, 'ttl', value)
        return tag_func
    return _mark


def refresh(value):
    """Build a decorator that stores ``value`` in the ``refresh`` attribute
    of the decorated function (read back by :func:`tag_inspector`).

    :param value: refresh period to attach to the function
    """
    def _mark(tag_func):
        setattr(tag_func, 'refresh', value)
        return tag_func
    return _mark


def background(func):
    """Decorator that sets the ``background`` attribute of ``func`` to True
    (read back by :func:`tag_inspector`) and returns ``func`` itself.
    """
    setattr(func, 'background', True)
    return func


def get_tags(tags_dict, tags=None):
    """Helper to get tags.

    :param tags_dict: dict containing :class:`Tag` objects
    :param tags: list of tags to get (None mean all tags)
    :return: dict that associates each selected tag name with its current
        value; requested names missing from ``tags_dict`` are left out
    """
    # logger.debug('Tags request: %s', unicode(tags))

    if tags is None:
        # iterating over a dict gives its keys, no need for iterkeys()
        tags = iter(tags_dict)
    else:
        tags = set(tags) & set(tags_dict)

    result = dict((
        t,  # tag name
        tags_dict[t].value,
    ) for t in tags)

    # logger.debug('Returning: %s', unicode(result))
    return result


def rpc_simple_cb(log_msg):
    """Factory of RPC callbacks that only log the outcome of the call.

    The returned function is meant to be used as a method: it pops the
    opaque data stored for the call in ``self.async_calls``, then logs an
    error when the call failed, or a debug message otherwise.

    :param str log_msg: format logging message with 3 %s arguments
        (opaque, error class, error message)
    """
    def handle_response(self, call_id, response, error):
        opaque = self.async_calls.pop(call_id)
        if not error:
            logger.debug('Simple cb for %s succeed', opaque)
            return

        logger.error(log_msg, opaque, error['exception'], error['message'])

    return handle_response


class TagDB(object):
    """Tag database.

    Handles common operations such as registering tag on the cc-server,
    updating its values, etc.

    TagDB can have a parent TagDB, in this case, the latter could handle tag
    registration on the cc-server.

    Tags are kept in ``self.db``, which maps an object id to a
    ``{tag name: tag}`` dict. The special id ``'__main__'`` holds the tags of
    the main object, every other id is a sub object. Every change is
    forwarded to the parent database when there is one.
    """
    def __init__(self, parent_db=None, tags=None):
        """
        :param TagDB parent_db: TagDB parent object
        :param iterable tags: initial tags
        """
        self._parent = parent_db

        if tags is None:
            tags = tuple()

        self.db = defaultdict(
            dict,
            __main__=dict(),  # tags for main object
            # others objects
        )
        #: associate type for each sub object
        self._object_types = dict()

        for tag in tags:
            self.add_tag(tag)

    def set_parent(self, parent):
        """Set parent tag database.

        Tags and sub objects are withdrawn from the previous parent (if any)
        and added to the new one (if not None).
        """
        # check if previous parent
        old_parent = self._parent
        if old_parent is not None:
            # we must remove tags from old parent, but only the ones it
            # really holds for us: after a name conflict the parent may have
            # another tag recorded under the same name
            own_names = [name for name, tag in self.db['__main__'].items()
                         if old_parent.check_tag(tag)]
            old_parent.remove_tags(own_names)
            for sub_id in self.db:
                if sub_id == '__main__':
                    continue
                old_parent.remove_sub_object(sub_id)
        # set new parent
        self._parent = parent
        if self._parent is not None:
            # add tags in new parent
            self._parent.add_tags(self.db['__main__'].itervalues())

            for sub_id, db in self.db.iteritems():
                if sub_id == '__main__':
                    continue
                self._parent.add_sub_object(sub_id, db.itervalues(),
                                            self._object_types[sub_id])

    def check_tags_conflict(self, *tag_names):
        """Checks a list of tag names that might conflict before inserting in
        TagDB hierarchy

        :return: list of the names already used in this database or in one
            of its ancestors

        .. warning::
            This is in no way a guarantee that following inserts will succeed.
        """
        conflicts = []
        parent_check = []
        for name in tag_names:
            if name in self.db['__main__']:
                conflicts.append(name)
            else:
                parent_check.append(name)

        # names that are free here may still be taken higher in the hierarchy
        if self._parent is not None and parent_check:
            conflicts.extend(self._parent.check_tags_conflict(*parent_check))

        return conflicts

    # tag handling part, used by plugins
    def add_tags(self, tags):
        """
        :param iterable tags: list of tags to add
        """
        for tag in tags:
            self.add_tag(tag)

    def add_sub_tags(self, sub_id, tags):
        """
        :param sub_id: id of the sub object
        :param iterable tags: list of tags to add to the sub object
        """
        for tag in tags:
            self.add_sub_tag(sub_id, tag)

    def remove_tags(self, tag_names):
        """
        :param iterable tag_names: list of tag names to remove
        """
        for name in tag_names:
            self.remove_tag(name)

    def check_tag(self, tag):
        """Checks that a given tag object is the same instance in the db

        Useful for checking before removing.
        """
        return self.db['__main__'].get(tag.name) is tag

    def add_tag(self, tag):
        """Add a tag to the main object.

        :raises TagConflict: if the name is already used in this database
        """
        if tag.name in self.db['__main__']:
            raise TagConflict(
                'A tag with the name %s is already registered' % tag.name)

        # first add the tag to the child db, in case of conflict in the parent,
        # the tag is recorded in the child thus if the latter's parent is
        # changed it can be recorded again after
        self.db['__main__'][tag.name] = tag
        if self._parent is not None:
            self._parent.add_tag(tag)

    def remove_tag(self, tag_name):
        """Remove a tag of the main object.

        :raises KeyError: if no tag with this name is recorded
        """
        tag = self.db['__main__'].pop(tag_name)
        if self._parent is not None and self._parent.check_tag(tag):
            # remove tag in parent db only if it is the same instance
            self._parent.remove_tag(tag_name)

    def check_sub_tag(self, sub_id, tag):
        """Same as :meth:`check_tag` for the tags of a sub object."""
        # use get() to not create an empty entry for an unknown sub object
        sub_db = self.db.get(sub_id)
        return sub_db is not None and sub_db.get(tag.name) is tag

    def add_sub_tag(self, sub_id, tag):
        """Add a tag to a sub object.

        :raises TagConflict: if the name is already used for this sub object
        """
        if tag.name in self.db[sub_id]:
            raise TagConflict(
                'A tag with the name %s is already registered' % tag.name)

        self.db[sub_id][tag.name] = tag
        if self._parent is not None:
            self._parent.add_sub_tag(sub_id, tag)

    def remove_sub_tag(self, sub_id, tag_name):
        """Remove a tag of a sub object.

        :raises KeyError: if no tag with this name is recorded
        """
        tag = self.db[sub_id].pop(tag_name)
        # remove tag in parent db only if it is the same instance, the lookup
        # must be done among the tags of the sub object, not the main ones
        if self._parent is not None and self._parent.check_sub_tag(sub_id,
                                                                   tag):
            self._parent.remove_sub_tag(sub_id, tag_name)

    def add_sub_object(self, sub_id, tags, type_):
        """Declare a sub object and its tags.

        :param sub_id: id of the sub object
        :param iterable tags: tags of the sub object
        :param type_: type associated with the sub object
        """
        self._object_types[sub_id] = type_
        if self._parent is not None:
            # tags will be added after
            self._parent.add_sub_object(sub_id, tuple(), type_)
        # add sub object tags
        for t in tags:
            self.add_sub_tag(sub_id, t)

    def remove_sub_object(self, sub_id):
        """Forget a sub object and all its tags.

        :raises KeyError: if the sub object was never declared
        """
        # a sub object declared without any tag has no entry in the db
        self.db.pop(sub_id, None)
        del self._object_types[sub_id]
        if self._parent is not None:
            self._parent.remove_sub_object(sub_id)

    # dict like
    def get(self, key, default=None):
        return self.db.get(key, default)

    def __getitem__(self, key):
        return self.db[key]

    def keys(self):
        return self.db.keys()

    def iteritems(self):
        return self.db.iteritems()

    def itervalues(self):
        return self.db.itervalues()
    # end dict like


class RootTagDB(TagDB):
    """Root tag database.

    It takes care of tag registration with cc-server. It has no parent.
    """
    def __init__(self, main, tags=None):
        """
        :param main: MainLoop instance
        :param tags: initial tags
        """
        # both attributes must exist before TagDB.__init__ runs, adding the
        # initial tags already starts them and issues RPC calls
        self.main = main

        #: dict for RPC async call storage, keep a part of log message
        self.async_calls = dict()

        TagDB.__init__(self, tags=tags)

    def set_parent(self, parent):
        """A root database cannot have a parent.

        :raises NotImplementedError: always
        """
        raise NotImplementedError('Cannot set parent on RootTagDB')

    # RPC part
    def rpc_call(self, opaque, callback, remote_name, *args, **kwargs):
        r"""Local helper for all rpc calls.

        :param opaque: data to associate with the async call
        :param callable callback: callback when call is done, signature is the
            same as for :py:meth:`RpcConnection.async_call_cb`
        :param remote_name: remote method name
        :param \*args: arguments for the call
        :param \*\*kwargs: keyword arguments for the call

        Nothing is done (the call is dropped) while the node is not
        authenticated on the cc-server.
        """
        # call only if connected and authenticated to the cc-server
        if self.main.rpc_authenticated:
            # logger.debug('RPC call %s %s', remote_name, args)
            # the value returned by async_call_cb is the key under which the
            # opaque is stored, callbacks pop it with their call id
            self.async_calls[self.main.rpc_con.rpc.async_call_cb(
                callback, remote_name, *args, **kwargs)] = opaque

    def rpc_register(self):
        """Register all objects and tags on the cc-server.

        This is used just after a (re)connection to the cc-server is made.
        """
        for sub_id in self.db:
            if sub_id == '__main__':
                continue
            self.rpc_register_sub_object(sub_id, self._object_types[sub_id])

        # register tags on the cc-server
        for tag in self.db['__main__'].itervalues():
            if tag.value is not None:
                self.rpc_register_tag(tag)

        # register sub tags on the cc-server
        for tag in chain.from_iterable(
                db.itervalues() for name, db in self.db.iteritems()
                if name != '__main__'):
            if tag.value is not None:
                self.rpc_register_sub_tag(tag.sub_id, tag)

    def rpc_register_sub_object(self, sub_id, type_):
        """Register a sub object on the cc-server."""
        self.rpc_call(sub_id, self.rpc_object_register_cb,
                      'register', sub_id, type_)

    def rpc_unregister_sub_object(self, sub_id):
        """Unregister a sub object on the cc-server."""
        # failures must be reported with the "unregister" message
        self.rpc_call(
            sub_id, self.rpc_object_unregister_cb, 'unregister', sub_id)

    #: this is a method
    rpc_object_register_cb = rpc_simple_cb(
        'Error while trying to register the object %s, %s("%s")')

    rpc_object_unregister_cb = rpc_simple_cb(
        'Error while trying to unregister the object %s, %s("%s")')

    def rpc_register_tag(self, tag):
        """Register a tag of the main object on the cc-server."""
        logger.debug('RPC register tag %s', tag.name)
        # an infinite ttl (-1) is sent as None
        ttl = None if tag.ttl == -1 else tag.ttl
        self.rpc_call(tag.name, self.rpc_tag_register_cb, 'tags_register',
                      tag.name, ttl, unicode(tag.value))

    def rpc_unregister_tag(self, tag_name):
        """Unregister a tag of the main object on the cc-server."""
        logger.debug('RPC unregister tag %s', tag_name)
        self.rpc_call(tag_name, self.rpc_tag_unregister_cb, 'tags_unregister',
                      tag_name)

    def rpc_update_tag(self, sub_id, tag):
        """Update tag value on cc-server."""
        logger.debug('RPC update tag %s(%s)', tag.name, sub_id)
        if sub_id == '__main__':
            self.rpc_call(sub_id + tag.name, self.rpc_update_tag_cb,
                          'tags_update', tag.name, unicode(tag.value))
        else:
            self.rpc_call(sub_id + tag.name, self.rpc_update_tag_cb,
                          'sub_tags_update', sub_id, tag.name,
                          unicode(tag.value))

    def rpc_register_sub_tag(self, sub_id, tag):
        """Register a tag of a sub object on the cc-server."""
        logger.debug('RPC register tag %s(%s)', tag.name, sub_id)
        # an infinite ttl (-1) is sent as None
        ttl = None if tag.ttl == -1 else tag.ttl
        self.rpc_call(tag.name, self.rpc_sub_tag_register_cb,
                      'sub_tags_register', sub_id, tag.name, ttl,
                      unicode(tag.value))

    def rpc_unregister_sub_tag(self, sub_id, tag_name):
        """Unregister a tag of a sub object on the cc-server."""
        logger.debug('RPC unregister tag %s(%s)', tag_name, sub_id)
        self.rpc_call(tag_name, self.rpc_sub_tag_unregister_cb,
                      'sub_tags_unregister', sub_id, tag_name)

    #: this is a method
    rpc_tag_register_cb = rpc_simple_cb(
        'Error while trying to register tag %s, %s("%s")')
    rpc_tag_unregister_cb = rpc_simple_cb(
        'Error while trying to unregister tag %s, %s("%s")')
    rpc_sub_tag_register_cb = rpc_simple_cb(
        'Error while registering sub tag %s, %s("%s")')
    rpc_sub_tag_unregister_cb = rpc_simple_cb(
        'Error while unregistering sub tag %s, %s("%s")')
    rpc_update_tag_cb = rpc_simple_cb(
        'Error while trying to update tag %s, %s("%s")')
    # end RPC part

    def add_tag(self, tag):
        """Add a tag to the main object, start it and register it on the
        cc-server when it has a value.

        :raises TagConflict: if the name is already used
        """
        if tag.name in self.db['__main__']:
            raise TagConflict(
                'A tag with the name %s was already registered' % tag.name)

        # set special attributes on tag instance
        tag.db = self
        tag.sub_id = '__main__'
        tag.start(self.main.evloop)
        # register tag on the cc-server
        if tag.value is not None:
            self.rpc_register_tag(tag)
        self.db['__main__'][tag.name] = tag

    def remove_tag(self, tag_name):
        """Remove a tag of the main object, stop it and unregister it from
        the cc-server when it has a value.

        :raises KeyError: if no tag with this name is recorded
        """
        tag = self.db['__main__'].pop(tag_name)
        tag.db = None
        tag.sub_id = None
        tag.stop()
        # unregister tag on the cc-server
        if tag.value is not None:
            self.rpc_unregister_tag(tag_name)

    def add_sub_tag(self, sub_id, tag):
        """Add a tag to a sub object, start it and register it on the
        cc-server when it has a value.

        :raises TagConflict: if the name is already used for this sub object
        """
        if tag.name in self.db[sub_id]:
            raise TagConflict(
                'A tag with the name %s (%s) was already registered' %
                (tag.name, sub_id))

        tag.db = self
        tag.sub_id = sub_id
        tag.start(self.main.evloop)
        # register tag to the cc-server
        if tag.value is not None:
            self.rpc_register_sub_tag(sub_id, tag)
        self.db[sub_id][tag.name] = tag

    def remove_sub_tag(self, sub_id, tag_name):
        """Remove a tag of a sub object, stop it and unregister it from the
        cc-server when it has a value.

        :raises KeyError: if no tag with this name is recorded
        """
        tag = self.db[sub_id].pop(tag_name)
        tag.db = None
        tag.sub_id = None
        tag.stop()
        # unregister tag to the cc-server
        if tag.value is not None:
            self.rpc_unregister_sub_tag(sub_id, tag_name)

    def add_sub_object(self, sub_id, tags, type_):
        """Register a sub object on the cc-server, then add its tags.

        :param sub_id: id of the sub object
        :param iterable tags: tags of the sub object
        :param type_: type associated with the sub object
        """
        self.rpc_register_sub_object(sub_id, type_)
        self._object_types[sub_id] = type_
        # add sub object tags
        for t in tags:
            self.add_sub_tag(sub_id, t)

    def remove_sub_object(self, sub_id):
        """Stop all the tags of a sub object, forget it and unregister it
        from the cc-server.
        """
        for tag in self.db[sub_id].itervalues():
            tag.stop()
            tag.db = None
            tag.sub_id = None
            # we don't need to unregister each sub tag on the cc-server because
            # it will be done when we unregister the object
        del self.db[sub_id]
        del self._object_types[sub_id]
        self.rpc_unregister_sub_object(sub_id)