Commit 9efcdff8 authored by Anael Beutot's avatar Anael Beutot
Browse files

Import and export volume jobs inherit from BaseIOJob.

Replace logging calls with sys.stderr.write in order to prevent deadlock in the
fork.
parent 871d1ab3
Loading
Loading
Loading
Loading
+30 −22
Original line number Diff line number Diff line
@@ -8,27 +8,28 @@ from time import sleep
from hashlib import md5
from StringIO import StringIO
from subprocess import CalledProcessError
import sys
from xml.etree import ElementTree as et

import pyev

from cloudcontrol.node.exc import TunnelError, DRBDAllocationError, DRBDError
from cloudcontrol.node.jobs import BaseThreadedJob
from cloudcontrol.node.jobs import BaseIOJob
from cloudcontrol.node.utils import SocketBuffer, subproc_call, Singleton


logger = logging.getLogger(__name__)


class ImportVolume(BaseThreadedJob):
class ImportVolume(BaseIOJob):
    """Import volume job.

    """
    BUFFER_LEN = 8192 * 16
    HASH = md5

    def __init__(self, job_manager, ev_loop, volume):
        BaseThreadedJob.__init__(self, job_manager, ev_loop)
    def __init__(self, job_manager, volume):
        BaseIOJob.__init__(self, job_manager)

        self.checksum = None
        self.volume = volume
@@ -40,6 +41,11 @@ class ImportVolume(BaseThreadedJob):
        self.client_sock = None
        self.disk = None

    @property
    def open_fds(self):
        return [fo.fileno() for fo in (self.sock, self.client_sock, self.disk)
                if fo is not None]

    def clean_fds(self):
        if self.sock is not None:
            self.sock.close()
@@ -93,17 +99,14 @@ class ImportVolume(BaseThreadedJob):
        return self.port

    def run_job(self):
        # FIXME raised exceptions in this functions will be in the context of a
        # thread that is not running in the sjRPC, therefore these won't be
        # caught
        try:
            self.client_sock, _ = self.sock.accept()
        except socket.timeout:
            logger.exception('Error for importing job: client did not connect')
            sys.stderr.write('Error for importing job: client did not connect\n')
            self.clean_fds()
            raise
        except socket.error:
            logger.exception('Error while accepting socket')
            sys.stderr.write('Error while accepting socket\n')
            self.clean_fds()
            raise

@@ -121,7 +124,7 @@ class ImportVolume(BaseThreadedJob):
                total_received = 0
                while True:
                    recv_buf = self.client_sock.recv(self.BUFFER_LEN - total_received)
                    # logger.debug('Received %d', len(recv_buf))
                    # sys.stderr.write('Received %d\n' % len(recv_buf))
                    if not recv_buf:  # EOF
                        # in case received in not empty, we will come back here
                        # once again and it returns EOF one more time
@@ -131,12 +134,12 @@ class ImportVolume(BaseThreadedJob):
                    if total_received == self.BUFFER_LEN:
                        break
            except socket.error:
                logger.exception('Error while receiving disk image')
                sys.stderr.write('Error while receiving disk image\n')
                self.clean_fds()
                raise
            buffer_ = b''.join(received)
            if not buffer_:
                logger.debug('Received EOF import job')
                sys.stderr.write('Received EOF import job\n')
                break
            checksum.update(buffer_)
            try:
@@ -146,12 +149,12 @@ class ImportVolume(BaseThreadedJob):
                to_send = buffer_
                while True:
                    written += self.disk.write(to_send)
                    # logger.debug('Written %s to disk', written)
                    # sys.stderr.write('Written %s to disk\n' % written)
                    to_send = buffer(buffer_, written)
                    if not to_send:
                        break
            except IOError:
                logger.exception('Error while writing image to disk')
                sys.stderr.write('Error while writing image to disk\n')
                self.clean_fds()
                raise

@@ -160,23 +163,23 @@ class ImportVolume(BaseThreadedJob):
        self.checksum = checksum.hexdigest()
        # clean the fds
        self.clean_fds()
        logger.debug('Volume import done')
        sys.stderr.write('Volume import done\n')


class ExportVolume(BaseThreadedJob):
class ExportVolume(BaseIOJob):
    """Export volume job.

    """
    BUFFER_LEN = 8192 * 16
    HASH = md5

    def __init__(self, job_manager, ev_loop, volume, raddr, rport):
    def __init__(self, job_manager, volume, raddr, rport):
        """
        :param volume: :class:`Volume` instance
        :param raddr: remote IP address
        :param rport: remote TCP port
        """
        BaseThreadedJob.__init__(self, job_manager, ev_loop)
        BaseIOJob.__init__(self, job_manager)

        # where to connect to send the volume
        self.raddr = raddr
@@ -189,6 +192,11 @@ class ExportVolume(BaseThreadedJob):
        self.sock = None
        self.disk = None

    @property
    def open_fds(self):
        return [fo.fileno() for fo in (self.sock, self.disk)
                if fo is not None]

    def clean_fds(self):
        if self.sock is not None:
            self.sock.close()
@@ -225,21 +233,21 @@ class ExportVolume(BaseThreadedJob):
            try:
                read = self.disk.read(self.BUFFER_LEN)
            except IOError:
                logger.exception('Error while reading from disk')
                sys.stderr.write('Error while reading from disk\n')
                self.clean_fds()
                break
            # read length may be less than BUFFER_LEN but we don't care as it
            # will go over TCP
            if not read:  # end of file
                # logger.debug('EOF, exported %d bytes', sent_count)
                # sys.stderr.write('EOF, exported %d bytes\n' % sent_count)
                break
            # sent_count += len(read)
            # logger.debug('Read %d from disk', len(read))
            # sys.stderr.write('Read %d from disk\n' % len(read))
            checksum.update(read)
            try:
                self.sock.sendall(read)
            except socket.error:
                logger.exception('Error while sending through socket')
                sys.stderr.write('Error while sending through socket\n')
                self.clean_fds()
                break