Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python
#coding=utf8
'''
Jobs management on the server.
'''
from __future__ import absolute_import
import logging
import time
from datetime import datetime
from threading import Thread, Lock
from ccserver.exceptions import BadJobTypeError, UnknownJobError
class JobCancelError(Exception):
'''
Exception used by jobs to stop it when a cancel signal is sent.
'''
pass
class BaseJob(dict, Thread, object):
'''
A base class to define a job.
:param manager: the :class:`JobsManager` instance.
'''
def __init__(self, manager, *args, **kwargs):
# Initialize the inherited classes:
dict.__init__(self, *args, **kwargs)
Thread.__init__(self)
# The manager of this job:
self.manager = manager
# Define default status:
self['status'] = 'pending'
self['done'] = False
# List of actions to do by the rollback method:
self._wayback = []
# Set the thread name:
self.name = 'job-%s' % self['id']
def __hash__(self):
return self['id'].__hash__()
def __setitem__(self, key, value):
if key == 'id':
raise KeyError('Key %r in read-only.' % key)
else:
super(BaseJob, self).__setitem__(key, value)
def __delitem__(self, key):
if key == 'id':
raise KeyError('Key %r in read-only.' % key)
else:
super(BaseJob, self).__delitem__(key)
def report(self, status, done=None):
'''
Report the status of the job.
:param status: the status to set to the job
:param done: is the job done, None to keep current value
'''
self['status'] = status
if done is not None:
self['done'] = done
def run(self):
'''
Run the job itself.
'''
try:
self.job()
except JobCancelError as err:
self.rollback('%s' % err)
except Exception as err:
logging.error('Error while executing job: %s', err)
self.rollback('%s' % err)
def job(self):
'''
Method to override to define the job's behavior.
'''
self.report('success', done=True)
def checkpoint(self, func=None):
'''
Check if job is not cancelled, else raise the CancellJobError. Also
add the provided function (optionnal) to the wayback list.
:param func: callable to add to the wayback list.
'''
if self['done']:
raise JobCancelError('Job has been cancelled by user')
if func is not None:
self._wayback.append(func)
def rollback(self, error):
'''
Rollback the job.
'''
self.report('rollbacking')
for func in self._wayback:
func()
self.report('rollback: %s' % error, done=True)
def cancel(self):
'''
Cancel the job.
'''
self['done'] = True
self['status'] = 'cancelling'
def export(self):
'''
Export the job in a simple dict format.
'''
exported = {}
for key, val in self.iteritems():
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
if isinstance(val, datetime):
now = datetime.now()
dt = now - val
val = dt.seconds + dt.days * 86400
exported[key] = val
return exported
class KillClientJob(BaseJob):
'''
A job used to kill connected accounts.
Mandatory items:
* account: the account login to kill
Optional items:
* gracetime: time before to kill the user
'''
def job(self):
gracetime = self.get('gracetime')
account = self.get('account')
assert account is not None, 'Account not specified'
if gracetime is not None:
time.sleep(int(gracetime))
self.checkpoint()
self.manager.server.kill(account)
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
class KillOldCliJob(BaseJob):
'''
Typically an hidden job used to kill clients who are connected/idle since
too much time.
Mandatory items:
* maxcon: maximum connection time in minutes
* maxidle: maximum idle time in minutes
Optional items:
* delay: delay in secondes between two checks (default 1m)
'''
DEFAULT_DELAY = 60
def job(self):
maxcon = self.get('maxcon')
assert maxcon is not None, 'maxcon is None'
maxidle = self.get('maxidle')
assert maxidle is not None, 'maxidle is None'
delay = self.get('delay', self.DEFAULT_DELAY)
while True:
self.checkpoint()
for client in self.manager.server.iter_connected_role('cli'):
if client.get_uptime() > (maxcon * 60):
self.manager.server.kill(client.login)
#TODO: handle idleing.
time.sleep(delay)
class JobsManager(object):
'''
Manage the current job list.
:param server: The :class:`CCServer` instance.
'''
JOBS_TYPES = {
'kill': KillClientJob,
'kill_oldcli': KillOldCliJob,
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
}
def __init__(self, server):
# The main job dict, the keys are id of jobs.
self._jobs = {}
# The server:
self.server = server
# The id of the next job and it's lock:
self._current_id = 1
self._current_id_lock = Lock()
def create(self, jtype, **kwargs):
'''
Create a new job.
:param jtype: the type of the new job
:param \*\*kwargs: arguments to pass to the job
:raise BadJobTypeError: when invalid jtype is passed
'''
jobid = self.get_id()
jobtype = JobsManager.JOBS_TYPES.get(jtype)
if jobtype is None:
raise BadJobTypeError('Invalid job type %r' % jtype)
job = jobtype(self, id=jobid, **kwargs)
job['since'] = datetime.now()
self._jobs[jobid] = job
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
job.start()
return job
def get_id(self):
'''
Get the current id and increment the counter.
'''
with self._current_id_lock:
jobid = self._current_id
self._current_id += 1
return jobid
def cancel(self, jobid):
'''
Cancel the provided job.
'''
job = self._jobs.get(jobid)
if job is None:
raise UnknownJobError('Jobid %r is unknown' % jobid)
else:
job.cancel()
def purge(self):
'''
Purge all done jobs.
'''
for job in self._jobs.values():
if job['done']:
del self._jobs[job['id']]
def iterjobs(self, done=None):
'''
Iter over jobs.
:param done: If set, iterate over done or not done jobs.
'''
for job in self._jobs.itervalues():
if (done is None or job['done'] == done) and not job.get('_hidden'):