Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
from threading import Lock
from cloudcontrol.server.jobs.hotmigration import HotMigrationJob
from cloudcontrol.server.jobs.coldmigration import ColdMigrationJob
from cloudcontrol.server.jobs.clone import CloneJob
from cloudcontrol.server.jobs.killclient import KillClientJob
from cloudcontrol.server.jobs.killoldcli import KillOldCliJob
from cloudcontrol.server.exceptions import (BadJobTypeError, UnknownJobError)
class JobsManager(object):
    """ Manage the current job list.

    Jobs are kept in a dict keyed by their numeric id. Job objects are
    expected to be started with ``start()``, cancelled with ``cancel()`` and
    to expose a mapping-like interface (``job['done']``, ``job['id']``,
    ``job.get('_hidden')``).

    :param logger: parent logger; each job receives a child logger named
        after its id
    :param server: The :class:`CCServer` instance.
    """

    # Maps the public job type name to the class implementing it.
    JOBS_TYPES = {
        'kill': KillClientJob,
        'kill_oldcli': KillOldCliJob,
        'cold_migrate': ColdMigrationJob,
        'hot_migrate': HotMigrationJob,
        'clone': CloneJob,
    }

    def __init__(self, logger, server):
        self.logger = logger

        # The main job dict, the keys are id of jobs.
        self._jobs = {}

        # The server:
        self.server = server

        # The id of the next job and its lock:
        self._current_id = 1
        self._current_id_lock = Lock()

    def create(self, jtype, **kwargs):
        r""" Create, register and start a new job.

        :param jtype: the type of the new job (a key of :attr:`JOBS_TYPES`)
        :param \*\*kwargs: arguments to pass to the job
        :return: the started job
        :raise BadJobTypeError: when invalid jtype is passed
        """
        # Validate the type before allocating an id, so that a bad request
        # does not consume a job id.
        jobtype = JobsManager.JOBS_TYPES.get(jtype)
        if jobtype is None:
            raise BadJobTypeError('Invalid job type %r' % jtype)

        jobid = self.get_id()
        job = jobtype(self.logger.getChild(str(jobid)), self, id=jobid,
                      **kwargs)
        self._jobs[jobid] = job
        job.daemon = True
        job.start()
        return job

    def get_id(self):
        """ Get the current id and increment the counter.

        The read and the increment are done under a lock so that two callers
        never receive the same id.

        :return: the id to use for the next job
        """
        with self._current_id_lock:
            jobid = self._current_id
            self._current_id += 1
        return jobid

    def cancel(self, jobid):
        """ Cancel the provided job.

        :param jobid: the id of the job to cancel
        :raise UnknownJobError: when no job has this id, or when the job is
            flagged as hidden
        """
        job = self._jobs.get(jobid)
        if job is None:
            raise UnknownJobError('Invalid job id: %r' % jobid)
        if job.get('_hidden', False):
            raise UnknownJobError('Invalid job id: %r (hidden)' % jobid)
        job.cancel()

    def purge(self):
        """ Purge all done jobs.
        """
        # Work on a snapshot: removing entries while iterating over the live
        # dict raises RuntimeError on Python 3. Entries are removed by the
        # key they are stored under.
        for jobid, job in list(self._jobs.items()):
            if job['done']:
                self._jobs.pop(jobid, None)

    def iterjobs(self, show_done=True, show_running=True):
        """ Iter over visible jobs.

        Jobs flagged as hidden are never yielded, whatever their state.

        :param show_done: if set, yield the jobs which are done
        :param show_running: if set, yield the jobs which are not done
        """
        # Snapshot the values so that jobs created or purged while the
        # generator is being consumed do not break the iteration.
        for job in list(self._jobs.values()):
            if job.get('_hidden'):
                continue
            if job['done']:
                if show_done:
                    yield job
            elif show_running:
                yield job