# jobs.py
# -*- coding: utf-8 -*-

import os, time, socket
from datetime import datetime
from threading import Thread, RLock
from logging import debug
from utils import RWLock
from errors import (JobManagerError, JobError, XferJobError, ReceiveFileJobError,
            SendFileJobError)

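# FIXME: traceback and sys are only used by the temporary traceback printing
# in the transfer jobs; drop them once proper error reporting exists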
import traceback, sys

class JobManager(Thread, object):
    '''
    Registry of jobs and of the state queue each job currently sits in.

    Every job stored with `append()` receives an integer ID. The manager
    tracks that ID in one of five queues (pending, running, crashed, canceled,
    finished). All accesses to the registry are serialized by a re-entrant
    lock.
    '''
    # NOTE(review): not enforced anywhere in this class -- presumably the
    # intended upper bound for job IDs; confirm before relying on it
    MAX_JID = 65535

    def __init__(self, hypervisor):
        '''
        Create an empty manager.

        hypervisor -- currently unused
        '''
        super(JobManager, self).__init__()
        self._mutex = RLock()
        self._next_jid = 1
        # jid -> job object
        self._jobs = {}
        # state queues, each one a list of job IDs
        self._jobs_pending = []
        self._jobs_running = []
        self._jobs_crashed = []
        self._jobs_canceled = []
        self._jobs_finished = []

    def _require_job(self, jid):
        '''
        Raise JobManagerError unless `jid` is a known job ID.

        Must be called with `self._mutex` held.
        '''
        if jid not in self._jobs:
            raise JobManagerError('unknown job ID `%s`' % (jid,))

    def _move_job(self, jid, target):
        '''
        Put `jid` in the `target` queue and drop it from every other queue.

        Must be called with `self._mutex` held.
        '''
        for queue in (self._jobs_pending, self._jobs_running,
                      self._jobs_crashed, self._jobs_canceled,
                      self._jobs_finished):
            if queue is not target and jid in queue:
                queue.remove(jid)
        # never record the same job twice in a queue
        if jid not in target:
            target.append(jid)

    def _job_crashed(self, jid):
        '''
        Record that job `jid` crashed, whatever queue it was in.

        Raises JobManagerError if `jid` is unknown.
        '''
        debug('JobManager._job_crashed: id=`%i`', jid)
        with self._mutex:
            self._require_job(jid)
            self._move_job(jid, self._jobs_crashed)

    def _job_finished(self, jid):
        '''
        Record that job `jid` finished, whatever queue it was in.

        The job does not have to be in the running queue: a job thread that
        was started without `schedule_immediate()` is still pending when it
        reports completion.

        Raises JobManagerError if `jid` is unknown.
        '''
        debug('JobManager._job_finished: id=`%i`', jid)
        with self._mutex:
            self._require_job(jid)
            self._move_job(jid, self._jobs_finished)

    def append(self, job):
        '''
        Store `job` as pending and return the ID assigned to it.

        IDs start at 1 and grow by one for each stored job.
        '''
        with self._mutex:
            jid = self._next_jid
            self._next_jid += 1
            self._jobs[jid] = job
            self._jobs_pending.append(jid)
        debug('JobManager.append: new job pending with id `%i`', jid)
        return jid

    def run(self):
        '''
        Thread body: placeholder loop that never returns.

        No scheduling is implemented here; the loop sleeps between iterations
        so that the idle thread does not spin on the CPU.
        '''
        while True:
            time.sleep(1)

    def schedule_immediate(self, jid):
        '''
        Move pending job `jid` to the running queue and start it.

        Raises JobManagerError if `jid` is unknown or is not pending.
        '''
        debug('JobManager.schedule_immediate: id=`%i`', jid)
        with self._mutex:
            self._require_job(jid)
            # job should be pending execution
            if jid not in self._jobs_pending:
                raise JobManagerError('job `%s` not prepared for execution' %
                                      (jid,))
            # execute job
            self._jobs_running.append(jid)
            self._jobs_pending.remove(jid)
            self._jobs[jid].start()

    def get_job(self, jid):
        '''
        Return the job object stored under `jid`.

        Raises JobManagerError if `jid` is unknown.
        '''
        with self._mutex:
            self._require_job(jid)
            return self._jobs[jid]

    def is_job(self, jid):
        '''
        Return True if `jid` is a known job ID.
        '''
        with self._mutex:
            return jid in self._jobs

    def is_job_running(self, jid):
        '''
        Return True if job `jid` is in the running queue.

        Raises JobManagerError if `jid` is unknown.
        '''
        with self._mutex:
            self._require_job(jid)
            return jid in self._jobs_running

    def is_job_finished(self, jid):
        '''
        Return True if job `jid` is in the finished queue.

        Raises JobManagerError if `jid` is unknown.
        '''
        with self._mutex:
            self._require_job(jid)
            return jid in self._jobs_finished


class JobLog(object):
    '''
    Append-only list of timestamped messages, guarded by a lock.
    '''
    def __init__(self):
        '''
        Create an empty log.
        '''
        # list of (unix timestamp, message) tuples, oldest first
        self._items = []
        self._mutex = RLock()

    def __str__(self):
        '''
        Return the log as text, one `[YYYY-MM-DD HH:MM:SS] message` line per
        entry (local time), each terminated by a newline. An empty log gives
        an empty string.
        '''
        # copy under the lock so that formatting works on a stable snapshot
        with self._mutex:
            entries = list(self._items)
        return ''.join(
            "[%s] %s\n" % (
                datetime.fromtimestamp(stamp).strftime("%Y-%m-%d %H:%M:%S"),
                message)
            for stamp, message in entries)

    def append(self, message):
        '''
        Record `message` with the current time and echo it to the debug log.
        '''
        debug('JobLog.append: %s', message)
        with self._mutex:
            self._items.append((time.time(), message))


class BaseJob(Thread, object):
    '''
    Base class of all jobs.

    A job is a thread that registers itself with a manager on creation, keeps
    its own log, and runs `_job()` after a successful `prepare()`. Derived
    classes override `_validate()`, `_prepare()` and `_job()`.
    '''
    def __init__(self, manager):
        '''
        Create the job and store it in `manager`.

        manager -- object providing `append(job)` (returning the job ID),
                   `schedule_immediate(jid)`, `_job_finished(jid)` and
                   `_job_crashed(jid)`
        '''
        super(BaseJob, self).__init__()
        self._manager = manager
        self._log = JobLog()
        # job state
        self._type = self.__class__.__name__
        # None until validation has run, then its boolean outcome
        self._validated = None
        self._ready_run = False
        # store job in manager
        self._jid = self._manager.append(self)
        self._log.append('stored with id `%i` and type `%s`' % (self._jid,
                                                                self._type))

    def _validate(self):
        '''
        Check the job parameters; return True if they are acceptable.

        To be implemented in derived classes; the base version rejects.
        '''
        return False

    def _prepare(self):
        '''
        Acquire what the job needs to run; return True on success.

        To be implemented in derived classes; the base version fails.
        '''
        return False

    def _job(self):
        '''
        Do the actual work.

        To be implemented in derived classes; the base version logs and
        raises JobError.
        '''
        self._log.append('nothing to do')
        raise JobError('empty job')

    def is_ready(self):
        '''
        Return True once `prepare()` has succeeded.
        '''
        return self._ready_run is True

    def get_id(self):
        '''
        Return the ID the manager assigned to this job.
        '''
        return self._jid

    def get_log(self):
        '''
        Return the job log as text.
        '''
        return str(self._log)

    def start_now(self):
        '''
        Ask the manager to schedule this job immediately.
        '''
        self._manager.schedule_immediate(self._jid)

    def run(self):
        '''
        Thread body: prepare if needed, run `_job()` and report the outcome
        (finished or crashed) to the manager.

        Any exception raised along the way is written to the job log and
        reported as a crash; it does not propagate out of the thread.
        '''
        try:
            if not self._ready_run:
                self.prepare()
            self._log.append("running")
            self._job()
            self._log.append("finished")
            self._manager._job_finished(self._jid)
        except Exception as err:
            self._log.append("crashed: `%r`: `%s`" % (err, err))
            self._manager._job_crashed(self._jid)

    def prepare(self):
        '''
        Validate the job (once) and prepare it for execution.

        Does nothing when the job is already ready. The validation outcome is
        cached, so a job that failed validation keeps failing on later calls.

        Raises JobError when validation or preparation fails, including when
        `_validate()` or `_prepare()` raise themselves.
        '''
        if self._ready_run:
            return
        # run validation if not done yet
        if self._validated is None:
            try:
                self._validated = bool(self._validate())
            except Exception as err:
                self._log.append('validation raised `%r`' % (err,))
                self._validated = False
        # do not continue if validation fails
        if not self._validated:
            self._log.append('validation failed')
            raise JobError('validation failed')
        # run preparation in derived Job class
        try:
            ready = bool(self._prepare())
        except Exception as err:
            self._log.append('preparation raised `%r`' % (err,))
            ready = False
        self._ready_run = ready
        # do not continue if preparation fails
        if not ready:
            self._log.append('preparation failed')
            raise JobError('preparation failed')
        self._log.append('ready for execution')


class BlowJob(BaseJob):
    '''
    Job class that cannot be instantiated: its constructor always raises,
    before the base class constructor is ever called.
    '''
    def __init__(self, manager):
        '''
        Raise Exception unconditionally; `manager` is ignored.
        '''
        message = 'I LOVE MY KITTY'
        raise Exception(message)


class XferJob(BaseJob):
    '''
    Common base of the file transfer jobs: holds the file path and the
    settings shared by the sending and receiving sides.
    '''
    # job type names accepted by `_validate()`
    TYPES = [
        'SendFileJob',
        'ReceiveFileJob',
    ]

    # tcp port range used for transfers
    TCP_PORT_MIN = 42000
    TCP_PORT_MAX = 42999

    def __init__(self, manager, path):
        '''
        Remember the file `path`, then register the job with `manager`.
        '''
        # NOTE(review): `_csum` is never assigned a real value in this class
        self._csum = None
        self._path = path
        super(XferJob, self).__init__(manager)

    def _validate(self):
        '''
        Accept the job only if its type name is one of `TYPES`.
        '''
        accepted_types = self.TYPES
        return self._type in accepted_types

    def get_checksum(self):
        '''
        Return a fixed placeholder string; nothing is computed from the file.
        '''
        # FIXME: hard-coded value, real checksum not implemented yet
        placeholder = '625f4986bccba2dbd70cdd9ef80af787'
        return placeholder


class SendFileJob(XferJob):
    '''
    Job that connects to a remote host over TCP and sends it the raw content
    of a local file.
    '''
    def __init__(self, manager, path, remote_host, remote_port):
        '''
        manager     -- job manager the job registers with
        path        -- path of the file to send
        remote_host -- host to connect to
        remote_port -- TCP port to connect to; converted with `int()`, so a
                       non-numeric value raises ValueError here
        '''
        self._host = remote_host
        self._port = int(remote_port)
        super(SendFileJob, self).__init__(manager, path)

    def _validate(self):
        '''
        Accept the job if the source is a readable regular file, the port is
        within TCP_PORT_MIN..TCP_PORT_MAX (both included) and the host is a
        `str`.
        '''
        return (super(SendFileJob, self)._validate()
                and os.path.isfile(self._path)
                and os.access(self._path, os.F_OK | os.R_OK)
                and self.TCP_PORT_MIN <= self._port <= self.TCP_PORT_MAX
                and isinstance(self._host, str))

    def _prepare(self):
        '''
        Create the TCP socket used for the transfer.
        '''
        # create socket
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        return bool(self._sock)

    def _job(self):
        '''
        Connect to the remote host and stream the file in 4096 byte chunks.

        The socket and the file are closed in every case. Any failure is
        printed as a traceback on stdout, written to the job log and
        re-raised as SendFileJobError.
        '''
        fd = None
        try:
            # try to connect
            self._log.append('connecting to `%s`:`%i`' % (self._host,
                                                          self._port))
            self._sock.connect((self._host, self._port))
            self._log.append('connected')
            # open file
            self._log.append('opening source file `%s`' % self._path)
            fd = open(self._path, 'rb')
            # read and send file
            self._log.append('sending data')
            while True:
                buf = fd.read(4096)
                if not buf:
                    break
                # send() may transmit only part of the buffer; sendall()
                # keeps going until every byte is out or an error occurs
                self._sock.sendall(buf)
        except Exception as err:
            traceback.print_exc(file=sys.stdout)
            # keep the cause in the job log, the raised error does not say it
            self._log.append('transfer failed: %s' % (err,))
            raise SendFileJobError('FIXME error reporting')
        finally:
            # close socket and file anyway, ignoring errors while closing
            for closable in (self._sock, fd):
                if closable is not None:
                    try:
                        closable.close()
                    except (socket.error, IOError):
                        pass


class ReceiveFileJob(XferJob):
    '''
    Job that listens on a free TCP port of the transfer range, accepts one
    connection and writes everything it receives to a local file.
    '''
    # ports currently reserved by ReceiveFileJob instances (shared by all
    # instances), with the lock protecting the list
    _ports_used = []
    _ports_used_mutex = RWLock()

    def __init__(self, manager, path, create=False):
        '''
        manager -- job manager the job registers with
        path    -- path of the destination file
        create  -- when true the destination may not exist yet and only its
                   directory is checked; otherwise the file must already
                   exist and be writable
        '''
        self._create = create
        self._port = None
        super(ReceiveFileJob, self).__init__(manager, path)

    def _validate(self):
        '''
        Accept the job if the destination can be written: an existing
        writable file, or (with `create`) an existing writable directory.
        '''
        if not super(ReceiveFileJob, self)._validate():
            return False
        if self._create:
            # a bare file name has an empty dirname: it lives in the
            # current directory
            parent = os.path.dirname(self._path) or '.'
            return (os.path.isdir(parent)
                    and os.access(parent, os.F_OK | os.W_OK))
        return (os.path.isfile(self._path)
                and os.access(self._path, os.F_OK | os.W_OK))

    def _prepare(self):
        '''
        Create the listening socket and bind it to the first port of
        TCP_PORT_MIN..TCP_PORT_MAX (both included) that is neither reserved
        by another ReceiveFileJob nor refused by the system.

        Raises ReceiveFileJobError when no port could be bound.
        '''
        # create socket
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # find a port to bind to
        for port in range(self.TCP_PORT_MIN, self.TCP_PORT_MAX + 1):
            with self._ports_used_mutex.write:
                if port in self._ports_used:
                    continue
                try:
                    self._sock.bind(('', port))
                except socket.error:
                    # port refused by the system, try the next one
                    continue
                self._port = port
                self._ports_used.append(port)
                break
        if not self._port:
            # do not leak the unbound socket
            self._sock.close()
            self._log.append('no port to listen to')
            raise ReceiveFileJobError('no port to listen to')
        return True

    def _job(self):
        '''
        Wait for one client and write what it sends to the destination file,
        in chunks of up to 4096 bytes, until the client closes the
        connection.

        Sockets and file are closed and the port reservation is released in
        every case. Any failure is printed as a traceback on stdout, written
        to the job log and re-raised as ReceiveFileJobError.
        '''
        fd = None
        conn = None
        try:
            self._sock.listen(1)
            # wait for connection
            self._log.append('waiting for connection on port `%i`' % self._port)
            conn, addr = self._sock.accept()
            # addr is a (host, port) pair for AF_INET sockets
            self._log.append('client connected from `%s`' % (addr[0],))
            # open file
            self._log.append('opening destination `%s`' % self._path)
            fd = open(self._path, 'wb')
            # write to the file
            self._log.append('receiving data')
            while True:
                buf = conn.recv(4096)
                if not buf:
                    break
                fd.write(buf)
            self._log.append('remote host closed connection')
        except Exception as err:
            traceback.print_exc(file=sys.stdout)
            # keep the cause in the job log, the raised error does not say it
            self._log.append('reception failed: %s' % (err,))
            raise ReceiveFileJobError('FIXME error reporting')
        finally:
            # close sockets and file anyway, ignoring errors while closing
            for closable in (conn, self._sock, fd):
                if closable is not None:
                    try:
                        closable.close()
                    except (socket.error, IOError):
                        pass
            # the port is free again: let other jobs reserve it
            with self._ports_used_mutex.write:
                if self._port in self._ports_used:
                    self._ports_used.remove(self._port)

    def get_port(self):
        '''
        Return the TCP port the job is bound to.

        Raises TypeError when called before a port has been bound (the port
        is still None at that point).
        '''
        return int(self._port)


if __name__ == '__main__':
    # manual test: start a manager, then send /etc/hosts to 127.0.0.1:42000
    manager = JobManager(None)
    manager.start()

    #receiver = ReceiveFileJob(manager, '/tmp/receive')
    #receiver.start_now()

    sender = SendFileJob(manager, '/etc/hosts', '127.0.0.1', '42000')
    sender.start_now()