Loading ccserver/ccserver.py +4 −1 Original line number Diff line number Diff line Loading @@ -17,7 +17,7 @@ from ccserver.exceptions import AlreadyRegistered, NotConnectedAccountError from ccserver.orderedset import OrderedSet from ccserver.tql import TqlParser, TqlObject from ccserver.objectsdb import ObjectsDB from ccserver.jobs import JobsManager class CCServer(object): ''' Loading Loading @@ -66,6 +66,9 @@ class CCServer(object): default_handler=WelcomeHandler(self), on_disconnect='on_disconnect') # The jobs manager: self.jobs = JobsManager(self) # Register accounts on the database: self._update_accounts() Loading ccserver/exceptions.py +8 −0 Original line number Diff line number Diff line Loading @@ -27,3 +27,11 @@ class ReservedTagError(Exception): class BadRoleError(Exception): pass class BadJobTypeError(Exception): pass class UnknownJobError(Exception): pass ccserver/jobs.py 0 → 100644 +258 −0 Original line number Diff line number Diff line #!/usr/bin/env python #coding=utf8 ''' Jobs management on the server. ''' from __future__ import absolute_import import logging import time from datetime import datetime from threading import Thread, Lock from ccserver.exceptions import BadJobTypeError, UnknownJobError class JobCancelError(Exception): ''' Exception used by jobs to stop it when a cancel signal is sent. ''' pass class BaseJob(dict, Thread, object): ''' A base class to define a job. :param manager: the :class:`JobsManager` instance. ''' def __init__(self, manager, *args, **kwargs): # Initialize the inherited classes: dict.__init__(self, *args, **kwargs) Thread.__init__(self) # The manager of this job: self.manager = manager # Define default status: self['status'] = 'pending' self['done'] = False # List of actions to do by the rollback method: self._wayback = [] # Set the thread name: self.name = 'job-%s' % self['id'] def __hash__(self): return self['id'].__hash__() def __setitem__(self, key, value): if key == 'id': raise KeyError('Key %r in read-only.' % key) else: super(BaseJob, self).__setitem__(key, value) def __delitem__(self, key): if key == 'id': raise KeyError('Key %r in read-only.' % key) else: super(BaseJob, self).__delitem__(key) def report(self, status, done=None): ''' Report the status of the job. :param status: the status to set to the job :param done: is the job done, None to keep current value ''' self['status'] = status if done is not None: self['done'] = done def run(self): ''' Run the job itself. ''' try: self.job() except JobCancelError as err: self.rollback('%s' % err) except Exception as err: logging.error('Error while executing job: %s', err) self.rollback('%s' % err) def job(self): ''' Method to override to define the job's behavior. ''' self.report('success', done=True) def checkpoint(self, func=None): ''' Check if job is not cancelled, else raise the CancellJobError. Also add the provided function (optionnal) to the wayback list. :param func: callable to add to the wayback list. ''' if self['done']: raise JobCancelError('Job has been cancelled by user') if func is not None: self._wayback.append(func) def rollback(self, error): ''' Rollback the job. ''' self.report('rollbacking') for func in self._wayback: func() self.report('rollback: %s' % error, done=True) def cancel(self): ''' Cancel the job. ''' self['done'] = True self['status'] = 'cancelling' def export(self): ''' Export the job in a simple dict format. ''' exported = {} for key, val in self.iteritems(): if isinstance(val, datetime): now = datetime.now() dt = now - val val = dt.seconds + dt.days * 86400 exported[key] = val return exported class KillClientJob(BaseJob): ''' A job used to kill connected accounts. Mandatory items: * account: the account login to kill Optional items: * gracetime: time before to kill the user ''' def job(self): gracetime = self.get('gracetime') account = self.get('account') assert account is not None, 'Account not specified' if gracetime is not None: time.sleep(int(gracetime)) self.checkpoint() self.manager.server.kill(account) class JobsManager(object): ''' Manage the current job list. :param server: The :class:`CCServer` instance. ''' JOBS_TYPES = { 'kill': KillClientJob, } def __init__(self, server): # The main job dict, the keys are id of jobs. self._jobs = {} # The server: self.server = server # The id of the next job and it's lock: self._current_id = 1 self._current_id_lock = Lock() def create(self, jtype, **kwargs): ''' Create a new job. :param jtype: the type of the new job :param \*\*kwargs: arguments to pass to the job :raise BadJobTypeError: when invalid jtype is passed ''' jobid = self.get_id() jobtype = JobsManager.JOBS_TYPES.get(jtype) if jobtype is None: raise BadJobTypeError('Invalid job type %r' % jtype) job = jobtype(self, id=jobid, **kwargs) job['since'] = datetime.now() self._jobs[jobid] = job job.start() return job def get_id(self): ''' Get the current id and increment the counter. ''' with self._current_id_lock: jobid = self._current_id self._current_id += 1 return jobid def cancel(self, jobid): ''' Cancel the provided job. ''' job = self._jobs.get(jobid) if job is None: raise UnknownJobError('Jobid %r is unknown' % jobid) else: job.cancel() def purge(self): ''' Purge all done jobs. ''' for job in self._jobs.values(): if job['done']: del self._jobs[job['id']] def iterjobs(self, done=None): ''' Iter over jobs. :param done: If set, iterate over done or not done jobs. ''' for job in self._jobs.itervalues(): if done is not None and job['done'] != done: continue else: yield job Loading
ccserver/ccserver.py +4 −1 Original line number Diff line number Diff line Loading @@ -17,7 +17,7 @@ from ccserver.exceptions import AlreadyRegistered, NotConnectedAccountError from ccserver.orderedset import OrderedSet from ccserver.tql import TqlParser, TqlObject from ccserver.objectsdb import ObjectsDB from ccserver.jobs import JobsManager class CCServer(object): ''' Loading Loading @@ -66,6 +66,9 @@ class CCServer(object): default_handler=WelcomeHandler(self), on_disconnect='on_disconnect') # The jobs manager: self.jobs = JobsManager(self) # Register accounts on the database: self._update_accounts() Loading
ccserver/exceptions.py +8 −0 Original line number Diff line number Diff line Loading @@ -27,3 +27,11 @@ class ReservedTagError(Exception): class BadRoleError(Exception): pass class BadJobTypeError(Exception): pass class UnknownJobError(Exception): pass
ccserver/jobs.py 0 → 100644 +258 −0 Original line number Diff line number Diff line #!/usr/bin/env python #coding=utf8 ''' Jobs management on the server. ''' from __future__ import absolute_import import logging import time from datetime import datetime from threading import Thread, Lock from ccserver.exceptions import BadJobTypeError, UnknownJobError class JobCancelError(Exception): ''' Exception used by jobs to stop it when a cancel signal is sent. ''' pass class BaseJob(dict, Thread, object): ''' A base class to define a job. :param manager: the :class:`JobsManager` instance. ''' def __init__(self, manager, *args, **kwargs): # Initialize the inherited classes: dict.__init__(self, *args, **kwargs) Thread.__init__(self) # The manager of this job: self.manager = manager # Define default status: self['status'] = 'pending' self['done'] = False # List of actions to do by the rollback method: self._wayback = [] # Set the thread name: self.name = 'job-%s' % self['id'] def __hash__(self): return self['id'].__hash__() def __setitem__(self, key, value): if key == 'id': raise KeyError('Key %r in read-only.' % key) else: super(BaseJob, self).__setitem__(key, value) def __delitem__(self, key): if key == 'id': raise KeyError('Key %r in read-only.' % key) else: super(BaseJob, self).__delitem__(key) def report(self, status, done=None): ''' Report the status of the job. :param status: the status to set to the job :param done: is the job done, None to keep current value ''' self['status'] = status if done is not None: self['done'] = done def run(self): ''' Run the job itself. ''' try: self.job() except JobCancelError as err: self.rollback('%s' % err) except Exception as err: logging.error('Error while executing job: %s', err) self.rollback('%s' % err) def job(self): ''' Method to override to define the job's behavior. ''' self.report('success', done=True) def checkpoint(self, func=None): ''' Check if job is not cancelled, else raise the CancellJobError. Also add the provided function (optionnal) to the wayback list. :param func: callable to add to the wayback list. ''' if self['done']: raise JobCancelError('Job has been cancelled by user') if func is not None: self._wayback.append(func) def rollback(self, error): ''' Rollback the job. ''' self.report('rollbacking') for func in self._wayback: func() self.report('rollback: %s' % error, done=True) def cancel(self): ''' Cancel the job. ''' self['done'] = True self['status'] = 'cancelling' def export(self): ''' Export the job in a simple dict format. ''' exported = {} for key, val in self.iteritems(): if isinstance(val, datetime): now = datetime.now() dt = now - val val = dt.seconds + dt.days * 86400 exported[key] = val return exported class KillClientJob(BaseJob): ''' A job used to kill connected accounts. Mandatory items: * account: the account login to kill Optional items: * gracetime: time before to kill the user ''' def job(self): gracetime = self.get('gracetime') account = self.get('account') assert account is not None, 'Account not specified' if gracetime is not None: time.sleep(int(gracetime)) self.checkpoint() self.manager.server.kill(account) class JobsManager(object): ''' Manage the current job list. :param server: The :class:`CCServer` instance. ''' JOBS_TYPES = { 'kill': KillClientJob, } def __init__(self, server): # The main job dict, the keys are id of jobs. self._jobs = {} # The server: self.server = server # The id of the next job and it's lock: self._current_id = 1 self._current_id_lock = Lock() def create(self, jtype, **kwargs): ''' Create a new job. :param jtype: the type of the new job :param \*\*kwargs: arguments to pass to the job :raise BadJobTypeError: when invalid jtype is passed ''' jobid = self.get_id() jobtype = JobsManager.JOBS_TYPES.get(jtype) if jobtype is None: raise BadJobTypeError('Invalid job type %r' % jtype) job = jobtype(self, id=jobid, **kwargs) job['since'] = datetime.now() self._jobs[jobid] = job job.start() return job def get_id(self): ''' Get the current id and increment the counter. ''' with self._current_id_lock: jobid = self._current_id self._current_id += 1 return jobid def cancel(self, jobid): ''' Cancel the provided job. ''' job = self._jobs.get(jobid) if job is None: raise UnknownJobError('Jobid %r is unknown' % jobid) else: job.cancel() def purge(self): ''' Purge all done jobs. ''' for job in self._jobs.values(): if job['done']: del self._jobs[job['id']] def iterjobs(self, done=None): ''' Iter over jobs. :param done: If set, iterate over done or not done jobs. ''' for job in self._jobs.itervalues(): if done is not None and job['done'] != done: continue else: yield job