Newer
Older
#!/usr/bin/env python
#coding=utf8
'''
The current living objects database.
'''
from __future__ import absolute_import
from datetime import datetime, timedelta
import logging
from threading import RLock
from ccserver.tql import TqlObject
from ccserver.orderedset import OrderedSet
from ccserver.exceptions import AlreadyRegistered
TTL_SERVER_DELTA = 1 # Delta to apply for all tags
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class ObjectsDB(object):
'''
The object database.
This class abstract the access to the current living objects database. The
database store a list of :class:`TqlObject` instances, each defining an
object with its tags.
Objects are registered by :meth:`register` method, and removed by
:meth:`unregister` method. You can update the tags of registered objects
with :meth:`update` method. A call to :meth:`all` method give you the
complete list of objects with an automatic update before to return it.
.. note:
All calls to the :class:`ObjectDB` are thread-safe.
'''
def __init__(self, server):
# The server to use to complete database:
self._server = server
# The object database:
self._objects = {} # Dict of oid -> TqlObject instances
# The TTL database:
self._ttls = {} # (obj_id, tag) -> time of death
# The access lock of the object database:
self._lock = RLock()
# The list of received message while update:
self._msgs = []
# Total amount of query updates:
self._nb_queries = 0
def update(self, ids=None, tags=None, tags_novalue=None):
'''
Update the database according to the TTL values.
:param ids: list of ids to update (or None for all)
:param tags: list of tags to update (or None for all)
:param tags_noval: list of tags to update (only the name, not the
attached value, None for all)
'''
with self._lock:
self._update(ids, tags, tags_novalue)
self._process_responses()
self._update_user_defined(ids)
def _update(self, ids=None, tags=None, tags_novalue=None):
'''
First step of the database update process.
This methods make essentially two things:
* Calculate the final list of tags to get and to check
* Send the request to each client involved in the update
.. warning:
The call to this method is not threadsafe and must be done only
by :meth:`update` method.
'''
self._msgs = []
# Cast input to the right type:
if tags is not None:
tags = set(tags)
if tags_novalue is not None:
tags_novalue = set(tags_novalue)
if tags is not None:
tags_novalue -= tags
if ids is not None:
# The main-loop, we will pop each object id from the set and
# process it:
while ids:
oid = ids.pop()
obj = self._objects.get(oid)
assert obj is not None, 'object not found'
if '__parent' in obj:
# Current object is proxified by a parent:
parent = obj['__parent']
self._update_object(parent, tags, tags_novalue)
# Parent is removed from the list if it present (because
# it has been updated):
if parent['id'] in ids:
ids.remove(parent['id'])
# Search for sibling objects:
sg = set((o for o in ids if o.startswith('%s.' % parent['id'])))
for sibling in sg:
sibling = self.get_by_id(sibling)
self._update_object(sibling, tags, tags_novalue)
else:
# Current object is a standard node:
self._update_object(obj, tags, tags_novalue)
def _update_user_defined(self, ids):
'''
Update the user defined tags (previously fetched
by :meth:`_update_object`).
'''
if ids is not None:
ids = set(ids)
else:
ids = set(self._objects)
for objid in ids:
obj = self.get_by_id(objid)
if '__parent' in obj:
user_tags = obj['__parent']['__user_defined_tags']
else:
user_tags = obj['__user_defined_tags']
obj.update(user_tags)
def _process_responses(self):
'''
Process the responses received from clients.
'''
msgs = self._server.manager.wait(frozenset(self._msgs), timeout=4)
now = datetime.now()
for msg in msgs:
# Update tags if received from the peer:
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
if msg.get('error') is not None:
logging.error('Error from client while getting tags')
else:
returned = msg['return']
obj = msg['data']
assert isinstance(returned, dict), 'returned tags is not a dict'
assert isinstance(obj, TqlObject), 'data obj is not TqlObject'
for name, attrs in returned.iteritems():
obj[name] = attrs.get('value')
# Update the ttl
ttl = attrs.get('ttl', DEFAULT_TTL)
if ttl == -1:
tod = datetime.max
else:
ttl += TTL_SERVER_DELTA
dt = timedelta(seconds=ttl)
tod = now + dt
self._ttls[(obj['id'], name)] = tod
def _update_object(self, obj, tags, tags_novalue):
'''
Calculate the list of tags to request for a client, and send the
request to the client.
'''
parent = obj.get('__parent')
if parent is None:
user_tags = self._server.conf.show(obj['id'])['tags']
obj['__user_defined_tags'] = user_tags
client_id = obj['id']
else:
user_tags = parent['__user_defined_tags']
client_id = parent['id']
if tags is not None:
tags = set(tags)
tags -= set(user_tags)
if tags_novalue is not None:
tags_novalue = set(tags_novalue)
tags_novalue -= set(user_tags)
try:
client = self._server.get_connection(client_id)
except KeyError:
# The client is not connected, we need to keep only unttlised tags:
for name in obj.keys():
if (obj['id'], name) in self._ttls:
del obj[name]
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
else:
# The client is connected:
if parent is None:
obj.update(client.get_tags())
# First, check if each tag is not already cached:
now = datetime.now()
if tags is not None:
for tag in tags.copy():
if now < self._ttls.get((obj['id'], tag), now):
tags.remove(tag)
# Cast as jsonizable type:
tags = tuple(tags)
if tags_novalue is not None:
for tag in tags_novalue.copy():
if now < self._ttls.get((obj['id'], tag), now):
tags_novalue.remove(tag)
# Cast as jsonizable type:
tags_novalue = tuple(tags_novalue)
# Ask tags to the node:
if (tags is None or tags) or (tags_novalue is None or tags_novalue):
self._request_tags(obj, client, tags, tags_novalue)
def _request_tags(self, obj, client, tags, tags_novalue):
'''
Send the request to the client.
'''
if '__parent' in obj:
subid = obj['id'].partition('.')[2]
msg_id = client.connection.async_call('sub_tags', subid, tags,
tags_novalue, _data=obj)
else:
msg_id = client.connection.async_call('get_tags', tags,
tags_novalue, _data=obj)
self._msgs.append(msg_id)
def get_by_id(self, oid):
'''
Get an registered object by its id.
:param oid: the complete oid of the object to get
:return: a :class:`TqlObject` instance
'''
return self._objects[oid]
Antoine Millet
committed
def register(self, obj, cleanable=()):
'''
Populate the database with the specified object. All the tags specified
Antoine Millet
committed
in the passed object are not declared in cache and will never be
cleaned.
:param obj: the object to register
:param clenable: the optionnal list of tags to declare as cleanable when
the cache is cleaned
'''
with self._lock:
if 'id' not in obj:
raise ValueError('id tag must exists on the object')
elif obj['id'] in self._objects:
raise AlreadyRegistered('The object is already registered')
else:
self._objects[obj['id']] = obj
Antoine Millet
committed
for tag in cleanable:
self._ttls[(obj['id'], tag)] = datetime.max
def unregister(self, obj_id):
'''
Remove an object from the manager by its id.
'''
with self._lock:
if obj_id in self._objects:
self.unregister_children(obj_id)
del self._objects[obj_id]
self.clean_ttls(obj_id)
else:
raise ValueError('The object is not registered')
def unregister_children(self, obj_id):
'''
Remove all children of an object.
'''
with self._lock:
if obj_id in self._objects:
parent = self._objects[obj_id]
for key, obj in self._objects.items():
if obj.get('__parent') is parent:
del self._objects[key]
self.clean_ttls(obj['id'])
else:
raise ValueError('The object %r is not registered' % obj_id)
def clean_ttls(self, obj_id):
'''
Remove TTLs for all tags for the specified object.
'''
with self._lock:
for oid, tag in self._ttls.keys():
if oid == obj_id:
del self._ttls[(oid, tag)]
def get_ids(self):
'''
Get the set of all the first level ids.
'''
ids = set()
for oid in self._objects:
parid = oid.partition('.')[0]
ids.add(parid)
return ids
def all(self, tags, to_check):
'''
Get all the objects registered on the server with specified tags.
'''
self.update(tags=tags, tags_novalue=to_check)
for obj in self._objects.values():
yield obj
def some(self, ids, tags=set(), to_check=set()):
'''
Get specified objects registed on the server with specified tags.
'''
self.update(ids, tags=tags, tags_novalue=to_check)
for obj_id in ids:
yield self._objects[obj_id]
def stats(self):
'''
Get stats about the current state of the object database.
'''
stats = {}
stats['count'] = len(self._objects)
cached = dead = 0
now = datetime.now()
for date in self._ttls.values():
if date > now:
dead += 1
else:
cached += 1
stats['cached'] = cached
stats['dead'] = dead
stats['total'] = cached + dead
stats['queries'] = self._nb_queries
return stats