Loading cloudcontrol/common/client/tags.py +44 −1 Original line number Diff line number Diff line import Queue import inspect import logging import weakref from functools import partial import threading from functools import partial, wraps from itertools import chain from collections import defaultdict Loading Loading @@ -350,6 +352,20 @@ class TagDB(object): # end dict like def thread_check(func): """Decorator for method that need to be executed in pyev thread.""" @wraps(func) def decorated(self, *args, **kwargs): if threading.current_thread() is self.pyev_thread: return func(self, *args, **kwargs) else: self.queue.put(partial(func, self, *args, **kwargs)) self.async.send() # return None return decorated class RootTagDB(TagDB): """Root tag database. Loading @@ -366,8 +382,29 @@ class RootTagDB(TagDB): #: dict for async call storage, keep a part of log message self.async_calls = dict() # we keep track of pyev thread because all watchers must be started from # the same thread (ie Tag watchers) # of course if the TagDB is not initialized from the pyev thread, this # would results in possible errors self.pyev_thread = threading.current_thread() self.queue = Queue.Queue() self.async = self.main.evloop.async(self.async_cb) self.async.start() TagDB.__init__(self, tags=tags, sub_tags=sub_tags) def async_cb(self, watcher, revents): logger.debug('Async RootTagDB') while True: try: work = self.queue.get_nowait() except Queue.Empty: break try: work() except Exception: logger.exception('Error async work on RootTagDB') # RPC part def rpc_call(self, opaque, callback, remote_name, *args, **kwargs): """Local helper for all rpc calls. Loading Loading @@ -468,6 +505,7 @@ class RootTagDB(TagDB): 'Error while trying to update tag %s, %s("%s")') # end RPC part @thread_check def add_tag(self, tag): # set special attributes on tag instance tag.db = self Loading @@ -478,6 +516,7 @@ class RootTagDB(TagDB): self.rpc_register_tag(tag) self.db['__main__'][tag.name] = tag @thread_check def remove_tag(self, tag_name): tag = self.db['__main__'].pop(tag_name) tag.db = None Loading @@ -487,6 +526,7 @@ class RootTagDB(TagDB): if tag.value is not None: self.rpc_unregister_tag(tag_name) @thread_check def add_sub_tag(self, sub_id, tag): tag.db = self tag.sub_id = sub_id Loading @@ -496,6 +536,7 @@ class RootTagDB(TagDB): self.rpc_register_sub_tag(sub_id, tag) self.db[sub_id][tag.name] = tag @thread_check def remove_sub_tag(self, sub_id, tag_name): tag = self.db[sub_id].pop(tag_name) tag.db = None Loading @@ -505,6 +546,7 @@ class RootTagDB(TagDB): if tag.value is not None: self.rpc_unregister_sub_tag(sub_id, tag_name) @thread_check def add_sub_object(self, sub_id, tags, type_): self.rpc_register_sub_object(sub_id, type_) self._object_types[sub_id] = type_ Loading @@ -512,6 +554,7 @@ class RootTagDB(TagDB): for t in tags: self.add_sub_tag(sub_id, t) @thread_check def remove_sub_object(self, sub_id): for tag in self.db[sub_id].itervalues(): tag.stop() Loading Loading
cloudcontrol/common/client/tags.py +44 −1 Original line number Diff line number Diff line import Queue import inspect import logging import weakref from functools import partial import threading from functools import partial, wraps from itertools import chain from collections import defaultdict Loading Loading @@ -350,6 +352,20 @@ class TagDB(object): # end dict like def thread_check(func): """Decorator for method that need to be executed in pyev thread.""" @wraps(func) def decorated(self, *args, **kwargs): if threading.current_thread() is self.pyev_thread: return func(self, *args, **kwargs) else: self.queue.put(partial(func, self, *args, **kwargs)) self.async.send() # return None return decorated class RootTagDB(TagDB): """Root tag database. Loading @@ -366,8 +382,29 @@ class RootTagDB(TagDB): #: dict for async call storage, keep a part of log message self.async_calls = dict() # we keep track of pyev thread because all watchers must be started from # the same thread (ie Tag watchers) # of course if the TagDB is not initialized from the pyev thread, this # would results in possible errors self.pyev_thread = threading.current_thread() self.queue = Queue.Queue() self.async = self.main.evloop.async(self.async_cb) self.async.start() TagDB.__init__(self, tags=tags, sub_tags=sub_tags) def async_cb(self, watcher, revents): logger.debug('Async RootTagDB') while True: try: work = self.queue.get_nowait() except Queue.Empty: break try: work() except Exception: logger.exception('Error async work on RootTagDB') # RPC part def rpc_call(self, opaque, callback, remote_name, *args, **kwargs): """Local helper for all rpc calls. Loading Loading @@ -468,6 +505,7 @@ class RootTagDB(TagDB): 'Error while trying to update tag %s, %s("%s")') # end RPC part @thread_check def add_tag(self, tag): # set special attributes on tag instance tag.db = self Loading @@ -478,6 +516,7 @@ class RootTagDB(TagDB): self.rpc_register_tag(tag) self.db['__main__'][tag.name] = tag @thread_check def remove_tag(self, tag_name): tag = self.db['__main__'].pop(tag_name) tag.db = None Loading @@ -487,6 +526,7 @@ class RootTagDB(TagDB): if tag.value is not None: self.rpc_unregister_tag(tag_name) @thread_check def add_sub_tag(self, sub_id, tag): tag.db = self tag.sub_id = sub_id Loading @@ -496,6 +536,7 @@ class RootTagDB(TagDB): self.rpc_register_sub_tag(sub_id, tag) self.db[sub_id][tag.name] = tag @thread_check def remove_sub_tag(self, sub_id, tag_name): tag = self.db[sub_id].pop(tag_name) tag.db = None Loading @@ -505,6 +546,7 @@ class RootTagDB(TagDB): if tag.value is not None: self.rpc_unregister_sub_tag(sub_id, tag_name) @thread_check def add_sub_object(self, sub_id, tags, type_): self.rpc_register_sub_object(sub_id, type_) self._object_types[sub_id] = type_ Loading @@ -512,6 +554,7 @@ class RootTagDB(TagDB): for t in tags: self.add_sub_tag(sub_id, t) @thread_check def remove_sub_object(self, sub_id): for tag in self.db[sub_id].itervalues(): tag.stop() Loading