#!/usr/bin/env python
#coding=utf8
'''
The current living objects database.
'''
from datetime import datetime, timedelta
import logging
from threading import RLock
from tql import TqlObject
from exceptions import AlreadyRegistered
# TTL used for a returned tag whose attributes carry no 'ttl' entry:
DEFAULT_TTL = 0
# Amount added to every finite TTL (used as seconds) before the time of
# death of a tag is computed:
TTL_SERVER_DELTA = 2 # Delta to apply for all tags
class ObjectsDB(object):
'''
The object database.
This class abstracts access to the database of currently living objects. The
database stores a collection of :class:`TqlObject` instances, each defining an
object with its tags.
Objects are registered with the :meth:`register` method and removed with the
:meth:`unregister` method. You can update the tags of registered objects
with the :meth:`update` method. A call to the :meth:`all` method gives you the
complete list of objects, automatically updated before it is returned.
.. note::
All calls to the :class:`ObjectsDB` are thread-safe.
'''
def __init__(self, server):
    '''
    Create an empty objects database bound to *server*.

    :param server: the server object the update methods go through to
        reach the configuration, the client connections and the manager
    '''
    # Re-entrant lock guarding the access to the database:
    self._lock = RLock()
    # Registry of the known objects, oid -> TqlObject instance:
    self._objects = dict()
    # Expiry table, (obj_id, tag) -> time of death:
    self._ttls = dict()
    # Ids of the messages sent during the update in progress:
    self._msgs = list()
    # Total amount of query updates:
    self._nb_queries = 0
    # The server used to complete the database:
    self._server = server
def update(self, ids=None, tags=None, tags_novalue=None):
    '''
    Refresh the database according to the TTL values.

    The two steps of the refresh (sending the requests, then processing
    the responses) both run while the database lock is held.

    :param ids: list of ids to update (or None for all)
    :param tags: list of tags to update (or None for all)
    :param tags_novalue: list of tags to update (only the name, not the
        attached value, None for all)
    '''
    with self._lock:
        # Step one: work out what to ask and send the requests.
        self._update(ids=ids, tags=tags, tags_novalue=tags_novalue)
        # Step two: wait for the answers and store them.
        self._process_responses()
def _update(self, ids=None, tags=None, tags_novalue=None):
'''
First step of the database update process.
This method essentially does two things:
* Calculate the final list of tags to get and to check
* Send the request to each client involved in the update
:param ids: ids of the objects to update (the main loop consumes it with
``pop``, ``remove`` and ``-=``, so it must be a set by then)
:param tags: iterable of tag names to update, or None
:param tags_novalue: iterable of tag names to update without their value,
or None; names already listed in ``tags`` are removed from it
.. warning::
The call to this method is not threadsafe and must be done only
by :meth:`update` method.
'''
# Forget the message ids collected by a previous update:
self._msgs = []
# Cast input to the right type:
if tags is not None:
tags = set(tags)
if tags_novalue is not None:
tags_novalue = set(tags_novalue)
if tags is not None:
# Drop from the name-only set the tags already requested with their value:
tags_novalue -= tags
if ids is not None:
# NOTE(review): the body of this ``if`` is not visible in this extract. The
# bare numbers below look like line-number residue from a web code viewer;
# restore the missing statements from the upstream file before relying on
# this method.
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# The main-loop, we will pop each object id from the set and
# process it:
while ids:
oid = ids.pop()
obj = self._objects.get(oid)
# Every requested id is expected to be registered:
assert obj is not None, 'object not found'
if '__parent' in obj:
# Current object is proxified by a parent:
parent = obj['__parent']
self._update_object(parent, tags, tags_novalue)
# Parent is removed from the list if it is present (because
# it has been updated):
if parent['id'] in ids:
ids.remove(parent['id'])
# Search for sibling objects, i.e. the pending ids starting with
# "<parent id>.":
sg = set((o for o in ids if o.startswith('%s.' % parent['id'])))
ids -= sg
# NOTE(review): ``oid`` was already popped from ``ids``, so the object that
# triggered this branch is not in ``sg`` and is not passed to
# _update_object here -- confirm this is intended.
for sibling in sg:
sibling = self.get_by_id(sibling)
self._update_object(sibling, tags, tags_novalue)
else:
# Current object is a standard node:
self._update_object(obj, tags, tags_novalue)
def _process_responses(self):
    '''
    Collect the responses to the pending requests and store them.

    Waits (with a timeout of 10) on the server manager for the messages
    recorded in ``self._msgs``. For each response without error, every
    returned tag value is written on the object attached to the request
    and the time of death of that tag is stored in the TTL table; the
    user defined tags are then applied on top of the object. A response
    carrying an error is only logged.
    '''
    pending = frozenset(self._msgs)
    responses = self._server.manager.wait(pending, timeout=10)
    now = datetime.now()
    for response in responses:
        if response.get('error') is not None:
            logging.error('Error from client while getting tags')
            continue
        returned = response['return']
        obj = response['data']
        assert isinstance(returned, dict), 'returned tags is not a dict'
        assert isinstance(obj, TqlObject), 'data obj is not TqlObject'
        # For a proxified object the user defined tags live on its parent:
        holder = obj['__parent'] if '__parent' in obj else obj
        user_tags = holder['__user_defined_tags']
        for name, attrs in returned.items():
            obj[name] = attrs.get('value')
            # Update the ttl
            ttl = attrs.get('ttl', DEFAULT_TTL)
            if ttl == -1:
                # A ttl of -1 gives the latest representable time of death:
                expires = datetime.max
            else:
                expires = now + timedelta(seconds=ttl + TTL_SERVER_DELTA)
            self._ttls[(obj['id'], name)] = expires
        obj.update(user_tags)
def _update_object(self, obj, tags, tags_novalue):
    '''
    Calculate the list of tags to request for a client, and send the
    request to the client.

    The user defined tags (read from the server configuration for a
    standard node, from the parent for a proxified object) are never
    requested. When the client is not connected, every tag of *obj* that
    has an entry in the TTL table is removed from it. When it is
    connected, tags whose time of death is still in the future are
    skipped and the remaining ones are requested through
    :meth:`_request_tags`.

    :param obj: the object to update
    :param tags: iterable of tag names to get, or None
    :param tags_novalue: iterable of tag names to check, or None
    '''
    parent = obj.get('__parent')
    if parent is None:
        user_tags = self._server.conf.show(obj['id'])['tags']
        obj['__user_defined_tags'] = user_tags
        client_id = obj['id']
    else:
        user_tags = parent['__user_defined_tags']
        client_id = parent['id']

    # Work on private copies so the caller's sets are left untouched:
    if tags is not None:
        tags = set(tags)
        tags -= set(user_tags)
    if tags_novalue is not None:
        tags_novalue = set(tags_novalue)
        tags_novalue -= set(user_tags)

    try:
        client = self._server.get_connection(client_id)
    except KeyError:
        # The client is not connected, we need to keep only unttlised
        # tags. The id is read once and the tag names are snapshotted,
        # because entries are deleted from obj while scanning it.
        oid = obj['id']
        for name in list(obj.keys()):
            if (oid, name) in self._ttls:
                del obj[name]
        return

    # The client is connected:
    if parent is None:
        obj.update(client.get_tags())
    oid = obj['id']
    now = datetime.now()

    def needs_refresh(tag):
        # A tag without TTL entry, or whose time of death is reached,
        # is not considered cached.
        return not now < self._ttls.get((oid, tag), now)

    # Drop the tags still cached, and cast as jsonizable type:
    if tags is not None:
        tags = tuple(tag for tag in tags if needs_refresh(tag))
    if tags_novalue is not None:
        tags_novalue = tuple(tag for tag in tags_novalue
                             if needs_refresh(tag))

    # Ask tags to the node (None means no restriction, so always ask):
    if (tags is None or tags) or (tags_novalue is None or tags_novalue):
        self._request_tags(obj, client, tags, tags_novalue)
def _request_tags(self, obj, client, tags, tags_novalue):
    '''
    Send the tags request for *obj* through *client*.

    A proxified object (one with a ``__parent`` tag) is requested with
    the ``sub_tags`` call, passing the part of its id located after the
    first dot; any other object is requested with ``get_tags``. The id
    of the sent message is appended to ``self._msgs``.
    '''
    if '__parent' not in obj:
        method, args = 'get_tags', (tags, tags_novalue)
    else:
        subid = obj['id'].partition('.')[2]
        method, args = 'sub_tags', (subid, tags, tags_novalue)
    msg_id = client.connection.async_call(method, *args, _data=obj)
    self._msgs.append(msg_id)
def get_by_id(self, oid):
    '''
    Look up a registered object from its id.

    :param oid: the complete oid of the object to get
    :return: a :class:`TqlObject` instance
    :raise KeyError: if no object is registered under *oid*
    '''
    registry = self._objects
    return registry[oid]
def register(self, obj):
    '''
    Add the specified object to the database, keyed by its ``id`` tag.

    :param obj: the object to register
    :raise ValueError: if the object has no ``id`` tag
    :raise AlreadyRegistered: if an object with the same id is already
        in the database
    '''
    with self._lock:
        if 'id' not in obj:
            raise ValueError('id tag must exists on the object')
        oid = obj['id']
        if oid in self._objects:
            raise AlreadyRegistered('The object is already registered')
        self._objects[oid] = obj
def unregister(self, obj_id):
    '''
    Remove an object from the database by its id.

    The children of the object are unregistered first, then the object
    itself is removed and its TTL entries are cleaned.

    :param obj_id: the id of the object to remove
    :raise ValueError: if no object is registered under *obj_id*
    '''
    with self._lock:
        if obj_id not in self._objects:
            raise ValueError('The object is not registered')
        self.unregister_children(obj_id)
        del self._objects[obj_id]
        self.clean_ttls(obj_id)
def unregister_children(self, obj_id):
    '''
    Remove all children of an object.

    A child is any registered object whose ``__parent`` tag is the
    object registered under *obj_id*. Each child is removed from the
    database and its TTL entries are cleaned.

    :param obj_id: the id of the parent object
    :raise ValueError: if no object is registered under *obj_id*
    '''
    with self._lock:
        if obj_id not in self._objects:
            raise ValueError('The object %r is not registered' % obj_id)
        parent = self._objects[obj_id]
        # Iterate over a snapshot: entries are deleted from the registry
        # during the scan, which is not allowed on a live dict view.
        for key, obj in list(self._objects.items()):
            if obj.get('__parent') is parent:
                del self._objects[key]
                self.clean_ttls(obj['id'])
def clean_ttls(self, obj_id):
    '''
    Remove TTLs for all tags for the specified object.

    :param obj_id: the id of the object whose TTL entries are dropped
    '''
    with self._lock:
        # Collect the matching keys first: deleting from the table while
        # iterating over it is not allowed on a live dict view.
        stale = [key for key in self._ttls if key[0] == obj_id]
        for key in stale:
            del self._ttls[key]
def get_ids(self):
    '''
    Build the set of all the first level ids.

    The first level id of an object is the part of its id located
    before the first dot (the whole id when it contains no dot).

    :return: a set of ids
    '''
    return set(oid.partition('.')[0] for oid in self._objects)
def all(self, tags, to_check):
    '''
    Iterate over every registered object with the specified tags.

    This is a generator: the database update is only triggered when the
    iteration starts.

    :param tags: tags to update before returning the objects
    :param to_check: tags to update by name only (no value)
    '''
    self.update(tags=tags, tags_novalue=to_check)
    registered = self._objects.values()
    for item in registered:
        yield item
def some(self, ids, tags=frozenset(), to_check=frozenset()):
    '''
    Get specified objects registered on the server with specified tags.

    This is a generator: the database update is only triggered when the
    iteration starts. The defaults are immutable empty sets, so a single
    shared default object can never be altered between calls.

    :param ids: ids of the objects to update and return
    :param tags: tags to update before returning the objects
    :param to_check: tags to update by name only (no value)
    :raise KeyError: if one of the ids is not registered
    '''
    self.update(ids, tags=tags, tags_novalue=to_check)
    for obj_id in ids:
        yield self._objects[obj_id]
def stats(self):
    '''
    Get stats about the current state of the object database.

    :return: a dict with the following keys:

        * ``count``: number of registered objects
        * ``cached``: number of TTL entries whose time of death is still
          in the future
        * ``dead``: number of TTL entries whose time of death is reached
        * ``total``: number of TTL entries
        * ``queries``: value of the query updates counter
    '''
    now = datetime.now()
    total = len(self._ttls)
    # An entry is still cached while its time of death lies ahead of now:
    cached = sum(1 for tod in self._ttls.values() if tod > now)
    return {
        'count': len(self._objects),
        'cached': cached,
        'dead': total - cached,
        'total': total,
        'queries': self._nb_queries,
    }