Commit 63e3d25f authored by Anael Beutot's avatar Anael Beutot
Browse files

TunnelJob now inherits from ForkedJob

Replaced all logger call in child.
Added dedicated libev loop.
Handle exceptions using fatal helpers of ForkedJob.
parent 738918f4
Loading
Loading
Loading
Loading
+53 −54
Original line number Diff line number Diff line
@@ -14,7 +14,7 @@ from xml.etree import ElementTree as et
import pyev

from cloudcontrol.node.exc import TunnelError, DRBDAllocationError, DRBDError
from cloudcontrol.node.jobs import BaseIOJob
from cloudcontrol.node.jobs import BaseIOJob, ForkedJob
from cloudcontrol.node.utils import SocketBuffer, subproc_call, Singleton


@@ -256,24 +256,23 @@ class ExportVolume(BaseIOJob):
        self.clean_fds()


class TCPTunnel(object):
class TCPTunnel(ForkedJob):
    """Handles a TCP tunnel."""

    BUFFER_LEN = 8096

    def __init__(self, job_manager, ev_loop, connect=None, listen='0.0.0.0'):
    def __init__(self, job_manager, connect=None, listen='0.0.0.0'):
        """
        :param job_manager: :class:`JobManager` instance
        :param ev_loop: pyev loop instance (to create watchers from)
        :param connect: where to connect one end of the tunnel (a tuple, as
            given to socket.connect)
        :param listen: which interface to listen to for the other end of the
            tunnel
        """
        #: job id
        self.id = job_manager.job_id.next()
        ForkedJob.__init__(self, job_manager)

        self.ev_loop = ev_loop
        # create a new libev loop that will run inside our child
        self.ev_loop = pyev.Loop()
        self.connect = connect
        self.listen = listen
        #: port is assigned by the kernel
@@ -303,8 +302,18 @@ class TCPTunnel(object):
        # to source_sock
        self.output_buffer = SocketBuffer()

    @property
    def open_fds(self):
        return [fo.fileno() for fo in (self.source_sock, self.dest_sock)
                if fo is not None]

    def after_fork(self):
        self.ev_loop.reset()

    def close(self):
        logger.debug('Closing job %d', self.id)
        # as this could be called from child, don't use logger (this is for
        # debug anyway)
        sys.stderr.write('Closing job %d' % self.id)
        # stop watchers
        if self.source_reader is not None:
            self.source_reader.stop()
@@ -418,6 +427,10 @@ class TCPTunnel(object):

        self.connect_state = 'CONNECTING'

    def run_job(self):
        sys.stderr.write('Will start ev loop in child\n')
        self.ev_loop.start()

    def accept_cb(self, watcher, revents):
        try:
            new_source, remote = self.source_sock.accept()
@@ -427,11 +440,8 @@ class TCPTunnel(object):
                return

            # else
            logger.exception('Error while accepting new connection on'
            self.fatal_exc('Error while accepting new connection on'
                           ' sock_source for tunnel job')
            self.close()
            self.error = exc.errno
            return

        # everything went fine
        self.source_sock.close()  # we won't accept connections
@@ -440,18 +450,15 @@ class TCPTunnel(object):
        try:
            self.source_sock.setblocking(0)
        except socket.error as exc:
            logger.exception('Cannot set source socket in non blocking for'
            self.fatal_exc('Cannot set source socket in non blocking for'
                           ' tunnel job: %s', os.strerror(exc.errno))
            self.close()
            self.error = exc.errno
            return
        self.source_reader.stop()
        self.source_reader = self.ev_loop.io(new_source, pyev.EV_READ,
                                             self.read_cb)
        self.source_writer = self.ev_loop.io(new_source, pyev.EV_WRITE,
                                             self.write_cb)
        logger.debug('Successfully accepted remote client %s for tunnel job %d',
                     remote, self.id)
        sys.stderr.write('Successfully accepted remote client %s for tunnel'
                         ' job %d\n' % (remote, self.id))
        self.listen_state = 'CONNECTED'
        if self.connect_state == 'CONNECTED':
            # start the watchers only if both ends are ready to accept data
@@ -462,10 +469,8 @@ class TCPTunnel(object):
        # check that connection was a success
        error = self.dest_sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if error:
            logger.error('Error during connect for tunnel job, %s' %
            self.fatal('Error during connect for tunnel job, %s\n' %
                       os.strerror(error))
            self.close()
            return

        # else we setup watcher with proper events
        self.dest_reader = self.ev_loop.io(self.dest_sock, pyev.EV_READ,
@@ -473,8 +478,8 @@ class TCPTunnel(object):
        self.dest_writer.stop()
        self.dest_writer = self.ev_loop.io(self.dest_sock, pyev.EV_WRITE,
                                           self.write_cb)
        logger.debug('Successfully connected to remote endpoint %s %d',
                     self.connect, self.id)
        sys.stderr.write('Successfully connected to remote endpoint %s %d\n' %
                         (self.connect, self.id))
        self.connect_state = 'CONNECTED'
        if self.listen_state == 'CONNECTED':
            # start the watchers only if both ends are ready to accept data
@@ -483,58 +488,55 @@ class TCPTunnel(object):

    def read_cb(self, watcher, revents):
        if watcher == self.dest_reader:
            # logger.debug('Read event on dest %s', self.id)
            # sys.stderr.write('Read event on dest %s\n' % self.id)
            sock = self.dest_sock
            buffer_ = self.output_buffer
            other_watcher = self.source_writer
        else:
            # logger.debug('Read event on source %s', self.id)
            # sys.stderr.write('Read event on source %s\n' % self.id)
            sock = self.source_sock
            buffer_ = self.input_buffer
            other_watcher = self.dest_writer

        # logger.debug('Will loop into event')
        # sys.stderr.write('Will loop into event\n')
        while True:
            try:
                incoming = sock.recv(self.BUFFER_LEN)
            except socket.error as exc:
                if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                    # logger.debug('EAGAIN')
                    # sys.stderr.write('EAGAIN\n')
                    break
                # else: unexpected error
                logger.exception('Unexpected error while reading on socket'
                                 ' for tunnel job, %s', os.strerror(exc.errno))
                self.close()
                self.error = exc.errno
                return
                self.fatal_exc('Unexpected error while reading on socket'
                               ' for tunnel job, %s\n', os.strerror(exc.errno))

            if not incoming:
                # EOF
                # logger.debug('EOF')
                # sys.stderr.write('EOF\n')
                self.close()
                return
            # logger.debug('Read %d bytes', len(incoming))
            # sys.stderr.write('Read %d bytes\n' % len(incoming))
            buffer_.append(incoming)
            if buffer_.is_full():
                # logger.debug('Buffer is full')
                # sys.stderr.write('Buffer is full\n')
                watcher.stop()
                break

        # we did read some bytes that we could write to the other end
        if not buffer_.is_empty():
            # logger.debug('Starting other watcher')
            # sys.stderr.write('Starting other watcher\n')
            other_watcher.start()

        # logger.debug('Read event done')
        # sys.stderr.write('Read event done\n')

    def write_cb(self, watcher, revents):
        if watcher == self.dest_writer:
            # logger.debug('Write event on dest %s', self.id)
            # sys.stderr.write('Write event on dest %s', self.id)
            sock = self.dest_sock
            buffer_ = self.input_buffer
            other_watcher = self.source_reader
        else:
            # logger.debug('Write event on source %s', self.id)
            # sys.stderr.write('Write event on source %s\n' % self.id)
            sock = self.source_sock
            buffer_ = self.output_buffer
            other_watcher = self.dest_reader
@@ -544,7 +546,7 @@ class TCPTunnel(object):
                to_send = buffer_.popleft()
            except IndexError:
                # buffer is empty, we should stop write event
                # logger.debug('Buffer is empty')
                # sys.stderr.write('Buffer is empty\n')
                watcher.stop()
                break
            send_buffer = to_send
@@ -555,17 +557,14 @@ class TCPTunnel(object):
                except socket.error as exc:
                    if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                        buffer_.appendleft(to_send[total_sent:])
                        # logger.debug('EAGAIN')
                        # sys.stderr.write('EAGAIN\n')
                        break
                    # else: unexpected error
                    logger.exception('Unexpected error while writting on socket'
                    self.fatal_exc('Unexpected error while writting on socket'
                                   ' for tunnel job, %s',
                                   os.strerror(exc.errno))
                    self.close()
                    self.error = exc.errno
                    return

                # logger.debug('Written %d bytes', written)
                # sys.stderr.write('Written %d bytes\n' % written)
                if written == len(send_buffer):
                    break

@@ -575,10 +574,10 @@ class TCPTunnel(object):

        # if we can read on the other end
        if not buffer_.is_full():
            # logger.debug('Starting other watcher')
            # sys.stderr.write('Starting other watcher\n')
            other_watcher.start()

        # logger.debug('Proccessed write event')
        # sys.stderr.write('Proccessed write event\n')


class DRBDAllocator(object):