Commit a64dbd20 authored by Anael Beutot's avatar Anael Beutot
Browse files

Refactored BaseThreadedJob.

Handles correctly termination.
parent a06afe7f
Loading
Loading
Loading
Loading
+20 −18
Original line number Diff line number Diff line
@@ -66,13 +66,9 @@ class BaseThreadedJob(Thread):
        Thread.__init__(self)
        #: report progress in %
        self.progress = 0.
        #: async watcher to handle notification to main loop
        self.watcher = ev_loop.async(self.notify_cb)

        self.job_manager = job_manager

        self.error_msg = 'Unexpected exception during job execution'

        #: job id
        self.id = job_manager.job_id.next()

@@ -80,7 +76,8 @@ class BaseThreadedJob(Thread):

    def pre_job(self):
        """Job preparation that is called when doing start, it can raise
        exceptions to report error to the caller.
        exceptions to report error to the caller. In the latter case, it will
        also removes itself from the job list.

        """
        pass
@@ -93,14 +90,11 @@ class BaseThreadedJob(Thread):
        try:
            self.run_job()
        except Exception:
            logger.exception(self.error_msg)
            raise
            pass
        finally:
            self.running = False
            self.watcher.send()

    def notify_cb(self, *args):
        self.watcher.stop()
    def notify(self):
        self.job_manager.notify(self)

    def start(self):
@@ -109,22 +103,30 @@ class BaseThreadedJob(Thread):
            self.pre_job()
        except Exception:
            # in case of error we must remove the job from the manager
            self.job_manager.remove(self)
            self.notify()
            raise
        # then we start the watcher when it's safe
        self.watcher.start()
        self.running = True
        Thread.start(self)
        Thread.start(self)  # thread will signal when it's done using async

    def wait(self):
        """For jobs running in a background, this method MUST be called in order
        to remove the job from the list in the end.

        """
        self.join()
        self.notify()

    def start_current(self):
        """Start job in current thread."""
        try:
            self.pre_job()
        except Exception:
            self.job_manager.remove(self)
            raise
            self.running = True
            self.run_job()
            # we could log exceptions here but rather do it inside the run_job
            # method
        finally:
            self.notify()

    def stop(self):
        self.running = False