Skip to content
jobs.py 38.4 KiB
Newer Older
# -*- coding: utf-8 -*-

Thibault VINCENT's avatar
Thibault VINCENT committed
import os, time, socket, select
from hashlib import md5
from datetime import datetime
Thibault VINCENT's avatar
Thibault VINCENT committed
from threading import Lock, RLock, Thread, Event
from logging import debug
from utils import RWLock
from drbd import DRBD
from lvm import LVM
Thibault VINCENT's avatar
Thibault VINCENT committed
from errors import (JobManagerError, JobError, XferJobError, 
            ReceiveFileJobError, SendFileJobError, DrbdCopyJobError, LVMError,
            DRBDError, TCPTunnelJobError)

#fixme
import traceback, sys

class JobManager(object):
    '''
    Registry and scheduler for jobs.

    Every appended job gets a numeric ID and sits in exactly one of the
    pending, running, crashed or finished queues. Jobs report their own end
    through _job_finished() / _job_crashed(). All state is guarded by a
    re-entrant mutex so the manager can be shared between threads.
    '''
    # NOTE(review): not enforced anywhere in this class, IDs keep growing
    MAX_JID = 65535

    def __init__(self):
        '''
        Create an empty manager; the first job ID handed out is 1.
        '''
        super(JobManager, self).__init__()
        self._mutex = RLock()
        self._next_jid = 1
        # job objects indexed by job ID
        self._jobs = {}
        # new jobs IDs
        self._jobs_pending = []
        # running jobs IDs
        self._jobs_running = []
        # dead jobs IDs
        self._jobs_crashed = []
        self._jobs_finished = []

    def _move_job(self, jid, target):
        '''
        Put ``jid`` in the ``target`` queue (once) and remove it from every
        other queue. Caller must hold self._mutex.
        '''
        for queue in (self._jobs_pending, self._jobs_running,
                      self._jobs_crashed, self._jobs_finished):
            if queue is target:
                if jid not in queue:
                    queue.append(jid)
            else:
                while jid in queue:
                    queue.remove(jid)

    def _in_queue(self, jid, queue):
        '''
        Tell whether the known job ``jid`` is in ``queue``.

        :raises JobManagerError: for an unknown job ID
        '''
        with self._mutex:
            # job should exist
            self.get_job(jid)
            return jid in queue

    def _job_crashed(self, jid):
        '''
        Called from BaseJob.run() when the job raised: move it to the crashed
        queue, flag it cancelled and run its cleanup.

        :raises JobManagerError: for an unknown job ID
        '''
        debug('JobManager._job_crashed: id=`%s`', jid)
        with self._mutex:
            # job should exist
            job = self.get_job(jid)
            # move job to the crashed queue, whatever queue it was in before
            self._move_job(jid, self._jobs_crashed)
            # set the cancelled flag to block pending commands
            job._cancelled = True
            # FIXME put at the end because it may raise errors
            # execute job self cleanup
            job._cleanup()

    def _job_finished(self, jid):
        '''
        Called from BaseJob.run() when _job() returned: move the job to the
        finished queue and run its cleanup.

        The job is taken out of whichever queue held it, so a job whose
        thread was started directly (and is thus still pending) does not
        trigger a spurious error here.

        :raises JobManagerError: for an unknown job ID
        '''
        debug('JobManager._job_finished: id=`%s`', jid)
        with self._mutex:
            # job should exist
            job = self.get_job(jid)
            # move job to the finished queue
            self._move_job(jid, self._jobs_finished)
            # FIXME put at the end because it may raise errors
            # execute job self cleanup
            job._cleanup()

    def run(self):
        '''
        Currently a no-op.
        '''

    def append(self, job):
        '''
        Store ``job`` as pending and return its newly allocated ID.
        '''
        with self._mutex:
            jid = self._next_jid
            self._next_jid += 1
            self._jobs[jid] = job
            self._jobs_pending.append(jid)
        debug('JobManager.append: new job pending with id `%i`', jid)
        return jid

    def cancel(self, jid):
        '''
        Ask a running job to stop: set its cancelled flag and call its
        _cancel() hook. Cleanup is left to the job's own end of life.

        :raises JobManagerError: for an unknown or not running job
        '''
        with self._mutex:
            # job should exist
            job = self.get_job(jid)
            # job should be running
            if jid not in self._jobs_running:
                raise JobManagerError('job `%s` is not running' % (jid,))
            # ask the job to stop its execution
            job._cancelled = True
            job._cancel()
            # do NOT execute _cleanup() here !!!

    def list(self):
        '''
        Return a dict mapping each state ('pending', 'running', 'crashed',
        'finished', 'orphaned') to a list of ``(job_id, job_type)`` tuples.
        Jobs that are in no queue are reported as 'orphaned'.
        '''
        with self._mutex:
            pending = []
            running = []
            crashed = []
            finished = []
            orphaned = []
            # items() works on both Python 2 and 3 (iteritems() is Py2 only)
            for jid, job in self._jobs.items():
                s = (jid, job.get_type())
                if jid in self._jobs_pending:
                    pending.append(s)
                elif jid in self._jobs_running:
                    running.append(s)
                elif jid in self._jobs_crashed:
                    crashed.append(s)
                elif jid in self._jobs_finished:
                    finished.append(s)
                else:
                    orphaned.append(s)
            return {
                'pending'   : pending,
                'running'   : running,
                'crashed'   : crashed,
                'finished'  : finished,
                'orphaned'  : orphaned,
                }

    def schedule_immediate(self, jid):
        '''
        Start the thread of a pending job and mark it running.

        :raises JobManagerError: for an unknown or not pending job
        '''
        debug('JobManager.schedule_immediate: id=`%s`', jid)
        with self._mutex:
            # job should exist
            job = self.get_job(jid)
            # job should be pending execution
            if jid not in self._jobs_pending:
                raise JobManagerError('job `%s` not prepared for execution'
                                      % (jid,))
            # start first: if start() raises, the job simply stays pending
            # instead of being listed as running forever. The job thread
            # cannot report back before we release the mutex, since
            # _job_finished() and _job_crashed() need it too.
            job.start()
            self._jobs_pending.remove(jid)
            self._jobs_running.append(jid)

    def is_job(self, jid):
        '''
        Tell whether ``jid`` is a known job ID.
        '''
        with self._mutex:
            return jid in self._jobs

    def get_job(self, jid):
        '''
        Return the job object stored under ``jid``.

        :raises JobManagerError: for an unknown job ID
        '''
        with self._mutex:
            if not self.is_job(jid):
                # %s (not %i) so a non-integer ID still yields this error
                raise JobManagerError('unknown job ID `%s`' % (jid,))
            return self._jobs[jid]

    def is_job_pending(self, jid):
        '''
        :raises JobManagerError: for an unknown job ID
        '''
        return self._in_queue(jid, self._jobs_pending)

    def is_job_running(self, jid):
        '''
        :raises JobManagerError: for an unknown job ID
        '''
        return self._in_queue(jid, self._jobs_running)

    def is_job_crashed(self, jid):
        '''
        :raises JobManagerError: for an unknown job ID
        '''
        return self._in_queue(jid, self._jobs_crashed)

    def is_job_finished(self, jid):
        '''
        :raises JobManagerError: for an unknown job ID
        '''
        return self._in_queue(jid, self._jobs_finished)

    def is_job_cancelled(self, jid):
        '''
        Tell whether the job's cancelled flag is set (by cancel() or by a
        crash).

        :raises JobManagerError: for an unknown job ID
        '''
        with self._mutex:
            return self.get_job(jid)._cancelled is True


class JobLog(object):
    '''
    Thread-safe list of timestamped messages describing a job's life.
    '''
    def __init__(self, max_items=None):
        '''
        :param max_items: maximum number of messages kept; once exceeded the
            oldest ones are discarded. None (the default) keeps everything.
        '''
        self._items = []
        self._max_items = max_items
        self._mutex = RLock()

    def __str__(self):
        '''
        Render the log as one ``[YYYY-mm-dd HH:MM:SS] message`` line per
        entry, timestamps in local time, each line ending with a newline.
        An empty log renders as an empty string.
        '''
        # snapshot under the lock so a concurrent append() cannot modify the
        # list while it is being iterated
        with self._mutex:
            items = list(self._items)
        return ''.join(
            "[%s] %s\n" % (datetime.fromtimestamp(stamp).strftime(
                                                "%Y-%m-%d %H:%M:%S"), message)
            for stamp, message in items)

    def append(self, message):
        '''
        Record ``message`` with the current time, dropping the oldest entries
        when the max_items limit is exceeded.
        '''
        debug('JobLog.append: %s', message)
        with self._mutex:
            self._items.append((time.time(), message))
            if (self._max_items is not None
                    and len(self._items) > self._max_items):
                del self._items[:len(self._items) - self._max_items]


class BaseJob(Thread, object):
    '''
    Base class of every job: a thread registered with a JobManager.

    Subclasses implement the hooks _validate(), _prepare() and _job(), and
    optionally _cancel() and _cleanup(). run() prepares the job if needed,
    executes _job() and reports the outcome to the manager.
    '''
    def __init__(self, manager):
        '''
        Register the job with ``manager`` and name the thread after the job
        type and ID.

        :param manager: JobManager the job is stored in
        '''
        super(BaseJob, self).__init__()
        self._manager = manager
        self._log = JobLog()
        # job state
        self._type = self.__class__.__name__
        # None until _validate() has run, then its result
        self._validated = None
        self._ready_run = False
        # set to True by the manager on cancel() or crash
        self._cancelled = False
        # store job in manager
        self._jid = self._manager.append(self)
        self.name = '%s-%i' % (self._type, self._jid)
        self._log.append('stored with id `%i` and type `%s`' % (self._jid,
                                                                    self._type))

    def _validate(self):
        '''
        To be implemented in derivate class: return True when the job
        parameters are acceptable.
        '''
        return False

    def _prepare(self):
        '''
        To be implemented in derivate class: acquire what the job needs and
        return True when ready to run.
        '''
        return False

    def _cancel(self):
        '''
        To be implemented in derivate class, or not: release anything that
        could keep _job() blocked after cancellation.
        '''
        pass

    def _cleanup(self):
        '''
        To be implemented in derivate class, or not: called by the manager
        once the job finished or crashed.
        '''
        pass

    def _job(self):
        '''
        To be implemented in derivate class: the actual work.
        '''
        self._log.append('nothing to do')
        raise JobError('empty job')

    def is_ready(self):
        '''
        Tell whether prepare() succeeded.
        '''
        return self._ready_run is True

    def get_id(self):
        '''
        Return the job ID allocated by the manager.
        '''
        return self._jid

    def get_log(self):
        '''
        Return the job log rendered as text.
        '''
        return str(self._log)

    def get_type(self):
        '''
        Return the job type (the concrete class name).
        '''
        return self._type

    def start_now(self):
        '''
        Ask the manager to start this job immediately.
        '''
        self._manager.schedule_immediate(self._jid)

    def run(self):
        '''
        Thread body: prepare if needed, run _job() and report success or
        crash to the manager.
        '''
        try:
            if not self._ready_run:
                self.prepare()
            self._log.append("running")
            self._job()
            self._log.append("finished")
            self._manager._job_finished(self._jid)
        except Exception as err:
            self._log.append("*** CRASHED: `%r`: `%s`" % (err, err))
            self._manager._job_crashed(self._jid)

    def prepare(self):
        '''
        Validate then prepare the job; does nothing once the job is ready.

        _validate() runs only on the first call and its result is kept, so a
        job that failed validation keeps failing. _prepare() runs on every
        call until it succeeds. An exception raised by either hook counts as
        a failure.

        :raises JobError: when validation or preparation fails
        '''
        if self._ready_run:
            return
        # run validation if not done yet
        if self._validated is None:
            try:
                self._validated = self._validate()
            except Exception:
                self._validated = False
        # do not continue if validation failed, now or on an earlier call
        if not self._validated:
            self._log.append('validation failed')
            raise JobError('validation failed')
        # run preparation in derivated Job class
        try:
            ready = self._prepare()
        except Exception:
            ready = False
        self._ready_run = ready
        # do not continue if preparation fails
        if not ready:
            self._log.append('preparation failed')
            raise JobError('preparation failed')
        self._log.append('ready for execution')


class XferJob(BaseJob):
    '''
    Common base for the file transfer jobs (SendFileJob, ReceiveFileJob).

    Keeps the path of the transferred file and an MD5 accumulator that the
    concrete job feeds with the transferred data; the resulting hex digest
    is exposed through get_checksum().
    '''
    # concrete class names accepted by _validate()
    TYPES = [
        'SendFileJob',
        'ReceiveFileJob',
    ]

    # tcp port range used for transferts
    TCP_PORT_MIN = 42000
    TCP_PORT_MAX = 43999

    def __init__(self, manager, path):
        '''
        :param manager: JobManager the job is registered with
        :param path: path of the file being transferred
        '''
        # transfer state is set before BaseJob.__init__ runs
        self._path, self._md5, self._checksum = path, md5(), None
        super(XferJob, self).__init__(manager)

    def _validate(self):
        '''
        Accept only the concrete transfer job types listed in TYPES.
        '''
        known_type = self._type in self.TYPES
        return known_type

    def get_checksum(self):
        '''
        Return the MD5 hex digest of the transferred data, or None while the
        transfer has not completed.
        '''
        return self._checksum
#TODO rewrite with the _cancel() and _cleanup() methods
class SendFileJob(XferJob):
    '''
    Connect to a remote host and stream a local file to it over TCP.

    The MD5 of the data sent is available from get_checksum() once the whole
    file went through; it stays None if the transfer did not complete.
    '''
    # number of connection attempts before giving up
    CONNECT_RETRIES = 6
    # socket timeout (seconds) for connect() and for sending data
    SOCKET_TIMEOUT = 10
    # size of the chunks read from the file and pushed to the socket
    CHUNK_SIZE = 4096

    def __init__(self, manager, path, remote_host, remote_port):
        '''
        :param manager: JobManager the job is registered with
        :param path: local file to send
        :param remote_host: host to connect to (must be a str to validate)
        :param remote_port: remote TCP port, converted with int()
        '''
        self._host = remote_host
        # NOTE: despite its name, this is the *remote* port to connect to
        self._local_port = int(remote_port)
        super(SendFileJob, self).__init__(manager, path)

    def _validate(self):
        '''
        Check the job type, that the source is a readable regular file, that
        the port lies in [TCP_PORT_MIN, TCP_PORT_MAX] and that the host is a
        str.
        '''
        return (super(SendFileJob, self)._validate()
                and os.path.isfile(self._path)
                and os.access(self._path, os.F_OK | os.R_OK)
                and self.TCP_PORT_MIN <= self._local_port <= self.TCP_PORT_MAX
                and isinstance(self._host, str))

    def _new_socket(self):
        '''
        Replace self._sock with a fresh TCP socket (closing the previous one,
        if any) and return it.
        '''
        old = getattr(self, '_sock', None)
        if old is not None:
            try:
                old.close()
            except socket.error:
                pass
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.settimeout(self.SOCKET_TIMEOUT)
        return self._sock

    def _prepare(self):
        '''
        Create the TCP socket used for the transfer.
        '''
        return bool(self._new_socket())

    def _connect(self):
        '''
        Connect self._sock to the remote host.

        Makes up to CONNECT_RETRIES attempts and stops early on cancellation.
        A timed-out attempt is retried at once, and other socket errors wait
        one second. The state of a socket is unspecified after a failed
        connect(), so each retry uses a brand new socket.

        :return: True when connected, False otherwise
        '''
        attempt = 0
        while attempt < self.CONNECT_RETRIES and not self._cancelled:
            if attempt:
                self._new_socket()
            attempt += 1
            try:
                self._sock.connect((self._host, self._local_port))
            except socket.timeout:
                pass
            except socket.error:
                time.sleep(1)
            else:
                return True
        return False

    def _job(self):
        '''
        Connect to the remote host and send the whole file.

        Cancellation is honoured between connection attempts and between
        chunks. A failure to connect is only logged. Any error raised while
        transferring is re-raised as SendFileJobError with the original
        error in its message.
        '''
        fd = None
        try:
            # try to connect
            self._log.append('connecting to `%s`:`%i`' % (self._host,
                                                            self._local_port))
            self._sock.settimeout(self.SOCKET_TIMEOUT)
            connected = self._connect()
            if self._cancelled:
                self._log.append('stopped connect attempt (job was cancelled)')
            elif connected:
                self._log.append('connected')
                # open file
                self._log.append('opening source file `%s`' % self._path)
                fd = open(self._path, 'rb')
                # read and send file
                self._log.append('sending data')
                while not self._cancelled:
                    buf = fd.read(self.CHUNK_SIZE)
                    if not buf:
                        break
                    # send() may transmit only part of buf, sendall() keeps
                    # going until every byte has been written
                    self._sock.sendall(buf)
                    self._md5.update(buf)
                # report transfert completion
                if self._cancelled:
                    self._log.append('transfert aborted (job was cancelled)')
                else:
                    self._log.append('no more data to send')
                    self._checksum = self._md5.hexdigest()
                    self._md5 = None
                    self._log.append('transfered data checksum = `%s`' %
                                                                self._checksum)
            else:
                self._log.append('remote host did not accept connection, '
                                                        'aborting emission')
        except Exception as err:
            raise SendFileJobError('failed to send `%s` to `%s`:`%s`: %r' %
                            (self._path, self._host, self._local_port, err))
        finally:
            # close socket and file anyway, best effort
            try:
                self._sock.close()
            except socket.error:
                pass
            if fd is not None:
                try:
                    fd.close()
                except (IOError, OSError):
                    pass
#TODO rewrite with the _cancel() and _cleanup() methods
class ReceiveFileJob(XferJob):
    '''
    Listen on a TCP port and write everything received to a local file.

    The port is picked in [TCP_PORT_MIN, TCP_PORT_MAX] during preparation
    and is available from get_port(). The MD5 of the received data is
    available from get_checksum() once the peer closed the connection.
    '''
    # ports reserved by ReceiveFileJob instances of this process (shared by
    # all instances, guarded by _ports_used_mutex)
    _ports_used_local = []
    _ports_used_mutex = RWLock()
    # number of 1-second accept() timeouts before giving up
    ACCEPT_RETRIES = 60
    # maximum size of each recv() call
    CHUNK_SIZE = 4096

    def __init__(self, manager, path, create=False):
        '''
        :param manager: JobManager the job is registered with
        :param path: destination file
        :param create: when true the file may not exist yet (its directory
            must); otherwise it must be an existing writable file
        '''
        self._create = create
        self._local_port = None
        super(ReceiveFileJob, self).__init__(manager, path)

    def _validate(self):
        '''
        Check the job type and that the destination can be written: an
        existing writable regular file, or with create=True a path inside an
        existing writable directory.
        '''
        if not super(ReceiveFileJob, self)._validate():
            return False
        if self._create:
            # a bare file name lives in the current directory
            parent = os.path.dirname(self._path) or os.curdir
            return (os.path.isdir(parent)
                    and os.access(parent, os.F_OK | os.W_OK))
        return (os.path.isfile(self._path)
                and os.access(self._path, os.F_OK | os.W_OK))

    def _prepare(self):
        '''
        Create the listening socket and bind it to the first port of
        [TCP_PORT_MIN, TCP_PORT_MAX] that is not reserved by another
        ReceiveFileJob of this process and that the OS accepts.

        :raises ReceiveFileJobError: when no port could be bound
        '''
        # create socket
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # TCP_PORT_MAX is inclusive, as in SendFileJob._validate()
        for port in range(self.TCP_PORT_MIN, self.TCP_PORT_MAX + 1):
            # find a port to bind to
            try:
                with self._ports_used_mutex.write:
                    if port not in self._ports_used_local:
                        self._sock.bind(('', port))
                        self._local_port = port
                        self._ports_used_local.append(port)
                        break
            except socket.error:
                pass
        if not self._local_port:
            # _job() will not run to close it, so do it here
            self._sock.close()
            msg = 'no port to listen to'
            self._log.append(msg)
            raise ReceiveFileJobError(msg)
        return True

    def _job(self):
        '''
        Wait for a peer to connect, then write everything it sends to the
        destination until it closes the connection.

        Cancellation is checked between accept() timeouts and between
        received chunks. Not receiving any connection is only logged. Any
        error raised while transferring is re-raised as ReceiveFileJobError.
        Sockets are closed and the port released in every case.
        '''
        fd = None
        conn = None
        try:
            self._sock.listen(1)
            # short timeout so that cancellation is checked every second
            self._sock.settimeout(1)
            # wait for connection
            self._log.append('waiting for connection on port `%i`' %
                                                            self._local_port)
            retry = 0
            while (retry < self.ACCEPT_RETRIES and not conn
                                                and not self._cancelled):
                retry += 1
                try:
                    conn, addr = self._sock.accept()
                except socket.timeout:
                    pass
            if self._cancelled:
                self._log.append('stopped waiting (job was cancelled)')
            elif conn:
                self._log.append('client connected from `%s`' % (addr,))
                # open file
                self._log.append('opening destination `%s`' % self._path)
                fd = open(self._path, 'wb')
                # write to the file
                self._log.append('receiving data')
                while not self._cancelled:
                    buf = conn.recv(self.CHUNK_SIZE)
                    if not buf:
                        break
                    fd.write(buf)
                    self._md5.update(buf)
                # close here rather than in the finally clause so that a
                # failed flush is reported instead of silently ignored
                fd.close()
                # report transfert completion
                if self._cancelled:
                    self._log.append('transfert aborted (job was cancelled)')
                else:
                    self._log.append('remote host closed connection')
                    self._checksum = self._md5.hexdigest()
                    self._md5 = None
                    self._log.append('transfered data checksum = `%s`' %
                                                                self._checksum)
            else:
                self._log.append('no connection received, aborting reception')
        except Exception as err:
            raise ReceiveFileJobError('failed to receive `%s`: %r' %
                                                            (self._path, err))
        finally:
            # close sockets anyway, best effort
            for sock in (conn, self._sock):
                if sock is not None:
                    try:
                        sock.close()
                    except socket.error:
                        pass
            # give the port back to the other ReceiveFileJob instances
            with self._ports_used_mutex.write:
                if self._local_port in self._ports_used_local:
                    self._ports_used_local.remove(self._local_port)
            # only reached when an error interrupted the transfer
            if fd is not None and not fd.closed:
                try:
                    fd.close()
                except (IOError, OSError):
                    pass

    def get_port(self):
        '''
        Return the TCP port the job listens on (set during preparation).
        '''
        return int(self._local_port)


class DrbdCopyJob(BaseJob):
    '''
    Mirror a local logical volume to a remote node through DRBD.

    _job() walks through these steps and checks for cancellation between
    them:

    1. create a DRBD meta-data LV in the source VG;
    2. create a device-mapper copy of the source LV (same DM table);
    3. set up the DRBD device on top of that copy and the meta LV;
    4. wait for drbd_connect(), then for the connection with the peer;
    5. wait for drbd_role(), then for the initial sync;
    6. sleep until _event_sleep is set (only _cancel() does so here).

    The drbd_* control methods are meant to be called from other threads.
    _mutex_step serializes them with the job steps. _cleanup() only removes
    the volume and DM device this job actually created.
    '''
    def __init__(self, manager, lvm, drbd_pool, source_vg, source_lv):
        '''
        :param manager: JobManager the job is registered with
        :param lvm: LVM helper used for volumes and DM devices
        :param drbd_pool: pool the DRBD device is allocated from
        :param source_vg: name of the volume group holding the source LV
        :param source_lv: name of the logical volume to copy
        '''
        self._mutex_step = Lock()
        self._lvm = lvm
        self._pool = drbd_pool
        self._source_vgname = source_vg
        self._source_lvname = source_lv
        # DM table pointing to the DRBD device, built during setup
        self._drbd_table = None
        # what _job() really created. _cleanup() must never remove a volume
        # or device that merely has the expected name (e.g. one belonging to
        # another job copying the same LV, which made our creation fail)
        self._meta_created = False
        self._copy_created = False
        super(DrbdCopyJob, self).__init__(manager)
        self._drbd = self._pool.new_device()
        # checkpoint events
        self._event_connected_ready = Event()
        self._event_connected = Event()
        self._event_rolechosen_ready = Event()
        self._event_rolechosen = Event()
        self._event_sleep_ready = Event()
        self._event_sleep = Event()

    def _cancel(self):
        '''
        Release every thread blocked on a checkpoint event, then disconnect
        the DRBD device.
        '''
        # release some pending calls
        self._event_connected_ready.set()
        self._event_connected.set()
        self._event_rolechosen_ready.set()
        self._event_rolechosen.set()
        self._event_sleep_ready.set()
        self._event_sleep.set()
        # disconnect the node
        self._drbd.disconnect()

    def _cleanup(self):
        '''
        Destroy the DRBD device, then remove the meta-data LV and the DM copy
        when this job created them. Failures are logged, not raised.
        '''
        # destroy drbd device
        try:
            self._log.append('destroying the drbd device')
            self._drbd.destroy()
        except Exception as err:
            self._log.append('failed to destroy the drbd device: %s' % err)
        else:
            self._log.append('successfully destroyed the drbd device')

        # we wait for any potential job step to finish
        # should be quick after the drbd destruction (kills all waits)
        with self._mutex_step:
            # destroy drbd meta
            if self._meta_created:
                try:
                    self._log.append('removing the drbd meta volume')
                    self._lvm.get_vg(self._source_vgname).remove(
                                                            self._meta_lvname)
                except Exception as err:
                    self._log.append('failed to remove the drbd meta volume:'
                                                                ' %s' % err)
                else:
                    self._meta_created = False
                    self._log.append('successfully removed the drbd meta '
                                                                    'volume')
            # remove source LV copy
            if self._copy_created:
                try:
                    self._log.append('removing the DM device copy')
                    self._lvm.dm_remove(self._copy_dm)
                except Exception as err:
                    self._log.append('failed to remove the DM device copy: '
                                                                '%s' % err)
                else:
                    self._copy_created = False
                    self._log.append('successfully removed DM device copy')

    def _prepare(self):
        '''
        Look up the source LV and record its path, size, DM name and DM
        table.

        :return: True when the LV size is greater than zero
        :raises DrbdCopyJobError: when fetching the LV fails
        '''
        self._lvm.reload()
        # get source LV handle
        try:
            lv = self._lvm.get_lv(self._source_vgname, self._source_lvname)
        except Exception:
            msg = 'failed to fetch LV `%s/%s`' % (self._source_vgname,
                                                    self._source_lvname)
            self._log.append(msg)
            raise DrbdCopyJobError(msg)
        # get source LV infos
        self._source_path = lv.path
        self._source_size = lv.size
        self._source_dm = lv.dm_name()
        self._source_table = lv.dm_table()
        self._drbd_table = None
        # we got here, so all went fine
        # just a safety
        return lv.size > 0

    def _job(self):
        '''
        Run the copy steps listed in the class docstring.

        Returns early, without error, as soon as the job is cancelled. A
        failing step is logged and its exception re-raised unchanged.
        '''
        # checkpoint
        if self._cancelled:
            return

        self._lvm.reload()

        with self._mutex_step:
            # create drbd meta
            try:
                self._log.append('creating a drbd meta volume')
                # now let's create a meta device for DRBD in the same VG
                # MS = C/32768 + 4MiB  &&  MS >= 128MiB
                # http://www.drbd.org/users-guide/ch-internals.html
                self._meta_lvname = '%s.drbdmeta' % self._source_lvname
                self._meta_path = '/dev/%s/%s' % (self._source_vgname,
                                                            self._meta_lvname)
                self._meta_size = int(max(self._source_size // 32768
                                                            + 4 * 2**20,
                                                            128 * 2**20))
                self._lvm.get_vg(self._source_vgname).create(self._meta_lvname,
                                                            self._meta_size)
                # the volume is ours from now on, _cleanup() must remove it
                self._meta_created = True
                self._lvm.reload()
                self._meta_dm = self._lvm.get_lv(self._source_vgname,
                                                    self._meta_lvname).dm_name()
            except Exception:
                self._log.append('failed to create a drbd meta volume')
                # bare raise keeps the original traceback (Python 2)
                raise
            else:
                self._log.append('successfully created the drbd meta volume'
                                                    ' `%s`' % self._meta_path)

        # checkpoint
        if self._cancelled:
            return

        with self._mutex_step:
            # create a copy of the source LV
            try:
                self._log.append('creating a device copy of `%s`' %
                                                        self._source_path)
                self._copy_dm = '%s.%s.copy' % (self._source_vgname,
                                                        self._source_lvname)
                self._copy_path = '/dev/mapper/%s' % self._copy_dm
                self._lvm.dm_create(self._copy_dm, self._source_table)
                # the device is ours from now on, _cleanup() must remove it
                self._copy_created = True
            except Exception:
                self._log.append('failed to create a device copy')
                raise
            else:
                self._log.append('successfully copied device as `%s`' %
                                                        self._copy_path)

        # checkpoint
        if self._cancelled:
            return

        with self._mutex_step:
            # setup the drbd device
            try:
                self._log.append('configuring the drbd device `%s`' %
                                                        self._drbd.get_path())
                self._drbd.setup(self._copy_path, self._meta_path)
                # linear DM table mapping the source size onto the DRBD
                # device, in 512-byte sectors (assumes _source_size is in
                # bytes); used by drbd_takeover()
                self._drbd_table = '0 %d linear %s 0' % (
                                                    self._source_size // 512,
                                                    self._drbd.get_path())
            except Exception:
                self._log.append('failed to configure the drbd device')
                raise
            else:
                self._log.append('successfully configured the drbd device')

        # blocking checkpoint
        # wait for call to connect()
        if self._cancelled:
            return
        self._event_connected_ready.set()
        self._event_connected.wait()
        if self._cancelled:
            return

        with self._mutex_step:
            # wait for a connection with the remote host
            try:
                self._log.append('waiting for node connection')
                self._drbd.wait_connection()
            except Exception:
                self._log.append('failed to establish a connection')
                raise
            else:
                self._log.append('connected with a remote node')

        # blocking checkpoint
        # wait for call to role()
        if self._cancelled:
            return
        self._event_rolechosen_ready.set()
        self._event_rolechosen.wait()
        if self._cancelled:
            return

        # wait for the sync completion
        try:
            self._log.append('waiting for initial device sync')
            self._drbd.wait_sync()
        except Exception:
            self._log.append('failed to sync the DRBD')
            raise
        else:
            self._log.append('DRBD is sync\'ed !')

        # sleep until the DRBD is shutdown
        self._log.append('job entering sleep')
        self._event_sleep_ready.set()
        self._event_sleep.wait()
        self._log.append('job leaving sleep')

    def drbd_get_port(self):
        '''
        Return the port of the DRBD device.
        '''
        return self._drbd.get_port()

    def drbd_connect(self, remote_addr, remote_port):
        '''
        Give the DRBD device its peer endpoint and release the job's
        connection step. Blocks until the job has configured the device.

        :raises DrbdCopyJobError: when the endpoint was already given, which
            is also the case once the job has been cancelled
        '''
        if self._event_connected.is_set():
            raise DrbdCopyJobError('already connected')
        if not self._event_connected_ready.is_set():
            self._log.append('can\'t connect right now, waiting for '
                                    'initialization to complete...')
            self._event_connected_ready.wait()
        with self._mutex_step:
            # check again under the lock: a concurrent call (or a
            # cancellation) may have happened while we were waiting
            if self._event_connected.is_set():
                raise DrbdCopyJobError('already connected')
            # set endpoint
            self._drbd.connect(remote_addr, remote_port)
            # unlock any connection wait
            self._event_connected.set()

    def drbd_role(self, primary=None):
        '''
        Choose the DRBD role of this node and release the job's sync step.
        Blocks until the job reached the role checkpoint.

        :param primary: True for primary, False for secondary. With None,
            the call only waits for the checkpoint; no role is set and the
            job is not released.
        :raises DrbdCopyJobError: when the connection checkpoint was not
            released yet (drbd_connect() not called)
        '''
        if self._event_connected.is_set():
            self._event_rolechosen_ready.wait()
            if primary is True or primary is False:
                # change node role
                if primary:
                    self._log.append('switching to primary mode')
                    self._drbd.primary()
                else:
                    self._log.append('switching to secondary mode')
                    self._drbd.secondary()
                # unlock any role change wait
                self._event_rolechosen.set()
        else:
            raise DrbdCopyJobError('cannot change role, not connected yet')

    def drbd_waitsync(self):
        '''
        Block until the job finished its initial sync, then call the DRBD
        device's wait_sync() under the step mutex.
        '''
        self._log.append('waiting for sync')
        # wait for initial sync and setup to be completed
        self._event_sleep_ready.wait()
        # wait for additionnal DRBD sync
        with self._mutex_step:
            self._drbd.wait_sync()

    def drbd_takeover(self, state):
        '''
        Switch the DM table of the source LV. With state=True it maps the
        DRBD device (table built during setup). With any other value the
        original table saved by _prepare() is restored.

        :raises DrbdCopyJobError: when state is True and the DRBD device is
            not configured yet
        '''
        # FIXME check events
        with self._mutex_step:
            if state is True:
                if self._drbd_table is not None:
                    table = self._drbd_table
                else:
                    raise DrbdCopyJobError('no DRBD table available yet')
            else:
                table = self._source_table
            self._lvm.dm_set_table(self._source_dm, table)

    def drbd_status(self):
        '''
        Return the status reported by the DRBD device.
        '''
        return self._drbd.status()
class TCPTunnelJob(BaseJob):
    '''
    '''
    # tcp port range used for tunnels
    TCP_PORT_MIN = 44000
    TCP_PORT_MAX = 45999
    # global port list
    _ports_used = []
    _ports_used_mutex = RWLock()
Thibault VINCENT's avatar
Thibault VINCENT committed
    def __init__(self, manager):
        '''
        Register the tunnel job. The port, the sockets and the configuration
        events start empty and are filled in while the job runs.

        NOTE(review): _job() also reads self._server_local and
        self._endpoint, which are not initialized here. Presumably they are
        set by configuration methods elsewhere in the class; confirm.

        :param manager: JobManager the job is registered with
        '''
        super(TCPTunnelJob, self).__init__(manager)
        self._mutex = Lock()
        # configuration checkpoints awaited by _job()
        self._event_client_conf, self._event_server_conf, \
            self._event_server_bound = Event(), Event(), Event()
        self._server_port = None
        # sockets start as None so that _cancel() can always test them
        self._sock_client = self._sock_server_listen = self._sock_server = None
    
    def _cancel(self):
        '''
        Close every tunnel socket currently set, so that blocking socket
        calls made on them by the job thread are released. Errors raised
        while closing are ignored.
        '''
        # each attribute is read right before its socket is closed
        for attr in ('_sock_client', '_sock_server_listen', '_sock_server'):
            sock = getattr(self, attr)
            if not sock:
                continue
            try:
                sock.close()
            except:
                pass
    def _cleanup(self):
        '''
        Wake up anything still waiting on the configuration events, then drop
        the references to the tunnel sockets.

        NOTE(review): sockets are only dereferenced here, not closed, and the
        bound port is not removed from _ports_used. Confirm that the end of
        _job() takes care of both.
        '''
        # unlock events
        for event in (self._event_client_conf, self._event_server_conf,
                      self._event_server_bound):
            event.set()
        # forget sockets
        self._sock_client = self._sock_server_listen = self._sock_server = None
    
    def _validate(self):
        '''
        Accept every tunnel job: there are no parameters to check yet.
        '''
        # FIXME maybe: validate the tunnel configuration
        return True
    
    def _prepare(self):
        '''
        Nothing to acquire up front: sockets are created by _job() once the
        tunnel is configured.
        '''
        # FIXME maybe: reserve the server port here
        return True
    
    def _job(self):
        '''
        '''
        try:
            # wait for tunnel configuration for server
            if not self._cancelled:
                self._log.append('waiting for tunnel server configuration')
                self._event_server_conf.wait()
            
            ## setup server
            # create listening socket
            self._sock_server_listen = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # bind listening socket to a port
            self._server_port = None
            for port in range(self.TCP_PORT_MIN, self.TCP_PORT_MAX):
                # find a port to bind to
                try:
                    with self._ports_used_mutex.write:
                        if port not in self._ports_used:
                            addr = '127.0.0.1' if self._server_local else '0.0.0.0'
                            self._sock_server_listen.bind((addr, port)) #FIXME
                            self._server_port = port
                            self._ports_used.append(port)
                            break
                except socket.error:
                    pass
            if not self._server_port:
                msg = 'no port to listen to'
                self._log.append(msg)
                raise TCPTunnelJobError(msg)
            else:
                self._event_server_bound.set()
                self._log.append('server bound to `%s:%d`' % (addr,
                                                        self._server_port))
            
            # wait for tunnel configuration for client
            if not self._cancelled:
                self._log.append('waiting for tunnel client configuration')
                self._event_client_conf.wait()
            
            # loop and reconnect forever untill cancellation
            while not self._cancelled:
                
                ## wait for a client to connect
                self._log.append('waiting for a client to connect')
                self._sock_server_listen.settimeout(1)
                self._sock_server_listen.listen(1)
                self._sock_server = None
                while not self._cancelled and self._sock_server is None:
                    try:
                        self._sock_server, local_addr = self._sock_server_listen.accept()
                        self._log.append('client connected from `%s`' % (local_addr,))
                    except socket.timeout:
                        pass
                if self._sock_server is None:
                    break # job cancelled
Thibault VINCENT's avatar
Thibault VINCENT committed
                
                ## connect to the endpoint
                timeout = 15 # FIXME must be argument
                retry_interval = 5
                # FIXME detection of socket type
                if isinstance(self._endpoint, str):
                    self._sock_client = socket.socket(socket.AF_UNIX,
                                                        socket.SOCK_STREAM)
                else:
                    self._sock_client = socket.socket(socket.AF_INET,
                                                        socket.SOCK_STREAM)
Thibault VINCENT's avatar
Thibault VINCENT committed
                self._sock_client.settimeout(timeout)
                connected = False
                while not self._cancelled and not connected:
                    try:
                        self._log.append('connecting to `%s`' % (self._endpoint,))