Commit 9028dae5 authored by Antoine Millet's avatar Antoine Millet
Browse files

Introduced remote tags with asynchronous fetching

parent 6053777e
Loading
Loading
Loading
Loading
+3 −4
Original line number Diff line number Diff line
@@ -16,7 +16,7 @@ from ccserver.jobs import JobsManager
from ccserver.clients import Client

#from cloudcontrol.common.tql.db.object import TqlObject
from ccserver.db import SObject
from ccserver.db import SObject, SRequestor
from cloudcontrol.common.tql.db.tag import StaticTag
from cloudcontrol.common.tql.db.db import TqlDatabase

@@ -27,7 +27,6 @@ import ccserver.clients.hv
import ccserver.clients.bootstrap
import ccserver.clients.spv


class WelcomeHandler(RpcHandler):
    """ Default handler used on client connections of the server.
    """
@@ -70,7 +69,7 @@ class CCServer(object):
        if keyfile:
            logging.info('SSL Key: %s', certfile)

        self.db = TqlDatabase()
        self.db = TqlDatabase(default_requestor=SRequestor())

        # Create the rpc server:
        logging.info('Listening on %s:%s', address, port)
@@ -88,7 +87,7 @@ class CCServer(object):
        Update the database with accounts.
        '''

        db_accounts = set((obj['a'].get_value() for obj in self.db.objects if 'a' in obj))
        db_accounts = set((obj['a'].value for obj in self.db.objects if 'a' in obj))
        accounts = set(self.conf.list_accounts())

        to_register = accounts - db_accounts
+18 −8
Original line number Diff line number Diff line
@@ -6,13 +6,12 @@ sjRPC handler.

import logging

from functools import partial
from datetime import datetime

from cloudcontrol.common.tql.db.tag import CallbackTag

from ccserver.handlers import CCHandler, listed
from ccserver.exceptions import RightError
from ccserver.db import RemoteTag
from cloudcontrol.common.tql.db.tag import CallbackTag


class RegisteredCCHandler(CCHandler):
@@ -124,6 +123,12 @@ class Client(object):
    # Properties
    #

    @property
    def object(self):
        """ Return the tql object of this client.
        """
        return self._tql_object

    @property
    def login(self):
        """ Return the login of this client.
@@ -168,7 +173,7 @@ class Client(object):
        peer = self.conn.getpeername()
        return ':'.join(peer.split(':')[:-1])

    def get_tags(self, tags):
    def get_tags(self, tags):                                                   # DEPRECATED
        """ Get tags on the remote node.

        :param tags: tags is the list of tags to fetch
@@ -195,9 +200,15 @@ class Client(object):
        """
        self._last_action = datetime.now()

    def get_remote_tags(self, tag):
    def get_remote_tags(self, tag):                                             # DEPRECATED
        return self.conn.call('get_tags', (tag,))[tag]

    def async_remote_tags(self, watcher, robj, tags):
        """ Asynchronously update tags from the remote client using
            specified watcher.
        """
        watcher.register(self.conn, 'get_tags', tags, _data=robj)

    def tags_register(self, name, ttl=None, value=None):
        """ Register a new remote tag for the client.

@@ -207,8 +218,7 @@ class Client(object):
        :param value: value of the tag
        """

        callback = partial(self.get_remote_tags, name)
        tag = CallbackTag(name, callback, ttl=ttl)
        tag = RemoteTag(name, self, ttl=ttl)
        self._tql_object.register(tag)
        self._remote_tags.add(name)

@@ -241,6 +251,6 @@ class Client(object):
        tag = self._tql_object.get(name)
        if tag is not None:
            if value is not None:
                tag.set_value(value)
                tag.cached = value
            if ttl is not None:
                tag.ttl = ttl
+100 −8
Original line number Diff line number Diff line
@@ -2,8 +2,13 @@
    tags database provided in cc-commons.
"""

from datetime import datetime

from sjrpc.core import AsyncWatcher

from cloudcontrol.common.tql.db.tag import BaseTag
from cloudcontrol.common.tql.db.object import TqlObject
from cloudcontrol.common.tql.db.requestor import StaticRequestor, fetcher


class SObject(TqlObject):
@@ -52,9 +57,96 @@ class OverrideTag(BaseTag):
        super(OverrideTag, self).__init__(name)
        self.override = override
        self.replaced = replaced
        self.interface = self.override.interface

    @property
    def value(self):
        return self.override.value

    @value.setter
    def value(self, value):
        self.override.value = value


class RemoteTag(BaseTag):

    """ A tag which is available remotely on a client.
    """

    interface = 'cc-server.async'

    def __init__(self, name, client, ttl=None):
        super(RemoteTag, self).__init__(name)
        self._client = client
        self._ttl = ttl if ttl != -1 else None   #FIXME: ANAEL !!!!
        self._cache_last_update = None
        self._cache_value = u''

    @property
    def client(self):
        return self._client

    @property
    def ttl(self):
        if self._cache_last_update is None:
            return 0
        elif self._ttl is None:
            return float('inf')
        else:
            dt = datetime.now() - self._cache_last_update
            age = dt.seconds + dt.days * 60 * 60 * 24
            return self._ttl - age

    @ttl.setter
    def ttl(self, value):
        self._ttl = value

    @property
    def cached(self):
        """ Get the cached value.
        """
        if self.ttl > 0:
            return self._cache_value
        else:
            raise self.OutdatedCacheError('Cache is out of date')

    @cached.setter
    def cached(self, value):
        """ Set the cached value.
        """
        self._cache_value = value
        self._cache_last_update = datetime.now()

    def invalidate(self):
        self._cache_last_update = None


    def get_value(self):
        return self.override.get_value()
    class OutdatedCacheError(Exception):
        pass

    def set_value(self, value):
        return self.override.set_value(value)

class SRequestor(StaticRequestor):

    @fetcher('cc-server.async')
    def fetcher_ccs_async(self, map):
        """ Fetching method using asynchronous call from sjrpc to get values
            from the remote client.
        """
        watcher = AsyncWatcher()
        # Do the requests to remote clients:
        for obj, tags in map.iteritems():
            to_update = set()
            for tag in tags:
                try:
                    obj.set(tag.name, tag.cached)
                except tag.OutdatedCacheError:
                    to_update.add(tag)
            if to_update:
                client = tuple(to_update)[0].client  # Same client for all remote tags of an object
                client.async_remote_tags(watcher, obj, [t.name for t in to_update])
        # Get and process the results:
        for update in watcher.wait(timeout=60):  #TODO: adaptative timeout
            obj = update['data']
            for tag_name, tag_value in update['return'].iteritems():
                obj.set(tag_name, tag_value)
                obj[tag_name].cached = tag_value  # Set the tag cache value