Newer
Older
#!/usr/bin/env python
#coding=utf8
'''
Jobs management on the server.
'''
from __future__ import absolute_import
import logging
import time
from datetime import datetime
from threading import Thread, Lock
from ccserver.exceptions import BadJobTypeError, UnknownJobError
class JobCancelError(Exception):
'''
Exception used by jobs to stop it when a cancel signal is sent.
'''
pass
class BaseJob(dict, Thread, object):
'''
A base class to define a job.
The standards job items are:
* id: id of the job
* status: message explaining the current job status
* done: True if the job is done
* cancelled: True if job has been cancelled by user
* created: job date of creation
* ended: job date of end (or None if done = False)
* duration: duration in seconds of the job (processed on export)
* author: author login of the job
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
:param manager: the :class:`JobsManager` instance.
'''
def __init__(self, manager, *args, **kwargs):
# Initialize the inherited classes:
dict.__init__(self, *args, **kwargs)
Thread.__init__(self)
# The manager of this job:
self.manager = manager
# Define default status:
self['status'] = 'pending'
self['done'] = False
# List of actions to do by the rollback method:
self._wayback = []
# Set the thread name:
self.name = 'job-%s' % self['id']
def __hash__(self):
return self['id'].__hash__()
def __setitem__(self, key, value):
if key == 'id':
raise KeyError('Key %r in read-only.' % key)
else:
super(BaseJob, self).__setitem__(key, value)
def __delitem__(self, key):
if key == 'id':
raise KeyError('Key %r in read-only.' % key)
else:
super(BaseJob, self).__delitem__(key)
def report(self, status, done=None):
'''
Report the status of the job.
:param status: the status to set to the job
:param done: is the job done, None to keep current value
'''
self['status'] = status
if done is not None:
self['done'] = done
def run(self):
'''
Run the job itself.
'''
try:
self.job()
except JobCancelError as err:
logging.error('Error while executing job: %s, %r', err, err)
else:
self.report('success', done=True)
def job(self):
'''
Method to override to define the job's behavior.
'''
def checkpoint(self, func=None):
'''
Check if job is not cancelled, else raise the CancellJobError. Also
add the provided function (optionnal) to the wayback list.
:param func: callable to add to the wayback list.
'''
if self['done']:
raise JobCancelError('Job has been cancelled by user')
if func is not None:
self._wayback.append(func)
'''
Rollback the job.
'''
self.report('rollbacking')
try:
for func in self._wayback:
func()
except Exception as err:
self.report('rollback failed: %s' % err, done=True)
else:
self.report('rollback: %s' % error, done=True)
def cancel(self):
'''
Cancel the job.
'''
self['done'] = True
self['status'] = 'cancelling'
Antoine Millet
committed
def export(self, props=None):
'''
Export the job in a simple dict format.
'''
exported = {}
for key, val in self.iteritems():
Antoine Millet
committed
if key.startswith('_') or (props is not None and key in props):
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
if isinstance(val, datetime):
now = datetime.now()
dt = now - val
val = dt.seconds + dt.days * 86400
exported[key] = val
return exported
class KillClientJob(BaseJob):
'''
A job used to kill connected accounts.
Mandatory items:
* account: the account login to kill
Optional items:
* gracetime: time before to kill the user
'''
def job(self):
gracetime = self.get('gracetime')
account = self.get('account')
assert account is not None, 'Account not specified'
if gracetime is not None:
time.sleep(int(gracetime))
self.checkpoint()
self.manager.server.kill(account)
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
class KillOldCliJob(BaseJob):
'''
Typically an hidden job used to kill clients who are connected/idle since
too much time.
Mandatory items:
* maxcon: maximum connection time in minutes
* maxidle: maximum idle time in minutes
Optional items:
* delay: delay in secondes between two checks (default 1m)
'''
DEFAULT_DELAY = 60
def job(self):
maxcon = self.get('maxcon')
assert maxcon is not None, 'maxcon is None'
maxidle = self.get('maxidle')
assert maxidle is not None, 'maxidle is None'
delay = self.get('delay', self.DEFAULT_DELAY)
while True:
self.checkpoint()
for client in self.manager.server.iter_connected_role('cli'):
if client.get_uptime() > (maxcon * 60):
self.manager.server.kill(client.login)
#TODO: handle idleing.
time.sleep(delay)
class JobsManager(object):
'''
Manage the current job list.
:param server: The :class:`CCServer` instance.
'''
JOBS_TYPES = {
'kill': KillClientJob,
'kill_oldcli': KillOldCliJob,
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
}
def __init__(self, server):
# The main job dict, the keys are id of jobs.
self._jobs = {}
# The server:
self.server = server
# The id of the next job and it's lock:
self._current_id = 1
self._current_id_lock = Lock()
def create(self, jtype, **kwargs):
'''
Create a new job.
:param jtype: the type of the new job
:param \*\*kwargs: arguments to pass to the job
:raise BadJobTypeError: when invalid jtype is passed
'''
jobid = self.get_id()
jobtype = JobsManager.JOBS_TYPES.get(jtype)
if jobtype is None:
raise BadJobTypeError('Invalid job type %r' % jtype)
job = jobtype(self, id=jobid, **kwargs)
job['since'] = datetime.now()
self._jobs[jobid] = job
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
job.start()
return job
def get_id(self):
'''
Get the current id and increment the counter.
'''
with self._current_id_lock:
jobid = self._current_id
self._current_id += 1
return jobid
def cancel(self, jobid):
'''
Cancel the provided job.
'''
job = self._jobs.get(jobid)
if job is None:
raise UnknownJobError('Jobid %r is unknown' % jobid)
else:
job.cancel()
def purge(self):
'''
Purge all done jobs.
'''
for job in self._jobs.values():
if job['done']:
del self._jobs[job['id']]
def iterjobs(self, done=None):
'''
Iter over jobs.
:param done: If set, iterate over done or not done jobs.
'''
for job in self._jobs.itervalues():
if (done is None or job['done'] == done) and not job.get('_hidden'):