""" This module contains some cc-server specific helpers for the CloudControl
    tags database provided in cc-commons.
"""
from abc import abstractproperty
from collections import defaultdict
from datetime import datetime

from sjrpc.core import AsyncWatcher

from cloudcontrol.common.tql.db.tag import BaseTag, BaseTagInterface
from cloudcontrol.common.tql.db.object import TqlObject
from cloudcontrol.common.tql.db.requestor import StaticRequestor, fetcher
class SObject(TqlObject):

    """ A TQL object with features specific to cc-server.

    On top of the tags stored in ``self._tags``, an SObject keeps one
    "overridden" tag per name in ``self._overridden`` so that it can be put
    back in place when the overriding tag is unregistered.
    """

    def __init__(self, *args, **kwargs):
        """ Initialize the object; all arguments are handed to TqlObject.
        """
        super(SObject, self).__init__(*args, **kwargs)
        # Tag name -> overridden tag. Subscripting a defaultdict inserts the
        # missing key, so reads in this class only use ``in`` and ``.get()``.
        self._overridden = defaultdict(lambda: None)

    def register(self, tag, override=False):
        """ Register a tag on this object (or override).

        :param tag: the tag to register, identified by its ``name``
        :param override: if True, the tag takes the place of the tag already
            registered under the same name (if any), which is kept aside
        :raise KeyError: if ``override`` is False and the name already has
            both a registered tag and an overridden one
        """
        name = tag.name
        if override:
            # The tag to register must override an eventual existing tag.
            # Overridden tag is moved in the overridden tags dict:
            if name in self._tags:
                self._overridden[name] = self._tags[name]
        elif name in self._tags:
            # The tag to register is already overridden, we place it directly
            # on the overridden tags dict:
            if self._overridden.get(name) is not None:
                raise KeyError('A tag with this name is already registered '
                               'on this object')
            self._overridden[name] = tag
            return
        # NOTE(review): the delegation below was not present in the reviewed
        # text; it mirrors unregister() and is what the early ``return``
        # above skips. Confirm that TqlObject.register() accepts a name that
        # is already present in ``self._tags`` in the override case.
        return super(SObject, self).register(tag)

    def unregister(self, name, override=False):
        """ Unregister a tag on this object (or remove override).

        :param name: name of the tag to unregister
        :param override: if True, the tag previously overridden under this
            name (if any) is restored in the tag list
        """
        super(SObject, self).unregister(name)
        if override:
            # If a tag is overridden, replace it on the tag list:
            shadowed = self._overridden.pop(name, None)
            if shadowed is not None:
                self._tags[name] = shadowed

    def is_overriding(self, name):
        """ Return True if a tag is overriding another one for the name.

        If the tag is not found, False is returned.
        """
        # .get() rather than [] so that the query does not add the name to
        # the defaultdict as a side effect.
        return self._overridden.get(name) is not None
# NOTE(review): the class statement was absent from the reviewed text. The
# name comes from the ``CCSAsyncTagInterface.register(RemoteTag)`` call made
# later in this module; the base class is presumed from the otherwise unused
# ``BaseTagInterface`` import -- confirm.
class CCSAsyncTagInterface(BaseTagInterface):

    """ Interface of a tag which is fetched on a client and keeps a cached
        value.
    """

    @abstractproperty
    def client(self):
        """ The client on which do the fetch.
        """

    @abstractproperty
    def cached(self):
        """ The cached value stored in the tag. Can raise an OutdatedCacheError
            if the cache is out of date.
        """

    @cached.setter
    def cached(self, value):
        """ Write the new cached value.
        """
class RemoteTag(BaseTag):

    """ A tag which is available remotely on a client.

    The last value received is cached together with the time it was stored.
    The cache stays valid for ``ttl`` seconds (forever when the TTL is None)
    or until :meth:`invalidate` is called.
    """

    class OutdatedCacheError(Exception):
        """ Raised when reading a cache which is empty or out of date.
        """

    def __init__(self, name, callback, ttl=None):
        """ Create the tag with an empty (outdated) cache.

        :param name: name of the tag
        :param callback: callable used to request the value of the tag;
            SRequestor.fetcher_ccs_async calls it as
            ``callback(watcher, obj, tag_names)``
        :param ttl: validity of a cached value in seconds, None meaning that
            a cached value never expires
        """
        super(RemoteTag, self).__init__(name)
        # Both attributes are read by the properties below.
        self._callback = callback
        self._ttl = ttl
        self._cache_last_update = None
        self._cache_value = u''

    @property
    def callback(self):
        """ The callable used to request the value of this tag.
        """
        return self._callback

    @property
    def ttl(self):
        """ Remaining validity of the cached value, in seconds.

        Returns 0 when nothing is cached (or the cache was invalidated),
        infinity when no TTL is configured, and otherwise the configured TTL
        minus the age of the cache (which can be negative).
        """
        if self._cache_last_update is None:
            return 0
        if self._ttl is None:
            return float('inf')
        elapsed = datetime.now() - self._cache_last_update
        # Whole seconds only, microseconds are ignored.
        age = elapsed.seconds + elapsed.days * 60 * 60 * 24
        return self._ttl - age

    @ttl.setter
    def ttl(self, value):
        """ Set the configured TTL (seconds, or None for no expiration).
        """
        self._ttl = value

    @property
    def cached(self):
        """ Get the cached value.

        :raise OutdatedCacheError: if there is no remaining TTL
        """
        if self.ttl > 0:
            return self._cache_value
        raise self.OutdatedCacheError('Cache is out of date')

    @cached.setter
    def cached(self, value):
        """ Set the cached value and restart its validity period.
        """
        self._cache_value = value
        self._cache_last_update = datetime.now()

    def invalidate(self):
        """ Mark the cache as outdated; the stored value is left untouched.
        """
        self._cache_last_update = None
# Declare RemoteTag as an implementation of CCSAsyncTagInterface (presumably
# an ABC-style virtual subclass registration -- confirm against
# BaseTagInterface).
CCSAsyncTagInterface.register(RemoteTag)
class SRequestor(StaticRequestor):

    """ StaticRequestor providing the fetching method for remote tags.
    """

    # NOTE(review): ``fetcher`` is imported at the top of the module but is
    # not applied anywhere in the reviewed text; check whether this method is
    # meant to be decorated with it.
    def fetcher_ccs_async(self, map):
        """ Fetching method using asynchronous call from sjrpc to get values
            from the remote client.

        :param map: mapping of object -> iterable of the remote tags to
            fetch on this object

        Tags with a valid cache are set from it. The others are requested
        through the tag callback, then set (and cached) from the answer.
        Failed requests leave the '#ERR#' value on the requested tags and
        tags absent from an answer get '#ERR'.
        """
        watcher = AsyncWatcher()

        # Do the requests to remote clients:
        for obj, tags in map.iteritems():
            to_update = set()
            for tag in tags:
                try:
                    obj.set(tag.name, tag.cached)
                except tag.OutdatedCacheError:
                    to_update.add(tag)
            if to_update:
                # All remote tags of an object are always bound to the same
                # client. Request for tag value is made in a single call to
                # avoid multiple query/response, so we take the callback from
                # the first tag to do the update on the whole:
                request = next(iter(to_update)).callback
                request(watcher, obj, [t.name for t in to_update])

        # Get and process the results:
        for update in watcher.iter(timeout=4, raise_timeout=True):  # TODO: adaptative timeout
            # Presumably attached to the request by the tag callback.
            requested_tags, obj = update['data']
            # NOTE(review): the condition selecting the error branch was
            # absent from the reviewed text. A missing or None 'return' is
            # treated as a failed request, since the success branch needs a
            # mapping of tag values -- confirm against the sjrpc watcher
            # message format.
            values = update.get('return')
            if values is None:
                for tag_name in requested_tags:
                    obj.set(tag_name, '#ERR#')
                continue
            for tag_name in requested_tags:
                tag_value = values.get(tag_name)
                if tag_value is None:
                    # NOTE(review): '#ERR' differs from the '#ERR#' used
                    # above; kept as is in case consumers rely on it.
                    obj.set(tag_name, '#ERR')
                else:
                    obj.set(tag_name, tag_value)
                    obj[tag_name].cached = tag_value  # Set the tag cache value