Loading cloudcontrol/common/client/tags.py +40 −6 Original line number Diff line number Diff line Loading @@ -15,7 +15,8 @@ logger = logging.getLogger(__name__) class Tag(object): """``class`` that abstract tags and act as a simple container.""" def __init__(self, name, valuable, ttl=-1, refresh=None, parent=None): def __init__(self, name, valuable, ttl=-1, refresh=None, parent=None, background=False): """ :param string name: tag name :param valuable: something that gives tag value, string or callable Loading @@ -26,8 +27,10 @@ class Tag(object): * positive integer in seconds :param None,int refresh: period used to refresh tag value on the node :param object obj: parent object the tag is attached to, it may be :param object parent: parent object the tag is attached to, it may be needed for the tag callable to process the tag :param bool background: calculate the tag value in a background thread (migth be usefull to not block the event loop) """ self.name = name self.value = None Loading @@ -35,6 +38,7 @@ class Tag(object): self.ttl = ttl self.refresh = refresh self.parent = parent if parent is None else weakref.proxy(parent) self.background = bool(background) if inspect.isfunction(valuable): self.is_function = True Loading @@ -48,7 +52,10 @@ class Tag(object): else: self.value = valuable #: timer instance self.watcher = None #: async watcher in case of background is True self.async = None # special arguments for tag db self.db = None Loading @@ -65,8 +72,34 @@ class Tag(object): def update_value(self): """Called when the tag value may change.""" prev_value = self.value if not self.background: self.calculate_value() if self.value != prev_value: self._handle_registration(prev_value) return # else # create a thread that will run the calculate_value method def thread_run(): self.calculate_value() self.async.send() # keep previous value in watcher's opaque self.async.data = prev_value self.async.start() th = threading.Thread(target=thread_run) th.daemon = True th.start() def async_cb(self, watcher, revents): if watcher.data != self.value: self._handle_registration(watcher.data) watcher.data = None watcher.stop() def _handle_registration(self, prev_value): if self.db is None: # when we calculate the tag, the latter could raise an exception # and could provoke a deconnection to the libvirt for example, Loading @@ -74,9 +107,7 @@ class Tag(object): # include the current (in case this happens, just return) return if prev_value == self.value: return elif prev_value is None: if prev_value is None: # we need to register tag again if self.sub_id == '__main__': logger.debug('Register tag %s', self.name) Loading Loading @@ -106,6 +137,9 @@ class Tag(object): """ :param loop: pyev loop """ if self.background: self.async = loop.async(self.async_cb) if not self.is_function: return Loading Loading
cloudcontrol/common/client/tags.py +40 −6 Original line number Diff line number Diff line Loading @@ -15,7 +15,8 @@ logger = logging.getLogger(__name__) class Tag(object): """``class`` that abstract tags and act as a simple container.""" def __init__(self, name, valuable, ttl=-1, refresh=None, parent=None): def __init__(self, name, valuable, ttl=-1, refresh=None, parent=None, background=False): """ :param string name: tag name :param valuable: something that gives tag value, string or callable Loading @@ -26,8 +27,10 @@ class Tag(object): * positive integer in seconds :param None,int refresh: period used to refresh tag value on the node :param object obj: parent object the tag is attached to, it may be :param object parent: parent object the tag is attached to, it may be needed for the tag callable to process the tag :param bool background: calculate the tag value in a background thread (migth be usefull to not block the event loop) """ self.name = name self.value = None Loading @@ -35,6 +38,7 @@ class Tag(object): self.ttl = ttl self.refresh = refresh self.parent = parent if parent is None else weakref.proxy(parent) self.background = bool(background) if inspect.isfunction(valuable): self.is_function = True Loading @@ -48,7 +52,10 @@ class Tag(object): else: self.value = valuable #: timer instance self.watcher = None #: async watcher in case of background is True self.async = None # special arguments for tag db self.db = None Loading @@ -65,8 +72,34 @@ class Tag(object): def update_value(self): """Called when the tag value may change.""" prev_value = self.value if not self.background: self.calculate_value() if self.value != prev_value: self._handle_registration(prev_value) return # else # create a thread that will run the calculate_value method def thread_run(): self.calculate_value() self.async.send() # keep previous value in watcher's opaque self.async.data = prev_value self.async.start() th = threading.Thread(target=thread_run) th.daemon = True th.start() def async_cb(self, watcher, revents): if watcher.data != self.value: self._handle_registration(watcher.data) watcher.data = None watcher.stop() def _handle_registration(self, prev_value): if self.db is None: # when we calculate the tag, the latter could raise an exception # and could provoke a deconnection to the libvirt for example, Loading @@ -74,9 +107,7 @@ class Tag(object): # include the current (in case this happens, just return) return if prev_value == self.value: return elif prev_value is None: if prev_value is None: # we need to register tag again if self.sub_id == '__main__': logger.debug('Register tag %s', self.name) Loading Loading @@ -106,6 +137,9 @@ class Tag(object): """ :param loop: pyev loop """ if self.background: self.async = loop.async(self.async_cb) if not self.is_function: return Loading