Newer
Older
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
class JobManager(object):
    """Keep an index of jobs and provide the iterator their ids come from.

    Jobs are built through :meth:`create`, which hands the manager and the
    main loop's ``evloop`` to the job constructor and indexes the resulting
    job under its ``id`` attribute.
    """

    def __init__(self, main_loop):
        """
        :param main_loop: :class:`MainLoop` instance
        """
        # Function-scope import so the class does not rely on a module-level
        # import being present; replaces a hand-written infinite generator.
        from itertools import count

        #: iterator yielding 0, 1, 2, ... used as the source of job ids
        self.job_id = count()
        self.main = main_loop
        #: keep an index of all jobs
        self.jobs = {}

    def job_start(self):
        """Currently does nothing."""
        pass

    def job_stop(self):
        """Currently does nothing."""
        pass

    def notify(self, job):
        """Called when a job is done.

        :param job: the finished job; it must still be indexed
        :raises KeyError: if the job is not in the index
        """
        # by now only remove the job
        self.remove(job)

    def cancel(self, job_id):
        """Cancel a job.

        Only clears the job's ``running`` flag; the job stays indexed.

        :param job_id: id of an indexed job
        :raises KeyError: if no job is indexed under ``job_id``
        """
        self.jobs[job_id].running = False

    def remove(self, job):
        """Drop a job from the index and return it.

        :param job: object whose ``id`` attribute is the index key
        :raises KeyError: if the job is not in the index
        """
        return self.jobs.pop(job.id)

    def create(self, job_constructor, *args, **kwargs):
        """Create a new job and index it under its id.

        :param job_constructor: callable invoked as
            ``job_constructor(manager, ev_loop, *args, **kwargs)``; the
            object it returns must expose an ``id`` attribute
        :return: the newly created job
        """
        job = job_constructor(self, self.main.evloop, *args, **kwargs)
        self.jobs[job.id] = job
        return job

    def get(self, job_id):
        """Return the job indexed under ``job_id``.

        :raises KeyError: if no job is indexed under ``job_id``
        """
        return self.jobs[job_id]

    def start(self):
        """Currently does nothing."""
        pass

    def stop(self):
        """Currently does nothing."""
        pass
class BaseJob(Thread):
    """Base job

    Handles job notification to the job manager.

    Subclasses override :meth:`run_job` (and optionally :meth:`pre_job`).
    A job is run either in its own thread with :meth:`start` or in the
    calling thread with :meth:`start_current`.
    """

    def __init__(self, job_manager, ev_loop):
        """
        :param job_manager: object providing the ``job_id`` iterator and the
            ``notify``/``remove`` methods this job calls
        :param ev_loop: object whose ``async`` attribute, called with a
            callback, returns a watcher with ``start``/``stop``/``send``
        """
        Thread.__init__(self)
        #: report progress in %
        self.progress = 0.
        # ``async`` is a reserved word since Python 3.7, so writing
        # ``ev_loop.async`` would not even parse there; fetch it by name.
        #: async watcher to handle notification to main loop
        self.watcher = getattr(ev_loop, 'async')(self.notify_cb)
        self.job_manager = job_manager
        self.error_msg = 'Unexpected exception during job execution'
        # The builtin ``next()`` works on Python 2.6+ and 3, whereas the
        # ``.next()`` method only exists on Python 2 iterators.
        #: job id
        self.id = next(job_manager.job_id)
        self.running = False

    def pre_job(self):
        """Job preparation that is called when doing start, it can raise
        exceptions to report error to the caller.
        """
        pass

    def run_job(self):
        """Override this method to define what your job does."""
        raise NotImplementedError

    def run(self):
        """Thread body: run the job, then signal the watcher.

        Exceptions from :meth:`run_job` are logged with ``error_msg`` and
        re-raised. Whatever the outcome, ``running`` is cleared and the
        watcher is sent.
        """
        try:
            self.run_job()
        except Exception:
            logger.exception(self.error_msg)
            raise
        finally:
            self.running = False
            self.watcher.send()

    def notify_cb(self, *args):
        """Watcher callback: stop the watcher and notify the job manager."""
        self.watcher.stop()
        self.job_manager.notify(self)

    def start(self):
        """Run :meth:`pre_job`, then start the watcher and the thread.

        :raises Exception: whatever :meth:`pre_job` raises; the job is
            removed from the manager before the exception propagates
        """
        # first we run pre_job as it could raise an exception
        try:
            self.pre_job()
        except Exception:
            # in case of error we must remove the job from the manager
            self.job_manager.remove(self)
            raise
        # then we start the watcher when it's safe
        self.watcher.start()
        self.running = True
        Thread.start(self)

    def start_current(self):
        """Start job in current thread.

        The watcher is neither started nor sent on this path, so the job
        manager is not notified when the job ends.

        :raises Exception: whatever :meth:`pre_job` raises (the job is then
            removed from the manager) or whatever :meth:`run_job` raises
        """
        try:
            self.pre_job()
        except Exception:
            self.job_manager.remove(self)
            raise
        self.running = True
        try:
            self.run_job()
        finally:
            # Mirror run(): the flag must not stay set once the job has
            # finished or failed.
            self.running = False

    def stop(self):
        """Clear the ``running`` flag."""
        self.running = False