# jobs.py
# Authored by Anael Beutot.
import itertools
import logging
from threading import Thread
logger = logging.getLogger(__name__)
class JobManager(object):
    """Registry of jobs created through :meth:`create`, indexed by job id."""

    def __init__(self, main_loop):
        """
        :param main_loop: :class:`MainLoop` instance
        """
        #: iterator handing out job ids (0, 1, 2, ...); ``itertools.count``
        #: replaces a hand-written generator, which raises ``ValueError``
        #: when advanced from two threads at the same time
        self.job_id = itertools.count()
        self.main = main_loop
        #: keep an index of all jobs
        self.jobs = {}

    def job_start(self):
        """Placeholder hook, currently a no-op."""
        pass

    def job_stop(self):
        """Placeholder hook, currently a no-op."""
        pass

    def notify(self, job):
        """Called when a job is done.

        :param job: the finished job object (must expose an ``id`` attribute)
        :raises KeyError: if the job is not registered anymore
        """
        # by now only remove the job; the index is keyed by job id, so the
        # id (and not the job object itself) must be handed to remove()
        self.remove(job.id)

    def cancel(self, job_id):
        """Cancel a job.

        :param job_id: id of the job whose ``stop`` method is called
        :raises KeyError: if no job is registered under `job_id`
        """
        self.jobs[job_id].stop()

    def remove(self, job_id):
        """Unregister a job and return it.

        :param job_id: id of the job to drop from the index
        :raises KeyError: if no job is registered under `job_id`
        """
        return self.jobs.pop(job_id)

    def create(self, job_constructor, *args, **kwargs):
        """Create a new job and populate job id.

        :param job_constructor: callable invoked as
            ``job_constructor(manager, evloop, *args, **kwargs)``; the object
            it returns must expose an ``id`` attribute
        :return: the newly created (and registered) job
        """
        job = job_constructor(self, self.main.evloop, *args, **kwargs)
        self.jobs[job.id] = job
        return job

    def get(self, job_id):
        """Return the job registered under `job_id`.

        :raises KeyError: if no job is registered under `job_id`
        """
        return self.jobs[job_id]

    def start(self):
        """Placeholder hook, currently a no-op."""
        pass

    def stop(self):
        """Placeholder hook, currently a no-op."""
        pass
class BaseThreadedJob(Thread):
    """Job running in a background thread.

    Handles job notification to the job manager.
    """

    def __init__(self, job_manager, ev_loop):
        """
        :param job_manager: manager owning this job; it must provide a
            ``job_id`` iterator and ``notify``/``remove`` methods
        :param ev_loop: event loop object exposing an ``async(callback)``
            method that returns a watcher
        """
        Thread.__init__(self)

        #: report progress in %
        self.progress = 0.
        #: async watcher to handle notification to main loop
        # ``async`` is a reserved keyword since Python 3.7, so the method is
        # looked up by name to keep this module importable everywhere
        self.watcher = getattr(ev_loop, 'async')(self.notify_cb)
        self.job_manager = job_manager
        self.error_msg = 'Unexpected exception during job execution'
        #: job id
        # builtin next() works on Python 2.6+ and 3, unlike the ``.next()``
        # iterator method
        self.id = next(job_manager.job_id)
        self.running = False

    def pre_job(self):
        """Job preparation that is called when doing start, it can raise
        exceptions to report error to the caller.
        """
        pass

    def run_job(self):
        """Override this method to define what your job does."""
        raise NotImplementedError

    def run(self):
        """Thread entry point.

        Executes :meth:`run_job`; any exception is logged with
        ``error_msg`` and re-raised. In all cases ``running`` is cleared and
        the watcher is signalled.
        """
        try:
            self.run_job()
        except Exception:
            logger.exception(self.error_msg)
            raise
        finally:
            self.running = False
            self.watcher.send()

    def notify_cb(self, *args):
        """Watcher callback: stop the watcher and notify the job manager."""
        self.watcher.stop()
        self.job_manager.notify(self)

    def start(self):
        """Run :meth:`pre_job`, then start the watcher and the thread.

        :raises Exception: whatever :meth:`pre_job` raised; the job is
            removed from the manager before the exception propagates
        """
        # first we run pre_job as it could raise an exception
        try:
            self.pre_job()
        except Exception:
            # in case of error we must remove the job from the manager
            # (the manager indexes jobs by id, not by job object)
            self.job_manager.remove(self.id)
            raise

        # then we start the watcher when it's safe
        self.watcher.start()
        self.running = True
        Thread.start(self)

    def start_current(self):
        """Start job in current thread.

        :raises Exception: whatever :meth:`pre_job` raised (the job is
            removed from the manager first), or whatever :meth:`run_job`
            raised
        """
        try:
            self.pre_job()
        except Exception:
            # the manager indexes jobs by id, not by job object
            self.job_manager.remove(self.id)
            raise

        # NOTE(review): unlike run(), this path neither resets ``running``
        # nor notifies the manager once run_job() returns -- confirm intended
        self.running = True
        self.run_job()

    def stop(self):
        """Clear the ``running`` flag.

        Nothing in this class reads the flag; presumably :meth:`run_job`
        implementations poll it to stop early -- verify against subclasses.
        """
        self.running = False