Newer
Older
class JobManager(object):
def __init__(self, main_loop):
"""
:param main_loop: :class:`MainLoop` instance
"""
def counter():
i = 0
while True:
yield i
i += 1
self.job_id = counter()
self.main = main_loop
#: keep an index of all jobs
self.jobs = {}
def job_start(self):
pass
def job_stop(self):
pass
def notify(self, job):
"""Called when a job is done."""
# by now only remove the job
self.remove(job)
def cancel(self, job_id):
"""Cancel a job."""
try:
return self.jobs.pop(job_id)
except KeyError:
logger.error('Job %s does not exist', job_id)
def create(self, job_constructor, *args, **kwargs):
"""Create a new job and populate job id."""
job = job_constructor(self, self.main.evloop, *args, **kwargs)
self.jobs[job.id] = job
return job
def get(self, job_id):
return self.jobs[job_id]
def start(self):
pass
def stop(self):
pass
class BaseThreadedJob(Thread):
"""Job running in a background thread.
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
Handles job notification to the job manager.
"""
def __init__(self, job_manager, ev_loop):
Thread.__init__(self)
#: report progress in %
self.progress = 0.
#: async watcher to handle notification to main loop
self.watcher = ev_loop.async(self.notify_cb)
self.job_manager = job_manager
self.error_msg = 'Unexpected exception during job execution'
#: job id
self.id = job_manager.job_id.next()
self.running = False
def pre_job(self):
"""Job preparation that is called when doing start, it can raise
exceptions to report error to the caller.
"""
pass
def run_job(self):
"""Overide this method to define what your job do."""
raise NotImplementedError
def run(self):
try:
self.run_job()
except Exception:
logger.exception(self.error_msg)
raise
finally:
self.running = False
self.watcher.send()
def notify_cb(self, *args):
self.watcher.stop()
self.job_manager.notify(self)
def start(self):
# first we run pre_job as it could raise an exception
try:
self.pre_job()
except Exception:
# in case of error we must remove the job from the manager
self.job_manager.remove(self)
raise
# then we start the watcher when it's safe
self.watcher.start()
self.running = True
Thread.start(self)
def start_current(self):
"""Start job in current thread."""
try:
self.pre_job()
except Exception:
self.job_manager.remove(self)
raise
self.running = True
self.run_job()
def stop(self):
self.running = False