Loading cloudcontrol/common/client/tags.py +8 −59 Original line number Diff line number Diff line import Queue import inspect import logging import weakref import threading from functools import partial, wraps from functools import partial from itertools import chain from collections import defaultdict from cloudcontrol.common.client.utils import main_thread from cloudcontrol.common.client.exc import TagConflict Loading Loading @@ -452,36 +452,6 @@ class TagDB(object): # end dict like def thread_check(func): """Decorator for method that need to be executed in pyev thread.""" @wraps(func) def decorated(self, *args, **kwargs): if threading.current_thread() is self.pyev_thread: # if called from main thread, don't do anything special return func(self, *args, **kwargs) else: exc = None return_value = None event = threading.Event() def cb(): try: return_value = func(self, *args, **kwargs) except Exception as e: exc = e finally: event.set() self.queue.put(cb) self.async.send() # wait for tag to be processed and raise error event.wait() if exc is not None: raise exc return return_value return decorated class RootTagDB(TagDB): """Root tag database. Loading @@ -498,32 +468,11 @@ class RootTagDB(TagDB): #: dict for RPC async call storage, keep a part of log message self.async_calls = dict() # we keep track of pyev thread because all watchers must be started from # the same thread (ie Tag watchers) # of course if the RootTagDB is not initialized from the pyev thread, this # would results in possible errors self.pyev_thread = threading.current_thread() self.queue = Queue.Queue() self.async = self.main.evloop.async(self.async_cb) self.async.start() TagDB.__init__(self, tags=tags, sub_tags=sub_tags) def set_parent(self, parent): raise NotImplemented('Cannot set parent on RootTagDB') def async_cb(self, watcher, revents): logger.debug('Async RootTagDB') while True: try: work = self.queue.get_nowait() except Queue.Empty: break try: work() except Exception: logger.exception('Error async work on RootTagDB') # RPC part def rpc_call(self, opaque, callback, remote_name, *args, **kwargs): """Local helper for all rpc calls. Loading Loading @@ -624,7 +573,7 @@ class RootTagDB(TagDB): 'Error while trying to update tag %s, %s("%s")') # end RPC part @thread_check @main_thread def add_tag(self, tag): if tag.name in self.db['__main__']: raise TagConflict( Loading @@ -639,7 +588,7 @@ class RootTagDB(TagDB): self.rpc_register_tag(tag) self.db['__main__'][tag.name] = tag @thread_check @main_thread def remove_tag(self, tag_name): tag = self.db['__main__'].pop(tag_name) tag.db = None Loading @@ -649,7 +598,7 @@ class RootTagDB(TagDB): if tag.value is not None: self.rpc_unregister_tag(tag_name) @thread_check @main_thread def add_sub_tag(self, sub_id, tag): if tag.name in self.db[sub_id]: raise TagConflict( Loading @@ -664,7 +613,7 @@ class RootTagDB(TagDB): self.rpc_register_sub_tag(sub_id, tag) self.db[sub_id][tag.name] = tag @thread_check @main_thread def remove_sub_tag(self, sub_id, tag_name): tag = self.db[sub_id].pop(tag_name) tag.db = None Loading @@ -674,7 +623,7 @@ class RootTagDB(TagDB): if tag.value is not None: self.rpc_unregister_sub_tag(sub_id, tag_name) @thread_check @main_thread def add_sub_object(self, sub_id, tags, type_): self.rpc_register_sub_object(sub_id, type_) self._object_types[sub_id] = type_ Loading @@ -682,7 +631,7 @@ class RootTagDB(TagDB): for t in tags: self.add_sub_tag(sub_id, t) @thread_check @main_thread def remove_sub_object(self, sub_id): for tag in self.db[sub_id].itervalues(): tag.stop() Loading Loading
cloudcontrol/common/client/tags.py +8 −59 Original line number Diff line number Diff line import Queue import inspect import logging import weakref import threading from functools import partial, wraps from functools import partial from itertools import chain from collections import defaultdict from cloudcontrol.common.client.utils import main_thread from cloudcontrol.common.client.exc import TagConflict Loading Loading @@ -452,36 +452,6 @@ class TagDB(object): # end dict like def thread_check(func): """Decorator for method that need to be executed in pyev thread.""" @wraps(func) def decorated(self, *args, **kwargs): if threading.current_thread() is self.pyev_thread: # if called from main thread, don't do anything special return func(self, *args, **kwargs) else: exc = None return_value = None event = threading.Event() def cb(): try: return_value = func(self, *args, **kwargs) except Exception as e: exc = e finally: event.set() self.queue.put(cb) self.async.send() # wait for tag to be processed and raise error event.wait() if exc is not None: raise exc return return_value return decorated class RootTagDB(TagDB): """Root tag database. Loading @@ -498,32 +468,11 @@ class RootTagDB(TagDB): #: dict for RPC async call storage, keep a part of log message self.async_calls = dict() # we keep track of pyev thread because all watchers must be started from # the same thread (ie Tag watchers) # of course if the RootTagDB is not initialized from the pyev thread, this # would results in possible errors self.pyev_thread = threading.current_thread() self.queue = Queue.Queue() self.async = self.main.evloop.async(self.async_cb) self.async.start() TagDB.__init__(self, tags=tags, sub_tags=sub_tags) def set_parent(self, parent): raise NotImplemented('Cannot set parent on RootTagDB') def async_cb(self, watcher, revents): logger.debug('Async RootTagDB') while True: try: work = self.queue.get_nowait() except Queue.Empty: break try: work() except Exception: logger.exception('Error async work on RootTagDB') # RPC part def rpc_call(self, opaque, callback, remote_name, *args, **kwargs): """Local helper for all rpc calls. Loading Loading @@ -624,7 +573,7 @@ class RootTagDB(TagDB): 'Error while trying to update tag %s, %s("%s")') # end RPC part @thread_check @main_thread def add_tag(self, tag): if tag.name in self.db['__main__']: raise TagConflict( Loading @@ -639,7 +588,7 @@ class RootTagDB(TagDB): self.rpc_register_tag(tag) self.db['__main__'][tag.name] = tag @thread_check @main_thread def remove_tag(self, tag_name): tag = self.db['__main__'].pop(tag_name) tag.db = None Loading @@ -649,7 +598,7 @@ class RootTagDB(TagDB): if tag.value is not None: self.rpc_unregister_tag(tag_name) @thread_check @main_thread def add_sub_tag(self, sub_id, tag): if tag.name in self.db[sub_id]: raise TagConflict( Loading @@ -664,7 +613,7 @@ class RootTagDB(TagDB): self.rpc_register_sub_tag(sub_id, tag) self.db[sub_id][tag.name] = tag @thread_check @main_thread def remove_sub_tag(self, sub_id, tag_name): tag = self.db[sub_id].pop(tag_name) tag.db = None Loading @@ -674,7 +623,7 @@ class RootTagDB(TagDB): if tag.value is not None: self.rpc_unregister_sub_tag(sub_id, tag_name) @thread_check @main_thread def add_sub_object(self, sub_id, tags, type_): self.rpc_register_sub_object(sub_id, type_) self._object_types[sub_id] = type_ Loading @@ -682,7 +631,7 @@ class RootTagDB(TagDB): for t in tags: self.add_sub_tag(sub_id, t) @thread_check @main_thread def remove_sub_object(self, sub_id): for tag in self.db[sub_id].itervalues(): tag.stop() Loading