# jobs.py -- job manager and base class for jobs running in a background thread.
import logging
from threading import Thread


logger = logging.getLogger(__name__)
class JobManager(object):
    """Registry of jobs, indexed by job id.

    Provides the sequential id generator used by jobs (:attr:`job_id`) and
    keeps every job built through :meth:`create` in :attr:`jobs` until it is
    removed.
    """

    def __init__(self, main_loop):
        """
        :param main_loop: :class:`MainLoop` instance
        """
        def counter():
            i = 0
            while True:
                yield i
                i += 1

        #: generator yielding the next job id (0, 1, 2, ...)
        self.job_id = counter()
        self.main = main_loop
        #: keep an index of all jobs
        self.jobs = {}

    def job_start(self):
        """Placeholder hook, currently does nothing."""
        pass

    def job_stop(self):
        """Placeholder hook, currently does nothing."""
        pass

    def notify(self, job):
        """Called when a job is done.

        :param job: the finished job object (must expose an ``id`` attribute)
        """
        # for now only remove the job; the index is keyed by job id, so the
        # id must be passed here, not the job object itself
        self.remove(job.id)

    def cancel(self, job_id):
        """Cancel a job.

        :param job_id: id of the job to stop
        :raises KeyError: if no job is registered under ``job_id``
        """
        self.jobs[job_id].stop()

    def remove(self, job_id):
        """Remove a job from the index.

        :param job_id: id of the job to forget
        :return: the removed job, or ``None`` (after logging an error) when
            the id is unknown
        """
        try:
            return self.jobs.pop(job_id)
        except KeyError:
            logger.error('Job %s does not exist', job_id)

    def create(self, job_constructor, *args, **kwargs):
        """Create a new job and populate job id.

        :param job_constructor: callable invoked as
            ``job_constructor(manager, evloop, *args, **kwargs)``; the object
            it returns must expose an ``id`` attribute
        :return: the newly created (not yet started) job
        """
        job = job_constructor(self, self.main.evloop, *args, **kwargs)
        self.jobs[job.id] = job
        return job

    def get(self, job_id):
        """Return the job registered under ``job_id``.

        :raises KeyError: if the id is unknown
        """
        return self.jobs[job_id]

    def start(self):
        """Placeholder hook, currently does nothing."""
        pass

    def stop(self):
        """Placeholder hook, currently does nothing."""
        pass


class BaseThreadedJob(Thread):
    """Job running in a background thread.

    Handles job notification to the job manager.
    """

    def __init__(self, job_manager, ev_loop):
        """
        :param job_manager: object providing the ``job_id`` iterator used to
            allocate this job's id and the ``notify`` callback invoked when
            the job is over
        :param ev_loop: event loop passed in by the manager; not used by this
            base class
        """
        Thread.__init__(self)
        #: report progress in %
        self.progress = 0.

        self.job_manager = job_manager

        #: job id
        # builtin next() works on both Python 2.6+ and Python 3 iterators
        self.id = next(job_manager.job_id)

        #: True while the job is considered active
        self.running = False

    def pre_job(self):
        """Job preparation that is called when doing start, it can raise
        exceptions to report an error to the caller. In the latter case, the
        job also removes itself from the job list.

        """
        pass

    def run_job(self):
        """Override this method to define what your job does."""
        raise NotImplementedError

    def run(self):
        """Thread entry point: execute :meth:`run_job` and clear ``running``.

        Exceptions never propagate out of the thread; they are logged
        instead of being silently dropped.
        """
        try:
            self.run_job()
        except Exception:
            logging.getLogger(__name__).exception(
                'Unhandled error in job %s', self.id)
        finally:
            self.running = False

    def notify(self):
        """Tell the job manager that this job is over."""
        self.job_manager.notify(self)

    def start(self):
        """Run :meth:`pre_job` then start the background thread.

        Any exception raised by :meth:`pre_job` is re-raised to the caller
        after the manager has been notified.
        """
        # first we run pre_job as it could raise an exception
        try:
            self.pre_job()
        except Exception:
            # in case of error we must remove the job from the manager
            self.notify()
            raise
        # then we start the thread when it's safe
        self.running = True
        Thread.start(self)

    def wait(self):
        """For jobs running in a background, this method MUST be called in order
        to remove the job from the list in the end.

        """
        self.join()
        self.notify()

    def start_current(self):
        """Start job in current thread.

        Exceptions from :meth:`pre_job` or :meth:`run_job` propagate to the
        caller; the manager is notified in every case.
        """
        try:
            self.pre_job()
            self.running = True
            self.run_job()
            # we could log exceptions here but rather do it inside the run_job
            # method
        finally:
            # same final state as the threaded path (see run())
            self.running = False
            self.notify()

    def stop(self):
        """Clear the ``running`` flag.

        This class does nothing else to interrupt the thread; presumably
        :meth:`run_job` implementations are expected to check the flag.
        """
        self.running = False