import logging
from itertools import count
from threading import Thread


logger = logging.getLogger(__name__)


class JobManager(object):
    """Keep an index of jobs by id and hand out job ids."""

    def __init__(self, main_loop):
        """
        :param main_loop: :class:`MainLoop` instance
        """
        #: iterator yielding 0, 1, 2, ...; each job draws its id from it
        self.job_id = count()
        self.main = main_loop
        #: keep an index of all jobs
        self.jobs = {}

    def job_start(self):
        """Placeholder, currently does nothing."""
        pass

    def job_stop(self):
        """Placeholder, currently does nothing."""
        pass

    def notify(self, job):
        """Called when a job is done.

        :param job: the job object that finished
        """
        # by now only remove the job; the index is keyed by job id, so the
        # id (not the job object) is what must be handed to remove()
        self.remove(job.id)

    def cancel(self, job_id):
        """Cancel a job by calling its ``stop`` method.

        :param job_id: id of a registered job
        :raises KeyError: if no job is registered under `job_id`
        """
        self.jobs[job_id].stop()

    def remove(self, job_id):
        """Unregister a job and return it.

        :param job_id: id of the job to remove; a job object is accepted as
            well, in which case its ``id`` attribute is used
        :raises KeyError: if no job is registered under that id
        """
        # tolerate callers that pass the job itself instead of its id
        return self.jobs.pop(getattr(job_id, 'id', job_id))

    def create(self, job_constructor, *args, **kwargs):
        """Create a new job and populate job id.

        :param job_constructor: callable invoked as
            ``job_constructor(manager, ev_loop, *args, **kwargs)``; the
            object it returns must expose an ``id`` attribute
        :return: the newly created job, already registered in the index
        """
        job = job_constructor(self, self.main.evloop, *args, **kwargs)
        self.jobs[job.id] = job
        return job

    def get(self, job_id):
        """Return the job registered under `job_id`.

        :raises KeyError: if no job is registered under `job_id`
        """
        return self.jobs[job_id]

    def start(self):
        """Placeholder, currently does nothing."""
        pass

    def stop(self):
        """Placeholder, currently does nothing."""
        pass


class BaseThreadedJob(Thread):
    """Job running in a background thread.

    Handles job notification to the job manager.
    """

    def __init__(self, job_manager, ev_loop):
        """
        :param job_manager: :class:`JobManager` that owns this job
        :param ev_loop: event loop object providing an ``async(callback)``
            watcher factory
        """
        Thread.__init__(self)
        #: report progress in %
        self.progress = 0.
        #: async watcher to handle notification to main loop
        # `async` is a reserved keyword since Python 3.7, so the attribute is
        # looked up by name to keep this module importable there as well
        self.watcher = getattr(ev_loop, 'async')(self.notify_cb)
        self.job_manager = job_manager
        self.error_msg = 'Unexpected exception during job execution'
        #: job id
        self.id = next(job_manager.job_id)
        self.running = False

    def pre_job(self):
        """Job preparation that is called when doing start.

        It can raise exceptions to report an error to the caller.
        """
        pass

    def run_job(self):
        """Override this method to define what your job does."""
        raise NotImplementedError

    def run(self):
        """Thread entry point: run the job, then signal the watcher.

        Exceptions raised by :meth:`run_job` are logged with
        ``error_msg`` and re-raised.
        """
        try:
            self.run_job()
        except Exception:
            logger.exception(self.error_msg)
            raise
        finally:
            # executed on success and on failure alike
            self.running = False
            self.watcher.send()

    def notify_cb(self, *args):
        """Watcher callback: stop the watcher and notify the job manager."""
        self.watcher.stop()
        self.job_manager.notify(self)

    def start(self):
        """Run :meth:`pre_job`, then start the job in a background thread.

        If :meth:`pre_job` raises, the job is removed from the manager and
        the exception is propagated to the caller.
        """
        # first we run pre_job as it could raise an exception
        try:
            self.pre_job()
        except Exception:
            # in case of error we must remove the job from the manager
            self.job_manager.remove(self.id)
            raise
        # then we start the watcher when it's safe
        self.watcher.start()
        self.running = True
        Thread.start(self)

    def start_current(self):
        """Start job in current thread.

        If :meth:`pre_job` raises, the job is removed from the manager and
        the exception is propagated to the caller.
        """
        try:
            self.pre_job()
        except Exception:
            self.job_manager.remove(self.id)
            raise
        self.running = True
        # NOTE(review): unlike run(), this path neither resets `running` nor
        # notifies the manager once run_job() returns -- confirm intended
        self.run_job()

    def stop(self):
        """Clear the ``running`` flag."""
        self.running = False