Newer
Older
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import inspect
import logging
import weakref
from itertools import chain
from collections import defaultdict
from cloudcontrol.common.client.utils import main_thread
from cloudcontrol.common.client.exc import TagConflict
logger = logging.getLogger(__name__)
class Tag(object):
"""``class`` that abstract tags and act as a simple container."""
def __init__(self, name, valuable, ttl=-1, refresh=None, parent=None,
background=False):
"""
:param string name: tag name
:param valuable: something that gives tag value, string or callable
:param None,int ttl: Time to live for caching the tags, possible values
are:
* None (means no ttl)
* -1 (means infinite ttl)
* positive integer in seconds
:param None,int refresh: period used to refresh tag value on the node
:param object parent: parent object the tag is attached to, it may be
needed for the tag callable to process the tag
:param bool background: calculate the tag value in a background thread
(migth be usefull to not block the event loop)
"""
self.name = name
self.value = None
self.is_function = False
self.ttl = ttl
self.refresh = refresh
self.parent = parent if parent is None else weakref.proxy(parent)
if inspect.isfunction(valuable):
self.is_function = True
args_count = len(inspect.getargspec(valuable).args)
if args_count > 1:
raise TypeError('Tag function take at most 1 argument')
elif args_count == 1:
self._calculate_value = partial(valuable, self.parent)
else:
self._calculate_value = valuable
else:
self.value = valuable
#: async watcher in case of background is True
self.async = None
# special arguments for tag db
self.db = None
self.sub_id = None
def calculate_value(self):
try:
self.value = self._calculate_value()
except Exception:
logger.exception('Cannot calculate tag value for %s', self.name)
self.value = None
# logger.debug('Calculate Tag(%s) = %s', self.name, self.value)
def update_value(self):
"""Called when the tag value may change."""
prev_value = self.value
if not self.background:
self.calculate_value()
if self.value != prev_value:
self._handle_registration(prev_value)
return
# else
# create a thread that will run the calculate_value method
def thread_run():
self.calculate_value()
self.async.send()
# keep previous value in watcher's opaque
self.async.data = prev_value
self.async.start()
th = threading.Thread(target=thread_run)
th.daemon = True
th.start()
def async_cb(self, watcher, revents):
if watcher.data != self.value:
self._handle_registration(watcher.data)
watcher.data = None
watcher.stop()
def _handle_registration(self, prev_value):
if self.db is None:
# when we calculate the tag, the latter could raise an exception
# and could provoke a deconnection to the libvirt for example,
# that will also provoke a deregister of some tags which might
# include the current (in case this happens, just return)
return
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# we need to register tag again
if self.sub_id == '__main__':
logger.debug('Register tag %s', self.name)
self.db.rpc_register_tag(self)
else:
logger.debug('Register sub tag %s.%s', self.sub_id,
self.name)
self.db.rpc_register_sub_tag(self.sub_id, self)
elif self.value is None:
# we drop the tag
if self.sub_id == '__main__':
logger.debug('Unregister tag %s', self.name)
self.db.rpc_unregister_tag(self.name)
else:
logger.debug(
'Unregister sub tag %s.%s', self.sub_id, self.name)
self.db.rpc_unregister_sub_tag(self.sub_id, self.name)
# if tag is not pushed
elif self.ttl != -1:
return
else:
# update the tag value
logger.debug('Update tag value %s', self.name)
self.db.rpc_update_tag(self.sub_id, self)
def start(self, loop):
"""
:param loop: pyev loop
"""
if self.background:
self.async = loop.async(self.async_cb)
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
if not self.is_function:
return
if self.refresh is None:
self.calculate_value()
return
# TODO more sophisticated calculation with event propagation
self.watcher = loop.timer(.0, float(self.refresh), lambda *args:
self.update_value())
self.watcher.start()
def stop(self):
if self.watcher is not None:
self.watcher.stop()
self.watcher = None
def tag_inspector(mod, parent=None):
"""Inspect module to find tags.
:param module mod: module to inspect
Currently there are two ways to define a tag inside a module:
* affect a string to a variable, the variable name will be the tag
name
* define a function that returns a value as a string or None (meaning
the tag doesn't exist on the host), as you guessed the function name
will define the tag name
"""
tags = []
for n, m in inspect.getmembers(mod): # (name, member)
# only keep strings or functions as tags
if getattr(m, '__module__', None) != mod.__name__ or (
n.startswith('_')):
continue
elif isinstance(m, (str, unicode)):
# if string, it means it is constant, then ttl = -1
ttl = -1
refresh = None
elif inspect.isfunction(m):
# if function take function ttl argument or set -1 by default
ttl = getattr(m, 'ttl', -1)
refresh = getattr(m, 'refresh', None)
background = getattr(m, 'background', False)
else:
# whatever it is we don't care...
continue
logger.debug('Introspected %s with ttl %s, refresh %s, background %s for object %s',
n, ttl, refresh, background, parent)
tags.append(Tag(n, m, ttl, refresh, parent, background))
return tags
# decorators for tag inspector
def ttl(value):
def decorator(func):
func.ttl = value
return func
return decorator
def refresh(value):
def decorator(func):
func.refresh = value
return func
return decorator
def background(func):
func.background = True
return func
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def get_tags(tags_dict, tags=None):
"""Helper to get tags.
:param tags_dict: dict containing :class:`Tag` objects
:param tags: list of tags to get (None mean all tags)
"""
# logger.debug('Tags request: %s', unicode(tags))
if tags is None:
tags = tags_dict.iterkeys()
else:
tags = set(tags) & set(tags_dict)
result = dict((
t, # tag name
tags_dict[t].value,
) for t in tags)
# logger.debug('Returning: %s', unicode(result))
return result
def rpc_simple_cb(log_msg):
"""Log a message in case of error of the rpc call.
It is a factory that is used in TagDB
:param str log_msg: format logging message with 3 %s arguments
(opaque, error class, error message)
"""
def cb(self, call_id, response, error):
opaque = self.async_calls.pop(call_id)
if error:
logger.error(log_msg, opaque, error['exception'],
error['message'])
return
logger.debug('Simple cb for %s succeed', opaque)
return cb
class TagDB(object):
"""Tag database. FIXME comment
Handles common operations such as registering tag on the cc-server,
updating its values, etc.
TagDB can have a parent TagDB, in this case, the latter could handle tag
registration on the cc-server.
"""
"""
:param TagDB parent_db: TagDB parent object
:param iterable tags: initial tags
"""
self._parent = parent_db
if tags is None:
tags = tuple()
self.db = defaultdict(
dict,
__main__=dict(), # tags for main object
# others objects
)
#: associate type for each sub object
self._object_types = dict()
for tag in tags:
self.add_tag(tag)
"""Set parent tag database."""
# check if previous parent
if self._parent is not None:
# we must remove tags from old parent
self._parent.remove_tags(self.db['__main__'])
for sub_id in self.db:
if sub_id == '__main__':
continue
self._parent.remove_sub_object(sub_id)
# set new parent
self._parent = parent
if self._parent is not None:
# add tags in new parent
self._parent.add_tags(self.db['__main__'].itervalues())
for sub_id, db in self.db.iteritems():
if sub_id == '__main__':
continue
self._parent.add_sub_object(sub_id, db.itervalues(),
self._object_types[sub_id])
def check_tags_conflict(self, *tag_names):
"""Checks a list of tag names that might conflict before inserting in
TagDB hierarchy
.. warning::
This is in no way a guarantee that following inserts will succeed.
"""
conflicts = []
parent_check = []
for name in tag_names:
if name in self.db['__main__']:
conflicts.append(name)
else:
parent_check.append(name)
if self._parent is not None and parent_check:
conflicts.extend(self._parent.check_tags_conflict(*parent_check))
return conflicts
"""
:param iterable tags: list of tags to add
"""
for tag in tags:
self.add_tag(tag)
def add_sub_tags(self, sub_id, tags):
def remove_tags(self, tag_names):
"""
:param iterable tag_names: list of tag names to remove
"""
for name in tag_names:
self.remove_tag(name)
def check_tag(self, tag):
"""Checks that a given tag object is the same instance in the db
Usefull for checking before removing.
"""
return id(self.db['__main__'].get(tag.name, None)) == id(tag)
raise TagConflict(
'A tag with the name %s is already registered' % tag.name)
# first add the tag to the child db, in case of conflict in the parent,
# the tag is recorded in the child thus if the latter's parent is
# changed it can be recorded again after
self.db['__main__'][tag.name] = tag
# set special attributes on tag instance
if self._parent is not None:
self._parent.add_tag(tag)
def remove_tag(self, tag_name):
tag = self.db['__main__'].pop(tag_name)
if self._parent is not None and self._parent.check_tag(tag):
# remove tag in parent db only if it is the same instance
def check_sub_tag(self, sub_id, tag):
return id(self.db[sub_id].get(tag.name, None)) == id(tag)
raise TagConflict(
'A tag with the name %s is already registered' % tag.name)
if self._parent is not None:
self._parent.add_sub_tag(sub_id, tag)
def remove_sub_tag(self, sub_id, tag_name):
tag = self.db[sub_id].pop(tag_name)
if self._parent is not None and self._parent.check_tag(tag):
self._parent.remove_sub_tag(sub_id, tag_name)
def add_sub_object(self, sub_id, tags, type_):
self._object_types[sub_id] = type_
if self._parent is not None:
# tags will be added after
self._parent.add_sub_object(sub_id, tuple(), type_)
# add sub object tags
for t in tags:
self.add_sub_tag(sub_id, t)
def remove_sub_object(self, sub_id):
del self.db[sub_id]
del self._object_types[sub_id]
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
if self._parent is not None:
self._parent.remove_sub_object(sub_id)
# dict like
def get(self, key, default=None):
return self.db.get(key, default)
def __getitem__(self, key):
return self.db[key]
def keys(self):
return self.db.keys()
def iteritems(self):
return self.db.iteritems()
def itervalues(self):
return self.db.itervalues()
# end dict like
class RootTagDB(TagDB):
"""Root tag database.
It takes care of tag registration with cc-server. It has no parent.
"""
"""
:param main: MainLoop instance
:param tags: initial tags
"""
self.main = main
raise NotImplementedError('Cannot set parent on RootTagDB')
# RPC part
def rpc_call(self, opaque, callback, remote_name, *args, **kwargs):
"""Local helper for all rpc calls.
:param opaque: data to associate with the async call
:param callable callback: callback when call is done, signature is the
same as for :py:meth:`RpcConnection.async_call_cb`
:param remote_name: remote method name
:param \*args: arguments for the call
:param \*\*kwargs: keyword arguments for the call
"""
# call only if connected and authenticated to the cc-server
if self.main.rpc_authenticated:
# logger.debug('RPC call %s %s', remote_name, args)
self.async_calls[self.main.rpc_con.rpc.async_call_cb(
callback, remote_name, *args, **kwargs)] = opaque
def rpc_register(self):
"""Register all objects and tags on the cc-server.
This is used just after a (re)connection to the cc-server is made.
"""
for sub_id in self.db:
if sub_id == '__main__':
continue
self.rpc_register_sub_object(sub_id, self._object_types[sub_id])
# register tags on the cc-server
for tag in self.db['__main__'].itervalues():
if tag.value is not None:
self.rpc_register_tag(tag)
# register sub tags on the cc-server
for tag in chain.from_iterable(
db.itervalues() for name, db in self.db.iteritems()
if name != '__main__'):
if tag.value is not None:
self.rpc_register_sub_tag(tag.sub_id, tag)
def rpc_register_sub_object(self, sub_id, type_):
self.rpc_call(sub_id, self.rpc_object_register_cb,
'register', sub_id, type_)
def rpc_unregister_sub_object(self, sub_id):
self.rpc_call(
sub_id, self.rpc_object_register_cb, 'unregister', sub_id)
#: this is a method
rpc_object_register_cb = rpc_simple_cb(
'Error while trying to register the object %s, %s("%s")')
rpc_object_unregister_cb = rpc_simple_cb(
'Error while trying to unregister the object %s, %s("%s")')
def rpc_register_tag(self, tag):
logger.debug('RPC register tag %s', tag.name)
ttl = None if tag.ttl == -1 else tag.ttl
self.rpc_call(tag.name, self.rpc_tag_register_cb, 'tags_register',
tag.name, ttl, unicode(tag.value))
def rpc_unregister_tag(self, tag_name):
logger.debug('RPC unregister tag %s', tag_name)
self.rpc_call(tag_name, self.rpc_tag_unregister_cb, 'tags_unregister',
tag_name)
def rpc_update_tag(self, sub_id, tag):
"""Update tag value on cc-server."""
logger.debug('RPC update tag %s(%s)', tag.name, sub_id)
if sub_id == '__main__':
self.rpc_call(sub_id + tag.name, self.rpc_update_tag_cb,
'tags_update', tag.name, unicode(tag.value))
else:
self.rpc_call(sub_id + tag.name, self.rpc_update_tag_cb,
'sub_tags_update', sub_id, tag.name, unicode(tag.value))
def rpc_register_sub_tag(self, sub_id, tag):
logger.debug('RPC register tag %s(%s)', tag.name, sub_id)
ttl = None if tag.ttl == -1 else tag.ttl
self.rpc_call(tag.name, self.rpc_sub_tag_register_cb,
'sub_tags_register', sub_id, tag.name, ttl,
unicode(tag.value))
def rpc_unregister_sub_tag(self, sub_id, tag_name):
logger.debug('RPC unregister tag %s(%s)', tag_name, sub_id)
self.rpc_call(tag_name, self.rpc_sub_tag_unregister_cb,
'sub_tags_unregister', sub_id, tag_name)
#: this is a method
rpc_tag_register_cb = rpc_simple_cb(
'Error while trying to register tag %s, %s("%s")')
rpc_tag_unregister_cb = rpc_simple_cb(
'Error while trying to unregister tag %s, %s("%s")')
rpc_sub_tag_register_cb = rpc_simple_cb(
'Error while registering sub tag %s, %s("%s")')
rpc_sub_tag_unregister_cb = rpc_simple_cb(
'Error while unregistering sub tag %s, %s("%s")')
rpc_update_tag_cb = rpc_simple_cb(
'Error while trying to update tag %s, %s("%s")')
# end RPC part
if tag.name in self.db['__main__']:
raise TagConflict(
'A tag with the name %s was already registered' % tag.name)
# set special attributes on tag instance
tag.db = self
tag.sub_id = '__main__'
tag.start(self.main.evloop)
# register tag on the cc-server
if tag.value is not None:
self.rpc_register_tag(tag)
self.db['__main__'][tag.name] = tag
def remove_tag(self, tag_name):
tag = self.db['__main__'].pop(tag_name)
tag.db = None
tag.sub_id = None
tag.stop()
# unregister tag on the cc-server
if tag.value is not None:
self.rpc_unregister_tag(tag_name)
if tag.name in self.db[sub_id]:
raise TagConflict(
'A tag with the name %s (%s) was already registered' %
(tag.name, sub_id))
tag.db = self
tag.sub_id = sub_id
tag.start(self.main.evloop)
# register tag to the cc-server
if tag.value is not None:
self.rpc_register_sub_tag(sub_id, tag)
self.db[sub_id][tag.name] = tag
def remove_sub_tag(self, sub_id, tag_name):
tag = self.db[sub_id].pop(tag_name)
tag.db = None
tag.sub_id = None
tag.stop()
# unregister tag to the cc-server
if tag.value is not None:
self.rpc_unregister_sub_tag(sub_id, tag_name)
def add_sub_object(self, sub_id, tags, type_):
self.rpc_register_sub_object(sub_id, type_)
self._object_types[sub_id] = type_
# add sub object tags
for t in tags:
self.add_sub_tag(sub_id, t)
def remove_sub_object(self, sub_id):
for tag in self.db[sub_id].itervalues():
tag.stop()
tag.db = None
tag.sub_id = None
# we don't need to unregister each sub tag on the cc-server because
# it will be done when we unregister the object
del self.db[sub_id]
del self._object_types[sub_id]