Skip to content
jobs.py 3.14 KiB
Newer Older
import logging
from threading import Thread


logger = logging.getLogger(__name__)
class JobManager(object):
    def __init__(self, main_loop):
        """
        :param main_loop: :class:`MainLoop` instance
        """
        def counter():
            i = 0
            while True:
                yield i
                i += 1

        self.job_id = counter()
        self.main = main_loop
        #: keep an index of all jobs
        self.jobs = {}

    def job_start(self):
        pass

    def job_stop(self):
        pass

    def notify(self, job):
        """Called when a job is done."""
        # by now only remove the job
        self.remove(job)

    def cancel(self, job_id):
        """Cancel a job."""
Anael Beutot's avatar
Anael Beutot committed
        self.jobs[job_id].stop()
    def remove(self, job_id):
        try:
            return self.jobs.pop(job_id)
        except KeyError:
            logger.error('Job %s does not exist', job_id)

    def create(self, job_constructor, *args, **kwargs):
        """Create a new job and populate job id."""
        job = job_constructor(self, self.main.evloop, *args, **kwargs)
        self.jobs[job.id] = job
        return job

    def get(self, job_id):
        return self.jobs[job_id]

    def start(self):
        pass
    def stop(self):
        pass


class BaseThreadedJob(Thread):
    """Job running in a background thread.

    Handles job notification to the job manager.
    """
    def __init__(self, job_manager, ev_loop):
        Thread.__init__(self)
        #: report progress in %
        self.progress = 0.
        #: async watcher to handle notification to main loop
        self.watcher = ev_loop.async(self.notify_cb)

        self.job_manager = job_manager

        self.error_msg = 'Unexpected exception during job execution'

        #: job id
        self.id = job_manager.job_id.next()

        self.running = False

    def pre_job(self):
        """Job preparation that is called when doing start, it can raise
        exceptions to report error to the caller.

        """
        pass

    def run_job(self):
        """Overide this method to define what your job do."""
        raise NotImplementedError

    def run(self):
        try:
            self.run_job()
        except Exception:
            logger.exception(self.error_msg)
            raise
        finally:
            self.running = False
            self.watcher.send()

    def notify_cb(self, *args):
        self.watcher.stop()
        self.job_manager.notify(self)

    def start(self):
        # first we run pre_job as it could raise an exception
        try:
            self.pre_job()
        except Exception:
            # in case of error we must remove the job from the manager
            self.job_manager.remove(self)
            raise
        # then we start the watcher when it's safe
        self.watcher.start()
        self.running = True
        Thread.start(self)

    def start_current(self):
        """Start job in current thread."""
        try:
            self.pre_job()
        except Exception:
            self.job_manager.remove(self)
            raise
        self.running = True
        self.run_job()

    def stop(self):
        self.running = False