Newer
Older
#!/usr/bin/env python
#coding=utf8
'''
The current living objects database.
'''
from __future__ import absolute_import
from datetime import datetime, timedelta
import logging
from threading import RLock
from ccserver.tql import TqlObject
from ccserver.orderedset import OrderedSet
from ccserver.exceptions import AlreadyRegistered, UnknownObjectError
TTL_SERVER_DELTA = 1 # Delta to apply for all tags
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class ObjectsDB(object):
'''
The object database.
This class abstract the access to the current living objects database. The
database store a list of :class:`TqlObject` instances, each defining an
object with its tags.
Objects are registered by :meth:`register` method, and removed by
:meth:`unregister` method. You can update the tags of registered objects
with :meth:`update` method. A call to :meth:`all` method give you the
complete list of objects with an automatic update before to return it.
.. note:
All calls to the :class:`ObjectDB` are thread-safe.
'''
def __init__(self, server):
# The server to use to complete database:
self._server = server
# The object database:
self._objects = {} # Dict of oid -> TqlObject instances
# The TTL database:
self._ttls = {} # (obj_id, tag) -> time of death
# The access lock of the object database:
self._lock = RLock()
# The list of received message while update:
self._msgs = []
self._requested = {} # object id -> set of tags
# Total amount of query updates:
self._nb_queries = 0
def update(self, ids=None, tags=None, tags_novalue=None):
'''
Update the database according to the TTL values.
:param ids: list of ids to update (or None for all)
:param tags: list of tags to update (or None for all)
:param tags_noval: list of tags to update (only the name, not the
attached value, None for all)
'''
with self._lock:
self._update(ids, tags, tags_novalue)
self._update_user_defined(ids)
def _update(self, ids=None, tags=None, tags_novalue=None):
'''
First step of the database update process.
This methods make essentially two things:
* Calculate the final list of tags to get and to check
* Send the request to each client involved in the update
.. warning:
The call to this method is not threadsafe and must be done only
by :meth:`update` method.
'''
self._msgs = []
# Cast input to the right type:
if tags is not None:
tags = set(tags)
if tags_novalue is not None:
tags_novalue = set(tags_novalue)
if tags is not None:
tags_novalue -= tags
if ids is not None:
# The main-loop, we will pop each object id from the set and
# process it:
while ids:
oid = ids.pop()
obj = self._objects.get(oid)
if obj is None:
raise UnknownObjectError('%r not found in database' % oid)
if '__parent' in obj:
# Current object is proxified by a parent:
parent = obj['__parent']
# Parent is removed from the list if it present (because
# it has been updated):
if parent['id'] in ids:
self._update_object(parent, tags, tags_novalue)
else:
self._update_object(parent, (), (), only_udt=True)
# Search for sibling objects:
sg = set((o for o in ids if o.startswith('%s.' % parent['id'])))
for sibling in sg:
sibling = self.get_by_id(sibling)
self._update_object(sibling, tags, tags_novalue)
else:
# Current object is a standard node:
self._update_object(obj, tags, tags_novalue)
def _update_user_defined(self, ids):
'''
Update the user defined tags (previously fetched
by :meth:`_update_object`).
'''
if ids is not None:
ids = set(ids)
else:
ids = set(self._objects)
for objid in ids:
obj = self.get_by_id(objid)
if '__parent' in obj:
user_tags = obj['__parent']['__user_defined_tags']
else:
user_tags = obj['__user_defined_tags']
obj.update(user_tags)
# Delete ttls if present:
for tag in user_tags:
if (obj['id'], tag) in self._ttls:
del self._ttls[(obj['id'], tag)]
'''
Process the responses received from clients.
'''
msgs = self._server.manager.wait(frozenset(self._msgs), timeout=4)
# Avoid error if all tags are asked (not optimized case because we
# can't known the real list of tags:
if tags is None:
tags = ()
received = set()
obj = msg['data']
assert isinstance(obj, TqlObject), 'data obj is not TqlObject'
# Update tags if received from the peer:
if msg.get('error') is not None:
logging.warning('Error from client while getting tags: %s',
msg.get('error'))
for name in tags:
if name in obj:
old = obj[name]
obj[name] = '#ERR#OLD:%s' % old
else:
obj[name] = '#ERR#'
self._ttls[(obj['id'], name)] = now + timedelta(0, 10)
else:
returned = msg['return']
assert isinstance(returned, dict), 'returned tags is not a dict'
for name, attrs in returned.iteritems():
obj[name] = attrs.get('value')
# Update the ttl
ttl = attrs.get('ttl', DEFAULT_TTL)
if ttl == -1:
tod = datetime.max
else:
ttl += TTL_SERVER_DELTA
dt = timedelta(seconds=ttl)
tod = now + dt
self._ttls[(obj['id'], name)] = tod
for oid in set(self._requested) - received:
obj = self.get_by_id(oid)
o_tags = self._requested[oid]
if o_tags is not None:
for name in o_tags:
obj[name] = '#ERR#'
self._ttls[(obj['id'], name)] = now + timedelta(0, 10)
def _update_object(self, obj, tags, tags_novalue, only_udt=False):
'''
Calculate the list of tags to request for a client, and send the
request to the client.
'''
parent = obj.get('__parent')
if parent is None:
user_tags = self._server.conf.show(obj['id'])['tags']
obj['__user_defined_tags'] = user_tags
client_id = obj['id']
else:
user_tags = parent['__user_defined_tags']
client_id = parent['id']
tags = set(tags)
tags -= set(user_tags)
if tags_novalue is not None:
tags_novalue = set(tags_novalue)
tags_novalue -= set(user_tags)
try:
client = self._server.get_connection(client_id)
except KeyError:
# The client is not connected, we need to keep only unttlised tags:
for name in obj.keys():
if (obj['id'], name) in self._ttls:
del obj[name]
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
else:
# The client is connected:
if parent is None:
obj.update(client.get_tags())
# First, check if each tag is not already cached:
now = datetime.now()
if tags is not None:
for tag in tags.copy():
if now < self._ttls.get((obj['id'], tag), now):
tags.remove(tag)
# Cast as jsonizable type:
tags = tuple(tags)
if tags_novalue is not None:
for tag in tags_novalue.copy():
if now < self._ttls.get((obj['id'], tag), now):
tags_novalue.remove(tag)
# Cast as jsonizable type:
tags_novalue = tuple(tags_novalue)
# Ask tags to the node:
if (tags is None or tags) or (tags_novalue is None or tags_novalue):
self._request_tags(obj, client, tags, tags_novalue)
def _request_tags(self, obj, client, tags, tags_novalue):
'''
Send the request to the client.
'''
if '__parent' in obj:
subid = obj['id'].partition('.')[2]
msg_id = client.connection.async_call('sub_tags', subid, tags,
tags_novalue, _data=obj)
else:
msg_id = client.connection.async_call('get_tags', tags,
tags_novalue, _data=obj)
self._msgs.append(msg_id)
def get_by_id(self, oid):
'''
Get an registered object by its id.
:param oid: the complete oid of the object to get
:return: a :class:`TqlObject` instance
'''
return self._objects[oid]
Antoine Millet
committed
def register(self, obj, cleanable=()):
'''
Populate the database with the specified object. All the tags specified
Antoine Millet
committed
in the passed object are not declared in cache and will never be
cleaned.
:param obj: the object to register
:param clenable: the optionnal list of tags to declare as cleanable when
the cache is cleaned
'''
with self._lock:
if 'id' not in obj:
raise ValueError('id tag must exists on the object')
elif obj['id'] in self._objects:
raise AlreadyRegistered('The object is already registered')
else:
self._objects[obj['id']] = obj
Antoine Millet
committed
for tag in cleanable:
self._ttls[(obj['id'], tag)] = datetime.max
def unregister(self, obj_id):
'''
Remove an object from the manager by its id.
'''
with self._lock:
if obj_id in self._objects:
self.unregister_children(obj_id)
self.clean_ttls(obj_id)
raise ValueError('The object %r is not registered' % obj_id)
def unregister_children(self, obj_id):
'''
Remove all children of an object.
'''
with self._lock:
if obj_id in self._objects:
parent = self._objects[obj_id]
for key, obj in self._objects.items():
if obj.get('__parent') is parent:
self.clean_ttls(obj['id'])
del self._objects[key]
else:
raise ValueError('The object %r is not registered' % obj_id)
def clean_ttls(self, obj_id):
'''
Remove TTLs for all tags for the specified object.
'''
with self._lock:
obj = self.get_by_id(obj_id)
for oid, tag in self._ttls.keys():
if oid == obj_id:
del self._ttls[(oid, tag)]
try:
del obj[tag]
except:
pass
def get_ids(self):
'''
Get the set of all the first level ids.
'''
ids = set()
for oid in self._objects:
parid = oid.partition('.')[0]
ids.add(parid)
return ids
def all(self, tags, to_check):
'''
Get all the objects registered on the server with specified tags.
'''
self.update(tags=tags, tags_novalue=to_check)
for obj in self._objects.values():
yield obj
def some(self, ids, tags=set(), to_check=set()):
'''
Get specified objects registed on the server with specified tags.
'''
self.update(ids, tags=tags, tags_novalue=to_check)
for obj_id in ids:
yield self._objects[obj_id]
def stats(self):
'''
Get stats about the current state of the object database.
'''
stats = {}
stats['count'] = len(self._objects)
cached = dead = 0
now = datetime.now()
for date in self._ttls.values():
if date > now:
dead += 1
else:
cached += 1
stats['cached'] = cached
stats['dead'] = dead
stats['total'] = cached + dead
stats['queries'] = self._nb_queries
return stats