Skip to content
jobs.py 8.41 KiB
Newer Older
Antoine Millet's avatar
Antoine Millet committed
#!/usr/bin/env python
#coding=utf8

'''
Jobs management on the server.
'''

from __future__ import absolute_import

import logging
import time
from datetime import datetime
from threading import Thread, Lock

from ccserver.exceptions import BadJobTypeError, UnknownJobError

class JobCancelError(Exception):
    '''
    Exception used by jobs to stop it when a cancel signal is sent.
    '''
    pass


class BaseJob(dict, Thread, object):

    '''
    A base class to define a job.

Antoine Millet's avatar
Antoine Millet committed
    The standards job items are:

     * id: id of the job
     * status: message explaining the current job status
     * done: True if the job is done
     * cancelled: True if job has been cancelled by user
     * created: job date of creation
     * ended: job date of end (or None if done = False)
     * duration: duration in seconds of the job (processed on export)
     * author: author login of the job

Antoine Millet's avatar
Antoine Millet committed
    :param manager: the :class:`JobsManager` instance.
    '''

    def __init__(self, manager, *args, **kwargs):
        # Initialize the inherited classes:
        dict.__init__(self, *args, **kwargs)
        Thread.__init__(self)

        # The manager of this job:
        self.manager = manager

        # Define default job properties:
Antoine Millet's avatar
Antoine Millet committed
        self['status'] = 'pending'
        self['done'] = False
        self['cancelled'] = False
        self['created'] = datetime.now()
        self['ended'] = None
        self['duration'] = 0

        #~ assert self.get('author') is not None, 'author is not defined'
Antoine Millet's avatar
Antoine Millet committed

        # List of actions to do by the rollback method:
        self._wayback = []

        # Set the thread name:
        self.name = 'job-%s' % self['id']

    def __hash__(self):
        return self['id'].__hash__()

    def __setitem__(self, key, value):
        if key == 'id':
            raise KeyError('Key %r in read-only.' % key)
        else:
            super(BaseJob, self).__setitem__(key, value)

    def __delitem__(self, key):
        if key == 'id':
            raise KeyError('Key %r in read-only.' % key)
        else:
            super(BaseJob, self).__delitem__(key)

    def report(self, status, done=None):
        '''
        Report the status of the job.

        :param status: the status to set to the job
        :param done: is the job done, None to keep current value
        '''

        self['status'] = status
        if done is not None:
            self['ended'] = datetime.now()
Antoine Millet's avatar
Antoine Millet committed
            self['done'] = done

    def run(self):
        '''
        Run the job itself.
        '''

        try:
            self.job()
        except JobCancelError as err:
            self._rollback('%s' % err)
Antoine Millet's avatar
Antoine Millet committed
        except Exception as err:
Antoine Millet's avatar
Antoine Millet committed
            logging.error('Error while executing job: %s, %r', err, err)
            self._rollback('%s' % err)
        else:
            self.report('success', done=True)
Antoine Millet's avatar
Antoine Millet committed

    def job(self):
        '''
        Method to override to define the job's behavior.
        '''

Antoine Millet's avatar
Antoine Millet committed
        pass
Antoine Millet's avatar
Antoine Millet committed

    def checkpoint(self, func=None):
        '''
        Check if job is not cancelled, else raise the CancellJobError. Also
        add the provided function (optionnal) to the wayback list.

        :param func: callable to add to the wayback list.
        '''

        if self['cancelled']:
Antoine Millet's avatar
Antoine Millet committed
            raise JobCancelError('Job has been cancelled by user')

        if func is not None:
            self._wayback.append(func)

    def _rollback(self, error):
Antoine Millet's avatar
Antoine Millet committed
        '''
        Rollback the job.
        '''

        self.report('rollbacking')
Antoine Millet's avatar
Antoine Millet committed
        try:
            for func in self._wayback:
                func()
        except Exception as err:
            self.report('rollback failed: %s' % err, done=True)
        else:
            self.report('cancelled: %s' % error, done=True)
Antoine Millet's avatar
Antoine Millet committed

    def cancel(self):
        '''
        Cancel the job.

        .. note::
            You can override this method to trigger an action when user cancel
            the job.
Antoine Millet's avatar
Antoine Millet committed
        '''

        if self['done']:
            raise JobError('Job is done')
        if self['cancelled']:
            raise JobError('Job is already cancelled')
        self['cancelled'] = True
        self.report('cancelling')
Antoine Millet's avatar
Antoine Millet committed

Antoine Millet's avatar
Antoine Millet committed
        '''
        Export the job in a simple dict format.
        '''

        exported = {}
        for key, val in self.iteritems():
            if key.startswith('_') or (props is not None and key in props):
                continue
Antoine Millet's avatar
Antoine Millet committed
            if isinstance(val, datetime):
                now = datetime.now()
                dt = now - val
                val = dt.seconds + dt.days * 86400

            if key == 'duration':
                if self['done']:
                    dt = self['ended'] - self['started']
                    val = dt.seconds + dt.days * 86400
                else:
                    now = datetime.now()
                    dt = now - self['started']
                    val = dt.seconds + dt.days * 86400

            exported[key] = str(val)
Antoine Millet's avatar
Antoine Millet committed
        return exported


class KillClientJob(BaseJob):

    '''
    A job used to kill connected accounts.

    Mandatory items:
     * account: the account login to kill

    Optional items:
     * gracetime: time before to kill the user
    '''

    def job(self):
        gracetime = self.get('gracetime')
        account = self.get('account')
        assert account is not None, 'Account not specified'

        if gracetime is not None:
            time.sleep(int(gracetime))

        self.checkpoint()

        self.manager.server.kill(account)


class KillOldCliJob(BaseJob):

    '''
    Typically an hidden job used to kill clients who are connected/idle since
    too much time.

    Mandatory items:
     * maxcon: maximum connection time in minutes
     * maxidle: maximum idle time in minutes

    Optional items:
     * delay: delay in secondes between two checks (default 1m)
    '''

    DEFAULT_DELAY = 60

    def job(self):
        maxcon = self.get('maxcon')
        assert maxcon is not None, 'maxcon is None'
        maxidle = self.get('maxidle')
        assert maxidle is not None, 'maxidle is None'
        delay = self.get('delay', self.DEFAULT_DELAY)
        
        while True:
            self.checkpoint()
            for client in self.manager.server.iter_connected_role('cli'):
                if client.get_uptime() > (maxcon * 60):
                    self.manager.server.kill(client.login)
                #TODO: handle idleing.
            time.sleep(delay)


Antoine Millet's avatar
Antoine Millet committed
class JobsManager(object):

    '''
    Manage the current job list.

    :param server: The :class:`CCServer` instance.
    '''

    JOBS_TYPES = {
        'kill': KillClientJob,
        'kill_oldcli': KillOldCliJob,
Antoine Millet's avatar
Antoine Millet committed
    }

    def __init__(self, server):

        # The main job dict, the keys are id of jobs.
        self._jobs = {}

        # The server:
        self.server = server

        # The id of the next job and it's lock:
        self._current_id = 1
        self._current_id_lock = Lock()

    def create(self, jtype, **kwargs):
        '''
        Create a new job.

        :param jtype: the type of the new job
        :param \*\*kwargs: arguments to pass to the job
        :raise BadJobTypeError: when invalid jtype is passed
        '''

        jobid = self.get_id()
        jobtype = JobsManager.JOBS_TYPES.get(jtype)

        if jobtype is None:
            raise BadJobTypeError('Invalid job type %r' % jtype)
        
        job = jobtype(self, id=jobid, **kwargs)
        self._jobs[jobid] = job
        job.daemon = True
Antoine Millet's avatar
Antoine Millet committed
        job.start()
        return job

    def get_id(self):
        '''
        Get the current id and increment the counter.
        '''

        with self._current_id_lock:
            jobid = self._current_id
            self._current_id += 1

        return jobid

    def cancel(self, jobid):
        '''
        Cancel the provided job.
        '''

        job = self._jobs.get(jobid)

        if job is None:
            raise UnknownJobError('Jobid %r is unknown' % jobid)
        else:
            job.cancel()

    def purge(self):
        '''
        Purge all done jobs.
        '''

        for job in self._jobs.values():
            if job['done']:
                del self._jobs[job['id']]

    def iterjobs(self, show_done=True, show_running=True):
Antoine Millet's avatar
Antoine Millet committed
        '''
        Iter over jobs.

        :param done: If set, iterate over done or not done jobs.
        '''

        for job in self._jobs.itervalues():
            if (show_done and job['done'] or show_running and not job['done']
                and not job.get('_hidden')):
Antoine Millet's avatar
Antoine Millet committed
                yield job