import logging from threading import Thread logger = logging.getLogger(__name__) class JobManager(object): def __init__(self, main_loop): """ :param main_loop: :class:`MainLoop` instance """ def counter(): i = 0 while True: yield i i += 1 self.job_id = counter() self.main = main_loop #: keep an index of all jobs self.jobs = {} def job_start(self): pass def job_stop(self): pass def notify(self, job): """Called when a job is done.""" # by now only remove the job self.remove(job.id) def cancel(self, job_id): """Cancel a job.""" self.jobs[job_id].stop() def remove(self, job_id): try: return self.jobs.pop(job_id) except KeyError: logger.error('Job %s does not exist', job_id) def create(self, job_constructor, *args, **kwargs): """Create a new job and populate job id.""" job = job_constructor(self, self.main.evloop, *args, **kwargs) self.jobs[job.id] = job return job def get(self, job_id): return self.jobs[job_id] def start(self): pass def stop(self): pass class BaseThreadedJob(Thread): """Job running in a background thread. Handles job notification to the job manager. """ def __init__(self, job_manager, ev_loop): Thread.__init__(self) #: report progress in % self.progress = 0. self.job_manager = job_manager #: job id self.id = job_manager.job_id.next() self.running = False def pre_job(self): """Job preparation that is called when doing start, it can raise exceptions to report error to the caller. In the latter case, it will also removes itself from the job list. """ pass def run_job(self): """Overide this method to define what your job do.""" raise NotImplementedError def run(self): try: self.run_job() except Exception: pass finally: self.running = False def notify(self): self.job_manager.notify(self) def start(self): # first we run pre_job as it could raise an exception try: self.pre_job() except Exception: # in case of error we must remove the job from the manager self.notify() raise # then we start the watcher when it's safe self.running = True Thread.start(self) # thread will signal when it's done using async def wait(self): """For jobs running in a background, this method MUST be called in order to remove the job from the list in the end. """ self.join() self.notify() def start_current(self): """Start job in current thread.""" try: self.pre_job() self.running = True self.run_job() # we could log exceptions here but rather do it inside the run_job # method finally: self.notify() def stop(self): self.running = False