Commit fc44ac06 authored by Anael Beutot's avatar Anael Beutot Committed by Antoine Millet
Browse files

Added support for executing work in pyev thread from any thread in MainLoop

parent 047910f8
Loading
Loading
Loading
Loading
+52 −0
Original line number Diff line number Diff line
import time
import Queue
import signal
import threading
import logging
import logging.config
from threading import Thread
@@ -156,6 +158,13 @@ class MainLoop(object):

        self.evloop = pyev.Loop(debug=self.config.debug)

        # keeps track of libev loop thread, create threading queue and an async
        # watcher for performing work in libev thread from external thread
        # see decorators in utils.py
        self.pyev_thread = threading.current_thread()
        self.async_queue = Queue.Queue()
        self.async_watcher = self.evloop.async(self.async_work_cb)

        # set signal watchers
        self.signals = {
            signal.SIGINT: self.stop,
@@ -205,6 +214,48 @@ class MainLoop(object):
    def configure_logging(self):
        raise NotImplementedError

    def call_in_main_thread(self, func, *args, **kwargs):
        """Performs work in pyev thread.
        """
        if threading.current_thread() is self.pyev_thread:
            # if called from main thread, don't do anything special
            return func(*args, **kwargs)
        else:
            class Return(object):
                def __init__(self):
                    self.return_value = None
                    self.exc = None
            return_ = Return()
            event = threading.Event()
            def cb():
                try:
                    return_.return_value = func(*args, **kwargs)
                except Exception as e:
                    return_.exc = e
                finally:
                    event.set()
            self.async_queue.put(cb)
            self.async_watcher.send()

            # wait for work to be processed and raise error
            event.wait()
            if return_.exc is not None:
                raise return_.exc
            return return_.return_value

    def async_work_cb(self, watcher, revents):
        logger.debug('Async work processing')
        while True:
            try:
                work = self.async_queue.get_nowait()
            except Queue.Empty:
                break
            try:
                work()
            except Exception:
                # this should not happen but just in case
                logger.exception('Error async work')

    # RPC handlers definitions
    @threadless
    def sub_tags(self, sub_id, tags=None):
@@ -290,6 +341,7 @@ class MainLoop(object):

    def start(self):
        logger.info('Starting node')
        self.async_watcher.start()
        for signal_ in self.signals.itervalues():
            signal_.start()
        logger.debug('About to connect')