# jobs.py
# -*- coding: utf-8 -*-

# Commit annotation: Thibault VINCENT
import os, time, socket, select
from hashlib import md5
from datetime import datetime
# Commit annotation: Thibault VINCENT
from threading import Lock, RLock, Thread, Event
from logging import debug
from utils import RWLock
from drbd import DRBD
from errors import (JobManagerError, JobError, XferJobError, ReceiveFileJobError,
            SendFileJobError, LVMError, DRBDError)
# Commit annotation: Thibault VINCENT
from errors import (JobManagerError, JobError, XferJobError, 
            ReceiveFileJobError, SendFileJobError, DrbdCopyJobError, LVMError,
            DRBDError, TCPTunnelJobError)

#fixme
import traceback, sys

class JobManager(Thread, object):
    '''
    Registry of job objects and of the queue each job currently belongs to.

    Jobs are stored by numeric ID; IDs are moved between the pending,
    running, crashed and finished lists as jobs progress. Every method takes
    the manager's re-entrant mutex before touching that state.
    '''
    # NOTE(review): not enforced anywhere in this class; IDs grow unbounded
    MAX_JID = 65535

    def __init__(self, hypervisor):
        '''
        Create an empty manager.

        `hypervisor` is accepted for interface compatibility; this class does
        not store or use it.
        '''
        super(JobManager, self).__init__()
        self._mutex = RLock()
        self._next_jid = 1
        # job objects indexed by job ID
        self._jobs = {}
        # new jobs IDs
        self._jobs_pending = []
        # running jobs IDs
        self._jobs_running = []
        # dead jobs IDs
        self._jobs_crashed = []
        self._jobs_finished = []

    def _require_job(self, jid):
        '''
        Raise JobManagerError unless `jid` is a registered job ID.

        Must be called with the mutex held.
        '''
        if jid not in self._jobs:
            # `%s` renders integers exactly like `%i` but cannot itself fail
            # on a non-numeric ID
            raise JobManagerError('unknown job ID `%s`' % (jid,))

    def _job_crashed(self, jid):
        '''
        Record job `jid` as crashed and drop it from every other queue.

        Raises JobManagerError if the ID is unknown.
        '''
        debug('JobManager._job_crashed: id=`%i`', jid)
        with self._mutex:
            self._require_job(jid)
            # move job to the crashed queue (only once)
            if jid not in self._jobs_crashed:
                self._jobs_crashed.append(jid)
            # clean any queue that may contain the job
            for queue in (self._jobs_pending, self._jobs_running,
                                                        self._jobs_finished):
                if jid in queue:
                    queue.remove(jid)

    def _job_finished(self, jid):
        '''
        Move job `jid` from the running queue to the finished queue.

        Raises JobManagerError if the ID is unknown, and ValueError if the
        job was not in the running queue.
        '''
        debug('JobManager._job_finished: id=`%i`', jid)
        with self._mutex:
            self._require_job(jid)
            # move job to the finished queue
            self._jobs_finished.append(jid)
            # remove from running queue
            self._jobs_running.remove(jid)

    def run(self):
        '''
        Thread body: idle forever, no periodic work is implemented.
        '''
        while True:
            # sleep instead of spinning so the idle thread does not burn CPU
            time.sleep(1)

    def append(self, job):
        '''
        Register `job`, queue it as pending and return its new job ID.
        '''
        with self._mutex:
            jid = self._next_jid
            self._next_jid += 1
            self._jobs[jid] = job
            self._jobs_pending.append(jid)
        debug('JobManager.append: new job pending with id `%i`', jid)
        return jid

    def cancel(self, jid):
        '''
        Ask the running job `jid` to stop by raising its `_cancelled` flag.

        Raises JobManagerError if the ID is unknown or the job is not running.
        '''
        with self._mutex:
            self._require_job(jid)
            # job should be running
            if jid not in self._jobs_running:
                raise JobManagerError('job `%s` is not running' % (jid,))
            # ask the job to stop its execution
            self._jobs[jid]._cancelled = True

    def schedule_immediate(self, jid):
        '''
        Move the pending job `jid` to the running queue and start it.

        Raises JobManagerError if the ID is unknown or the job is not pending.
        '''
        debug('JobManager.schedule_immediate: id=`%i`', jid)
        with self._mutex:
            self._require_job(jid)
            # job should be pending execution
            if jid not in self._jobs_pending:
                raise JobManagerError('job `%s` not prepared for execution'
                                                                    % (jid,))
            # execute job
            self._jobs_running.append(jid)
            self._jobs_pending.remove(jid)
            self._jobs[jid].start()

    def is_job(self, jid):
        '''
        Return True if `jid` is a registered job ID.
        '''
        with self._mutex:
            return jid in self._jobs

    def get_job(self, jid):
        '''
        Return the job object registered under `jid`.

        Raises JobManagerError if the ID is unknown.
        '''
        with self._mutex:
            self._require_job(jid)
            return self._jobs[jid]

    def is_job_pending(self, jid):
        '''
        Return True if job `jid` is in the pending queue.

        Raises JobManagerError if the ID is unknown.
        '''
        with self._mutex:
            self._require_job(jid)
            return jid in self._jobs_pending

    def is_job_running(self, jid):
        '''
        Return True if job `jid` is in the running queue.

        Raises JobManagerError if the ID is unknown.
        '''
        with self._mutex:
            self._require_job(jid)
            return jid in self._jobs_running

    def is_job_crashed(self, jid):
        '''
        Return True if job `jid` is in the crashed queue.

        Raises JobManagerError if the ID is unknown.
        '''
        with self._mutex:
            self._require_job(jid)
            return jid in self._jobs_crashed

    def is_job_finished(self, jid):
        '''
        Return True if job `jid` is in the finished queue.

        Raises JobManagerError if the ID is unknown.
        '''
        with self._mutex:
            self._require_job(jid)
            return jid in self._jobs_finished

    def is_job_cancelled(self, jid):
        '''
        Return True if the `_cancelled` flag of job `jid` is set.

        Raises JobManagerError if the ID is unknown.
        '''
        with self._mutex:
            job = self.get_job(jid)
            return job._cancelled is True


class JobLog(object):
    '''
    Timestamped, append-only list of messages belonging to one job.
    '''
    def __init__(self):
        '''
        Create an empty log.
        '''
        # list of (unix timestamp, message) tuples, oldest first
        self._items = []
        self._mutex = RLock()

    def __str__(self):
        '''
        Render the log as one "[YYYY-MM-DD HH:MM:SS] message" line per entry,
        each terminated by a newline. An empty log renders as "".
        '''
        # copy under the lock so a concurrent append() cannot interleave
        # with the formatting below
        with self._mutex:
            entries = list(self._items)
        return "".join(
            "[%s] %s\n" % (datetime.fromtimestamp(stamp).strftime(
                                            "%Y-%m-%d %H:%M:%S"), message)
            for stamp, message in entries)

    def append(self, message):
        '''
        Store `message` with the current time and echo it to the debug log.
        '''
        debug('JobLog.append: %s', message)
        with self._mutex:
            self._items.append((time.time(), message))


class BaseJob(Thread, object):
    '''
    Base class of all jobs: a thread that registers itself in a manager,
    keeps its own log and runs `_job()` after validation and preparation.

    Derived classes override `_validate`, `_prepare` and `_job`.
    '''
    def __init__(self, manager):
        '''
        Register the job in `manager` and name the thread "<type>-<id>".
        '''
        super(BaseJob, self).__init__()
        self._manager = manager
        self._log = JobLog()
        # job state
        self._type = self.__class__.__name__
        # None until validation ran, then the result of _validate()
        self._validated = None
        self._ready_run = False
        self._cancelled = False
        # store job in manager
        self._jid = self._manager.append(self)
        self.name = '%s-%i' % (self._type, self._jid)
        self._log.append('stored with id `%i` and type `%s`' % (self._jid,
                                                                    self._type))

    def _validate(self):
        '''
        Check the job parameters; to be implemented in the derived class.

        The base implementation always reports failure.
        '''
        return False

    def _prepare(self):
        '''
        Acquire what the job needs to run; to be implemented in the derived
        class. The base implementation always reports failure.
        '''
        return False

    def _job(self):
        '''
        Do the actual work; to be implemented in the derived class.

        The base implementation logs and raises JobError.
        '''
        self._log.append('nothing to do')
        raise JobError('empty job')

    def is_ready(self):
        '''
        Return True once `prepare()` has succeeded.
        '''
        return self._ready_run is True

    def get_id(self):
        '''
        Return the job ID assigned by the manager.
        '''
        return self._jid

    def get_log(self):
        '''
        Return the job log rendered as a string.
        '''
        return str(self._log)

    def start_now(self):
        '''
        Ask the manager to start this job immediately.
        '''
        self._manager.schedule_immediate(self._jid)

    def run(self):
        '''
        Thread body: prepare if needed, run `_job()` and report the outcome
        (finished or crashed) to the manager.
        '''
        try:
            if not self._ready_run:
                self.prepare()
            self._log.append("running")
            self._job()
            self._log.append("finished")
            self._manager._job_finished(self._jid)
        except Exception as err:
            self._log.append("crashed: `%r`: `%s`" % (err, err))
            self._manager._job_crashed(self._jid)

    def prepare(self):
        '''
        Validate the job (once) and prepare it for execution.

        Does nothing if the job is already prepared. Raises JobError when
        validation or preparation fails; an exception raised by `_validate`
        or `_prepare` counts as a failure.
        '''
        if self._ready_run:
            return
        # run validation if not done yet
        if self._validated is None:
            try:
                self._validated = self._validate()
            except Exception as err:
                self._log.append('validation error: `%r`' % (err,))
                self._validated = False
        # do not continue if validation fails; this also covers a failure
        # recorded by a previous call
        if not self._validated:
            self._log.append('validation failed')
            raise JobError('validation failed')
        # run preparation in derived Job class
        try:
            ready = self._prepare()
        except Exception as err:
            self._log.append('preparation error: `%r`' % (err,))
            ready = False
        self._ready_run = ready
        # do not continue if preparation fails
        if not ready:
            self._log.append('preparation failed')
            raise JobError('preparation failed')
        self._log.append('ready for execution')


class XferJob(BaseJob):
    '''
    Shared base of the file transfer jobs: holds the file path, a running
    MD5 digest and the resulting checksum.
    '''
    # class names accepted by _validate()
    TYPES = [
        'SendFileJob',
        'ReceiveFileJob',
    ]

    # tcp port range used for transfers
    TCP_PORT_MIN = 42000
    TCP_PORT_MAX = 42999

    def __init__(self, manager, path):
        '''
        Remember `path`, start an empty MD5 digest and register the job in
        `manager`.
        '''
        # state is set before the base constructor runs
        self._checksum = None
        self._md5 = md5()
        self._path = path
        super(XferJob, self).__init__(manager)

    def _validate(self):
        '''
        Return True when the job's type name is one of TYPES.
        '''
        known_type = self._type in self.TYPES
        return known_type

    def get_checksum(self):
        '''
        Return the stored checksum (None as initialised by the constructor).
        '''
        return self._checksum


class SendFileJob(XferJob):
    '''
    Job sending the content of a local file to a remote TCP endpoint and
    computing the MD5 checksum of what was sent.
    '''
    def __init__(self, manager, path, remote_host, remote_port):
        '''
        Store the source `path` and the remote endpoint; `remote_port` is
        converted with int().
        '''
        self._host = remote_host
        self._port = int(remote_port)
        super(SendFileJob, self).__init__(manager, path)

    def _validate(self):
        '''
        Return True when the job type is known, the source is a readable
        regular file, the port lies within [TCP_PORT_MIN, TCP_PORT_MAX] and
        the host is a `str`.
        '''
        return (super(SendFileJob, self)._validate()
                    and os.path.isfile(self._path)
                    and os.access(self._path, os.F_OK | os.R_OK)
                    and self.TCP_PORT_MIN <= self._port <= self.TCP_PORT_MAX
                    and isinstance(self._host, str))

    def _prepare(self):
        '''
        Create the TCP socket used for the transfer.
        '''
        # create socket
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        return bool(self._sock)

    def _connect(self):
        '''
        Try up to 6 times to connect to the remote endpoint (10 s timeout
        per attempt); return True once connected, False otherwise or when
        the job gets cancelled.
        '''
        self._sock.settimeout(10)
        for _ in range(6):
            if self._cancelled:
                break
            try:
                self._sock.connect((self._host, self._port))
            except socket.timeout:
                pass
            except socket.error:
                # immediate refusal: wait a little before the next attempt
                time.sleep(1)
            else:
                return True
        return False

    def _job(self):
        '''
        Connect to the remote endpoint and stream the file in 4096 byte
        chunks, updating the MD5 digest along the way.

        A cancelled job or a refused connection only produces a log entry.
        Any error is reported as SendFileJobError. The socket and the file
        are always closed on exit.
        '''
        fd = None
        try:
            # try to connect
            self._log.append('connecting to `%s`:`%i`' % (self._host,
                                                                    self._port))
            connected = self._connect()
            if self._cancelled:
                self._log.append('stopped connect attempt (job was cancelled)')
            elif connected:
                self._log.append('connected')
                # open file
                self._log.append('opening source file `%s`' % self._path)
                fd = open(self._path, 'rb')
                # read and send file
                self._log.append('sending data')
                while not self._cancelled:
                    buf = fd.read(4096)
                    if not buf:
                        break
                    # sendall: plain send() may transmit only part of the
                    # buffer, while the checksum below covers all of it
                    self._sock.sendall(buf)
                    self._md5.update(buf)
                # report transfer completion
                if self._cancelled:
                    self._log.append('transfert aborted (job was cancelled)')
                else:
                    self._log.append('no more data to send')
                    self._checksum = self._md5.hexdigest()
                    self._md5 = None
                    self._log.append('transfered data checksum = `%s`' %
                                                                self._checksum)
            else:
                self._log.append('remote host did not accept connection, '
                                                        'aborting emission')
        except Exception as err:
            # keep the cause visible instead of a placeholder message
            raise SendFileJobError('file emission failed: `%r`' % (err,))
        finally:
            # close socket and files anyway
            sock = getattr(self, '_sock', None)
            if sock is not None:
                try:
                    sock.close()
                except socket.error:
                    pass
            if fd is not None:
                try:
                    fd.close()
                except (IOError, OSError):
                    pass


class ReceiveFileJob(XferJob):
    '''
    Job listening on a TCP port of the transfer range, writing what the
    first client sends into a local file and computing its MD5 checksum.
    '''
    # ports currently reserved by receive jobs, shared by all instances
    _ports_used = []
    _ports_used_mutex = RWLock()

    def __init__(self, manager, path, create=False):
        '''
        Store the destination `path`; `create` tells whether the file is
        expected to be created rather than to exist already.
        '''
        self._create = create
        self._port = None
        super(ReceiveFileJob, self).__init__(manager, path)

    def _validate(self):
        '''
        Return True when the job type is known and either the destination is
        an existing writable file (create=False) or its parent directory
        exists and is writable (create=True).
        '''
        return ( super(ReceiveFileJob, self)._validate() and
                (
                    (   not self._create and
                        os.path.isfile(self._path) and
                        os.access(self._path, os.F_OK | os.W_OK ))
                    or
                    (   self._create and
                        os.path.isdir(os.path.dirname(self._path)) and
                        os.access(os.path.dirname(self._path), os.F_OK |os.W_OK)
                    )
                ))

    def _prepare(self):
        '''
        Create the listening socket and bind it to the first free port of
        [TCP_PORT_MIN, TCP_PORT_MAX], recording the port as used.

        Raises ReceiveFileJobError when no port could be bound.
        '''
        # create socket
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # bind socket to a port; the upper bound is part of the range
        for port in range(self.TCP_PORT_MIN, self.TCP_PORT_MAX + 1):
            # find a port to bind to
            try:
                with self._ports_used_mutex.write:
                    if port not in self._ports_used:
                        self._sock.bind(('', port))
                        self._port = port
                        self._ports_used.append(port)
                        break
            except socket.error:
                pass
        if self._port is None:
            # nothing will ever use the socket: do not leak it
            try:
                self._sock.close()
            except socket.error:
                pass
            msg = 'no port to listen to'
            self._log.append(msg)
            raise ReceiveFileJobError(msg)
        return True

    def _job(self):
        '''
        Wait up to 60 seconds for one client, then write everything it sends
        to the destination file while updating the MD5 digest.

        A cancelled job or a missing client only produces a log entry. Any
        error is reported as ReceiveFileJobError. Sockets and file are always
        closed and the port reservation is released on exit.
        '''
        fd = None
        conn = None
        try:
            self._sock.listen(1)
            self._sock.settimeout(1)
            # wait for connection
            self._log.append('waiting for connection on port `%i`' % self._port)
            addr = None
            for _ in range(60):
                if conn is not None or self._cancelled:
                    break
                try:
                    conn, addr = self._sock.accept()
                except socket.timeout:
                    pass
            if self._cancelled:
                # checked first so a cancelled job never truncates the
                # destination file
                self._log.append('stopped waiting (job was cancelled)')
            elif conn is not None:
                self._log.append('client connected from `%s`' % (addr,))
                # open file
                self._log.append('opening destination `%s`' % self._path)
                fd = open(self._path, 'wb')
                # write to the file
                self._log.append('receiving data')
                # short timeout so a stalled peer cannot hide a cancellation
                conn.settimeout(1)
                while not self._cancelled:
                    try:
                        buf = conn.recv(4096)
                    except socket.timeout:
                        continue
                    if not buf:
                        break
                    fd.write(buf)
                    self._md5.update(buf)
                # report transfer completion
                if self._cancelled:
                    self._log.append('transfert aborted (job was cancelled)')
                else:
                    self._log.append('remote host closed connection')
                    self._checksum = self._md5.hexdigest()
                    self._md5 = None
                    self._log.append('transfered data checksum = `%s`' %
                                                                self._checksum)
            else:
                self._log.append('no connection received, aborting reception')
        except Exception as err:
            # keep the cause visible instead of a placeholder message
            raise ReceiveFileJobError('file reception failed: `%r`' % (err,))
        finally:
            # close sockets and files anyway
            for sock in (conn, getattr(self, '_sock', None)):
                if sock is not None:
                    try:
                        sock.close()
                    except socket.error:
                        pass
            # give the port back to the pool
            with self._ports_used_mutex.write:
                if self._port in self._ports_used:
                    self._ports_used.remove(self._port)
            if fd is not None:
                try:
                    fd.close()
                except (IOError, OSError):
                    pass

    def get_port(self):
        '''
        Return the bound port as an int (fails while no port is bound).
        '''
        return int(self._port)


# Commit annotation: Thibault VINCENT
class TCPTunnelJob(BaseJob):
    '''
    Job relaying TCP traffic between a locally accepted connection and a
    remote endpoint.

    The job waits for `tunnel_listen()` to configure the listening side,
    binds a port of the tunnel range, waits for `tunnel_connect()` to set the
    endpoint, then accepts clients one at a time and forwards data in both
    directions until it is cancelled.
    '''
    # tcp port range used for tunnels (both bounds are tried)
    TCP_PORT_MIN = 44000
    TCP_PORT_MAX = 45999
    # global port list
    _ports_used = []
    _ports_used_mutex = RWLock()

    def __init__(self, manager):
        '''
        Register the job in `manager` and create its synchronisation events.
        '''
        super(TCPTunnelJob, self).__init__(manager)
        self._mutex = Lock()
        self._event_client_conf = Event()
        self._event_server_conf = Event()
        self._event_server_bound = Event()
        self._server_port = None
        # set here because of _cancel()
        self._sock_client = None
        self._sock_server_listen = None
        self._sock_server = None

    @staticmethod
    def _close_socket(sock):
        '''
        Close `sock` if it is not None, ignoring socket errors.
        '''
        if sock is not None:
            try:
                sock.close()
            except socket.error:
                pass

    def _cancel(self):
        '''
        Explicitly close every connection, to release blocking socket calls.
        '''
        self._close_socket(self._sock_client)
        self._close_socket(self._sock_server_listen)
        self._close_socket(self._sock_server)

    def _cleanup(self):
        '''
        Release anything waiting on the job's events and forget its sockets.
        '''
        # unlock events
        self._event_client_conf.set()
        self._event_server_conf.set()
        self._event_server_bound.set()
        # destroy sockets
        self._sock_client = None
        self._sock_server_listen = None
        self._sock_server = None

    def _validate(self):
        '''
        Nothing to validate yet.
        '''
        return True # FIXME maybe

    def _prepare(self):
        '''
        Nothing to prepare yet.
        '''
        return True # FIXME maybe

    def _wait_for(self, event, message):
        '''
        Log `message` and wait until `event` is set or the job is cancelled.

        Returns True when the job may go on, False when it was cancelled.
        '''
        if self._cancelled:
            return False
        self._log.append(message)
        # wait in short slices so a cancellation is noticed
        while not self._cancelled and not event.is_set():
            event.wait(1)
        return not self._cancelled

    def _pause(self, seconds):
        '''
        Sleep for `seconds` whole seconds, returning early on cancellation.
        '''
        for _ in range(seconds):
            if self._cancelled:
                return
            time.sleep(1)

    def _bind_server(self):
        '''
        Create the listening socket and bind it to the first free port of the
        tunnel range, on loopback or on all interfaces depending on the
        configuration given to `tunnel_listen()`.

        Raises TCPTunnelJobError when no port could be bound.
        '''
        addr = '127.0.0.1' if self._server_local else '0.0.0.0'
        self._sock_server_listen = socket.socket(socket.AF_INET,
                                                        socket.SOCK_STREAM)
        self._server_port = None
        for port in range(self.TCP_PORT_MIN, self.TCP_PORT_MAX + 1):
            # find a port to bind to
            try:
                with self._ports_used_mutex.write:
                    if port not in self._ports_used:
                        self._sock_server_listen.bind((addr, port)) #FIXME
                        self._server_port = port
                        self._ports_used.append(port)
                        break
            except socket.error:
                pass
        if self._server_port is None:
            msg = 'no port to listen to'
            self._log.append(msg)
            raise TCPTunnelJobError(msg)
        self._event_server_bound.set()
        self._log.append('server bound to `%s:%d`' % (addr,
                                                        self._server_port))

    def _release_port(self):
        '''
        Remove the job's port, if any, from the shared list of used ports.
        '''
        with self._ports_used_mutex.write:
            if self._server_port in self._ports_used:
                self._ports_used.remove(self._server_port)

    def _accept_client(self):
        '''
        Wait for a client on the listening socket.

        Returns True once a client is connected, False on cancellation.
        '''
        self._log.append('waiting for a client to connect')
        self._sock_server = None
        while not self._cancelled and self._sock_server is None:
            try:
                conn, local_addr = self._sock_server_listen.accept()
            except socket.timeout:
                continue
            except socket.error:
                # _cancel() closes the listening socket under our feet
                if self._cancelled:
                    break
                raise
            self._sock_server = conn
            self._log.append('client connected from `%s`' % (local_addr,))
        return self._sock_server is not None

    def _connect_endpoint(self):
        '''
        Connect to the configured endpoint, retrying until it succeeds.

        Returns True once connected, False on cancellation.
        '''
        timeout = 15 # FIXME must be argument
        retry_interval = 5
        while not self._cancelled:
            # a socket whose connect() failed is not reused: its state is
            # unspecified, so every attempt gets a fresh one
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(timeout)
            self._sock_client = sock
            try:
                self._log.append('connecting to `%s`' % (self._endpoint,))
                sock.connect(self._endpoint)
            except socket.timeout:
                self._log.append('no response after %d seconds' % timeout)
            except socket.error:
                self._log.append('socket error (connection refused ?)')
            else:
                return True
            self._close_socket(sock)
            # only wait between failed attempts
            self._pause(retry_interval)
        return False

    def _forward(self):
        '''
        Relay data between the accepted connection and the endpoint until
        one side disconnects or the job is cancelled.
        '''
        mask_ro = select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP
        mask_rw = mask_ro | select.EPOLLOUT
        fd_server = self._sock_server.fileno()
        fd_client = self._sock_client.fileno()
        socks = {fd_server: self._sock_server, fd_client: self._sock_client}
        peer = {fd_server: fd_client, fd_client: fd_server}
        # data waiting to be written to each descriptor
        outgoing = {fd_server: b'', fd_client: b''}
        # descriptor of the side that went away, if any
        dead = None
        poller = select.epoll()
        try:
            poller.register(fd_server, mask_ro)
            poller.register(fd_client, mask_ro)
            while not self._cancelled and dead is None:
                # bounded wait so a cancellation is noticed
                for fd, events in poller.poll(1):
                    sock = socks[fd]
                    if events & select.EPOLLIN:
                        try:
                            data = sock.recv(4096)
                        except socket.error:
                            data = b''
                        if not data:
                            # error or orderly shutdown of this side
                            dead = fd
                            break
                        # queue for the other side and ask to be notified
                        # when it becomes writable
                        outgoing[peer[fd]] += data
                        poller.modify(peer[fd], mask_rw)
                    elif events & (select.EPOLLERR | select.EPOLLHUP):
                        dead = fd
                        break
                    if events & select.EPOLLOUT:
                        if outgoing[fd]:
                            try:
                                sent = sock.send(outgoing[fd])
                            except socket.error:
                                dead = fd
                                break
                            # drop sent data from the buffer
                            outgoing[fd] = outgoing[fd][sent:]
                        # if the buffer becomes empty, stop write polling
                        if not outgoing[fd]:
                            poller.modify(fd, mask_ro)
        finally:
            poller.close()
        if dead is not None:
            self._log.append('disconnected')
            # best effort: hand over what was already read from the side
            # that left to the side that is still there
            leftover = outgoing[peer[dead]]
            if leftover and not self._cancelled:
                try:
                    socks[peer[dead]].settimeout(5)
                    socks[peer[dead]].sendall(leftover)
                except socket.error:
                    pass

    def _job(self):
        '''
        Run the tunnel: wait for the configuration, bind the server, then
        accept clients and relay their traffic until cancellation.

        On exit every socket is closed, the port is released and the job's
        events are set so that nothing keeps waiting on them.
        '''
        try:
            # wait for tunnel configuration for server
            if not self._wait_for(self._event_server_conf,
                                'waiting for tunnel server configuration'):
                return
            # setup server
            self._bind_server()
            # wait for tunnel configuration for client
            if not self._wait_for(self._event_client_conf,
                                'waiting for tunnel client configuration'):
                return
            self._sock_server_listen.settimeout(1)
            self._sock_server_listen.listen(1)
            # loop and reconnect forever until cancellation
            while not self._cancelled:
                if not self._accept_client():
                    return # job cancelled
                try:
                    if not self._connect_endpoint():
                        return # job cancelled
                    self._forward()
                finally:
                    self._close_socket(self._sock_server)
                    self._close_socket(self._sock_client)
        except Exception:
            traceback.print_exc()
            # bare raise keeps the original traceback
            raise
        finally:
            self._close_socket(self._sock_client)
            self._close_socket(self._sock_server)
            self._close_socket(self._sock_server_listen)
            self._release_port()
            self._cleanup()

    def tunnel_get_server_port(self):
        '''
        Block until the server-bound event is set, then return the port
        (None if no port was bound).
        '''
        self._event_server_bound.wait()
        return self._server_port

    def tunnel_connect(self, endpoint):
        '''
        Set the remote endpoint the tunnel connects to.

        Raises TCPTunnelJobError if the client side was already configured.
        '''
        with self._mutex:
            if not self._event_client_conf.is_set():
                self._endpoint = endpoint
                self._event_client_conf.set()
            else:
                raise TCPTunnelJobError('remote endpoint already set')

    def tunnel_listen(self, local):
        '''
        Configure the listening side; `local` being True restricts it to the
        loopback address.

        Raises TCPTunnelJobError if the server side was already configured.
        '''
        with self._mutex:
            if not self._event_server_conf.is_set():
                self._server_local = local is True
                self._event_server_conf.set()
            else:
                raise TCPTunnelJobError('server parameters already set')