Commit 710284fb authored by Thibault VINCENT's avatar Thibault VINCENT
Browse files

add: tcp tunneling job

parent 664ed1b4
Loading
Loading
Loading
Loading
+249 −2
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-

import os, time, socket
import os, time, socket, select
from hashlib import md5
from datetime import datetime
from threading import Thread, RLock
from threading import Lock, RLock, Thread, Event
from logging import debug
from utils import RWLock
from drbd import DRBD
from errors import (JobManagerError, JobError, XferJobError, ReceiveFileJobError,
            SendFileJobError, LVMError, DRBDError)
from errors import (JobManagerError, JobError, XferJobError, 
            ReceiveFileJobError, SendFileJobError, DrbdCopyJobError, LVMError,
            DRBDError, TCPTunnelJobError)

#fixme
import traceback, sys
@@ -528,5 +531,249 @@ class ReceiveFileJob(XferJob):
        return int(self._port)


class TCPTunnelJob(BaseJob):
    '''
    '''
    # tcp port range used for tunnels
    TCP_PORT_MIN = 44000
    TCP_PORT_MAX = 45999
    # global port list
    _ports_used = []
    _ports_used_mutex = RWLock()
    
    def __init__(self, manager):
        '''
        '''
        super(TCPTunnelJob, self).__init__(manager)
        self._mutex = Lock()
        self._event_client_conf = Event()
        self._event_server_conf = Event()
        self._event_server_bound = Event()
        self._server_port = None
        # set here because of _cancel()
        self._sock_client = None
        self._sock_server_listen = None
        self._sock_server = None
    
    def _cancel(self):
        '''
        '''
        # explicitely close connections, to release blocking socket calls
        if self._sock_client:
            try:
                self._sock_client.close()
            except: pass
        if self._sock_server_listen:
            try:
                self._sock_server_listen.close()
            except: pass
        if self._sock_server:
            try:
                self._sock_server.close()
            except: pass
    
    def _cleanup(self):
        '''
        '''
        # unlock events
        self._event_client_conf.set()
        self._event_server_conf.set()
        self._event_server_bound.set()
        # destroy sockets
        self._sock_client = None
        self._sock_server_listen = None
        self._sock_server = None
    
    def _validate(self):
        '''
        '''
        return True # FIXME maybe
    
    def _prepare(self):
        '''
        '''
        return True # FIXME maybe
    
    def _job(self):
        '''
        '''
        try:
            # wait for tunnel configuration for server
            if not self._cancelled:
                self._log.append('waiting for tunnel server configuration')
                self._event_server_conf.wait()
            
            ## setup server
            # create listening socket
            self._sock_server_listen = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # bind listening socket to a port
            self._server_port = None
            for port in range(self.TCP_PORT_MIN, self.TCP_PORT_MAX):
                # find a port to bind to
                try:
                    with self._ports_used_mutex.write:
                        if port not in self._ports_used:
                            addr = '127.0.0.1' if self._server_local else '0.0.0.0'
                            self._sock_server_listen.bind((addr, port)) #FIXME
                            self._server_port = port
                            self._ports_used.append(port)
                            break
                except socket.error:
                    pass
            if not self._server_port:
                msg = 'no port to listen to'
                self._log.append(msg)
                raise TCPTunnelJobError(msg)
            else:
                self._event_server_bound.set()
                self._log.append('server bound to `%s:%d`' % (addr,
                                                        self._server_port))
            
            # wait for tunnel configuration for client
            if not self._cancelled:
                self._log.append('waiting for tunnel client configuration')
                self._event_client_conf.wait()
            
            # loop and reconnect forever untill cancellation
            while not self._cancelled:
                
                ## wait for a client to connect
                self._log.append('waiting for a client to connect')
                self._sock_server_listen.settimeout(1)
                self._sock_server_listen.listen(1)
                self._sock_server = None
                while not self._cancelled and self._sock_server is None:
                    try:
                        self._sock_server, local_addr = self._sock_server_listen.accept()
                        self._log.append('client connected from `%s`' % (local_addr,))
                    except socket.timeout:
                        pass
                if self._sock_server is None:
                    return # job cancelled
                
                ## connect to the endpoint
                timeout = 15 # FIXME must be argument
                retry_interval = 5
                self._sock_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self._sock_client.settimeout(timeout)
                connected = False
                while not self._cancelled and not connected:
                    try:
                        self._log.append('connecting to `%s`' % (self._endpoint,))
                        self._sock_client.connect(self._endpoint)
                        connected = True
                    except socket.timeout:
                        self._log.append('no response after %d seconds' % timeout)
                    except socket.error:
                        self._log.append('socket error (connection refused ?)')
                    time.sleep(retry_interval)
                if self._cancelled:
                    return
                if not connected:
                    continue
                
                ## tunnel data between sockets
                # init socket poller
                mask_ro = select.EPOLLIN ^ select.EPOLLERR ^ select.EPOLLHUP
                mask_rw = mask_ro ^ select.EPOLLOUT
                poller = select.epoll()
                poller.register(self._sock_server, mask_ro)
                poller.register(self._sock_client, mask_ro)
                # forward data until a connection is closed
                buf_server = ''
                buf_client = ''
                connected = True
                while not self._cancelled and connected:
                    # wait for events on both sockets
                    poll = poller.poll()
                    ## process events for read/write operations only
                    for fd, events in poll:
                        # events are for server socket
                        if fd == self._sock_server.fileno():
                            # read available
                            if events & select.EPOLLIN:
                                # read incoming data
                                try:
                                    read = self._sock_server.recv(4096)
                                except socket.error:
                                    connected = False
                                buf_server += read
                                # set the other socket to notify us when it's
                                # available for writing
                                if len(buf_server):
                                    poller.modify(self._sock_client, mask_rw)
                            # write available
                            if events & select.EPOLLOUT:
                                # try to send the whole buffer
                                try:
                                    sent = self._sock_server.send(buf_client)
                                except socket.error:
                                    connected = False
                                # drop sent data from the buffer
                                buf_client = buf_client[sent:]
                                # if the buffer becomes empty, stop write polling
                                if not len(buf_client):
                                    poller.modify(self._sock_server, mask_ro)
                        # events for client socket
                        else:
                            # read available
                            if events & select.EPOLLIN:
                                # read incoming data
                                try:
                                    read = self._sock_client.recv(4096)
                                except socket.error:
                                    connected = False
                                buf_client += read
                                # set the other socket to notify us when it's
                                # available for writing
                                if len(buf_client):
                                    poller.modify(self._sock_server, mask_rw)
                            # write available
                            if events & select.EPOLLOUT:
                                # try to send the whole buffer
                                try:
                                    sent = self._sock_client.send(buf_server)
                                except socket.error:
                                    connected = False
                                # drop sent data from the buffer
                                buf_server = buf_server[sent:]
                                # if the buffer becomes empty, stop write polling
                                if not len(buf_server):
                                    poller.modify(self._sock_client, mask_ro)
                if connected is False:
                    self._log.append('disconnected')
                try:
                    self._sock_server.close()
                except: pass
                try:
                    self._sock_client.close()
                except: pass
        except Exception as err:
            traceback.print_exc()
            raise err
    
    def tunnel_get_server_port(self):
        '''
        '''
        self._event_server_bound.wait()
        return self._server_port
    
    def tunnel_connect(self, endpoint):
        '''
        '''
        with self._mutex:
            if not self._event_client_conf.is_set():
                self._endpoint = endpoint
                self._event_client_conf.set()
            else:
                raise TCPTunnelJobError('remote endpoint already set')
    
    def tunnel_listen(self, local):
        '''
        '''
        with self._mutex:
            if not self._event_server_conf.is_set():
                self._server_local = local is True
                self._event_server_conf.set()
            else:
                raise TCPTunnelJobError('server parameters already set')