Commit de63350a authored by Thibault VINCENT's avatar Thibault VINCENT
Browse files

fix: improvements in job manager

parent 710284fb
Loading
Loading
Loading
Loading
+67 −15
Original line number Diff line number Diff line
@@ -16,12 +16,12 @@ from errors import (JobManagerError, JobError, XferJobError,
#fixme
import traceback, sys

class JobManager(Thread, object):
class JobManager(object):
    '''
    '''
    MAX_JID = 65535
    
    def __init__(self, hypervisor):
    def __init__(self):
        '''
        '''
        super(JobManager, self).__init__()
@@ -38,35 +38,40 @@ class JobManager(Thread, object):
        self._jobs_finished = []
    
    def _job_crashed(self, jid):
        '''
        '''
        debug('JobManager._job_crashed: id=`%i`' % jid)
        with self._mutex:
            # job should exist
            if jid not in self._jobs:
                raise JobManagerError('unknown job ID `%i`', jid)
            job = self.get_job(jid)
            # move job to the crashed queue
            self._jobs_crashed.append(jid)
            # set the cancelled flag to block pending commands
            job._cancelled = True
            # clean any queue that may contain the job
            for queue in [self._jobs_pending, self._jobs_running,
                                                        self._jobs_finished]:
                if jid in queue:
                    queue.remove(jid)
            # FIXME put at the end because it may raise errors
            # execute job self cleanup
            job._cleanup()
    
    def _job_finished(self, jid):
        debug('JobManager._job_finished: id=`%i`' % jid)
        with self._mutex:
            # job should exist
            if jid not in self._jobs:
                raise JobManagerError('unknown job ID `%i`', jid)
            job = self.get_job(jid)
            # move job to the finished queue
            self._jobs_finished.append(jid)
            # remove from running queue
            self._jobs_running.remove(jid)
            # FIXME put at the end because it may raise errors
            # execute job self cleanup
            job._cleanup()
    
    def run(self):
        '''
        '''
        # FIXME
        while True:
            pass
    
@@ -86,13 +91,43 @@ class JobManager(Thread, object):
        '''
        with self._mutex:
            # job should exist
            if jid not in self._jobs:
                raise JobManagerError('unknown job ID `%i`' % jid)
            job = self.get_job(jid)
            # job should be running
            if jid not in self._jobs_running:
                raise JobManagerError('job `%i` is not running' % jid)
            # ask the job to stop it's execution
            self._jobs[jid]._cancelled = True
            self._jobs[jid]._cancel()
            # do NOT execute _cleanup() here !!!
    
    def list(self):
        '''
        '''
        with self._mutex:
            pending = []
            running = []
            crashed = []
            finished = []
            orphaned = []
            for jid, job in self._jobs.iteritems():
                s = (jid, job.get_type())
                if jid in self._jobs_pending:
                    pending.append(s)
                elif jid in self._jobs_running:
                    running.append(s)
                elif jid in self._jobs_crashed:
                    crashed.append(s)
                elif jid in self._jobs_finished:
                    finished.append(s)
                else:
                    orphaned.append(s)
            return {
                'pending'   : pending,
                'running'   : running,
                'crashed'   : crashed,
                'finished'  : finished,
                'orphaned'  : orphaned,
                }
    
    def schedule_immediate(self, jid):
        '''
@@ -100,15 +135,14 @@ class JobManager(Thread, object):
        debug('JobManager.schedule_immediate: id=`%i`', jid)
        with self._mutex:
            # job should exist
            if jid not in self._jobs:
                raise JobManagerError('unknown job ID `%i`' % jid)
            job = self.get_job(jid)
            # job should be pending execution
            if jid not in self._jobs_pending:
                raise JobManagerError('job `%i` not prepared for execution'%jid)
            # execute job
            self._jobs_running.append(jid)
            self._jobs_pending.remove(jid)
            self._jobs[jid].start()
            job.start()
    
    def is_job(self, jid):
        '''
@@ -186,6 +220,7 @@ class JobLog():
    def append(self, message):
        '''
        '''
        # FIXME add limit to buffer
        debug('JobLog.append: %s', message)
        with self._mutex:
            self._items.append((time.time(), message))
@@ -223,6 +258,18 @@ class BaseJob(Thread, object):
        '''
        return False
        
    def _cancel(self):
        '''
        To be implemented in derivate class, or not
        '''
        pass
    
    def _cleanup(self):
        '''
        To be implemented in derivate class, or not
        '''
        pass
    
    def _job(self):
        '''
        To be implemented in derivate class
@@ -230,7 +277,7 @@ class BaseJob(Thread, object):
        self._log.append('nothing to do')
        raise JobError('empty job')
    
    def is_ready():
    def is_ready(self):
        '''
        '''
        return self._ready_run is True
@@ -245,6 +292,11 @@ class BaseJob(Thread, object):
        '''
        return str(self._log)
    
    def get_type(self):
        '''
        '''
        return self._type
    
    def start_now(self):
        '''
        '''
@@ -261,7 +313,7 @@ class BaseJob(Thread, object):
            self._log.append("finished")
            self._manager._job_finished(self._jid)
        except Exception as err:
            self._log.append("crashed: `%r`: `%s`" % (err, err))
            self._log.append("*** CRASHED: `%r`: `%s`" % (err, err))
            self._manager._job_crashed(self._jid)
    
    def prepare(self):