Skip to content
jobs.py 7.32 KiB
Newer Older
import io
import os
import socket
import logging
from hashlib import md5

from ccnode.jobs import BaseThreadedJob


logger = logging.getLogger(__name__)


class ImportVolume(BaseThreadedJob):
    """Import volume job.

    """
    BUFFER_LEN = 8192 * 16
    HASH = md5

    def __init__(self, job_manager, ev_loop, volume):
        BaseThreadedJob.__init__(self, job_manager, ev_loop)

        self.checksum = None
        self.volume = volume
        # where the other node will connect
        self.port = None

        # fds
        self.sock = None
        self.client_sock = None
        self.disk = None

    def clean_fds(self):
        if self.sock is not None:
            self.sock.close()
            self.sock = None
        if self.client_sock is not None:
            self.client_sock.close()
            self.client_sock = None
        if self.disk is not None:
            self.disk.close()
            self.disk = None

    def pre_job(self):
        """
        :returns: port number the socket is listening on
        """
        # create socket
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except socket.error:
            logger.exception('Error while creating socket for volume export')
            self.clean_fds()
            raise
        try:
            self.sock.settimeout(10.)
        except socket.error:
            logger.exception('Cannot set timeout on socket for volume export')
            self.clean_fds()
            raise
        try:
            self.sock.bind(('0.0.0.0', 0))
        except socket.error:
            logger.exception('Error while binding socket for volume export')
            self.clean_fds()
            raise
        try:
            self.sock.listen(1)
        except socket.error:
            logger.exception('Error while listening on socket')
            self.clean_fds()
            raise

        # open local disk
        try:
            self.disk = io.open(self.volume.path, 'wb', 0)
        except IOError:
            logger.exception('Error while trying to open local disk')
            self.clean_fds()
            raise

        self.port = self.sock.getsockname()[1]
        return self.port

    def run_job(self):
        # FIXME raised exceptions in this functions will be in the context of a
        # thread that is not running in the sjRPC, therefore these won't be
        # caught
        try:
            self.client_sock, _ = self.sock.accept()
        except socket.timeout:
            logger.exception('Error for importing job: client did not connect')
            self.clean_fds()
            raise
        except socket.error:
            logger.exception('Error while accepting socket')
            self.clean_fds()
            raise

        # close the listening socket
        self.sock.close()
        self.sock = None

        checksum = self.HASH()

        # start downloading disk image
        while self.running:
            try:
                received = []  # keep a list of received buffers in order to do
                               # only one concatenation in the end
                total_received = 0
                while True:
                    recv_buf = self.client_sock.recv(self.BUFFER_LEN - total_received)
                    # logger.debug('Received %d', len(recv_buf))
                    if not recv_buf:  # EOF
                        # in case received in not empty, we will come back here
                        # once again and it returns EOF one more time
                        break
                    total_received += len(recv_buf)
                    received.append(recv_buf)
                    if total_received == self.BUFFER_LEN:
                        break
            except socket.error:
                logger.exception('Error while receiving disk image')
                self.clean_fds()
                raise
            buffer_ = b''.join(received)
            if not buffer_:
                logger.debug('Received EOF import job')
                break
            checksum.update(buffer_)
            try:
                written = 0
                # FIXME never write small chuncks
                # in which case does disk.write would not write all the buffer ?
                to_send = buffer_
                while True:
                    written += self.disk.write(to_send)
                    # logger.debug('Written %s to disk', written)
                    to_send = buffer(buffer_, written)
                    if not to_send:
                        break
            except IOError:
                logger.exception('Error while writing image to disk')
                self.clean_fds()
                raise

        # here we could not have received the full disk but we don't consider
        # this as an error in the import part
        self.checksum = checksum.hexdigest()
        # clean the fds
        self.clean_fds()
        logger.debug('Volume import done')


class ExportVolume(BaseThreadedJob):
    """Export volume job.

    """
    BUFFER_LEN = 8192 * 16
    HASH = md5

    def __init__(self, job_manager, ev_loop, volume, raddr, rport):
        """
        :param volume: :class:`Volume` instance
        :param raddr: remote IP address
        :param rport: remote TCP port
        """
        BaseThreadedJob.__init__(self, job_manager, ev_loop)

        # where to connect to send the volume
        self.raddr = raddr
        self.rport = rport

        self.volume = volume
        self.checksum = None

        # fds
        self.sock = None
        self.disk = None

    def clean_fds(self):
        if self.sock is not None:
            self.sock.close()
            self.sock = None
        if self.disk is not None:
            self.disk.close()
            self.disk = None

    def pre_job(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # connect to the remote host
        try:
            self.sock.connect((self.raddr, self.rport))
        except socket.error as exc:
            logger.exception('Error while trying to connect to remote host %s',
                            os.strerror(exc.errno))
            self.clean_fds()
            raise

        # open local volume
        try:
            self.disk = io.open(self.volume.path, 'rb', 0)
        except IOError:
            logger.exception('Error while opening disk for export job')
            self.clean_fds()
            raise

    def run_job(self):
        checksum = self.HASH()
        # sent_count = 0

        # do copy
        while self.running:
            try:
                read = self.disk.read(self.BUFFER_LEN)
            except IOError:
                logger.exception('Error while reading from disk')
                self.clean_fds()
                break
            # read length may be less than BUFFER_LEN but we don't care as it
            # will go over TCP
            if not read:  # end of file
                # logger.debug('EOF, exported %d bytes', sent_count)
                break
            # sent_count += len(read)
            # logger.debug('Read %d from disk', len(read))
            checksum.update(read)
            try:
                self.sock.sendall(read)
            except socket.error:
                logger.exception('Error while sending through socket')
                self.clean_fds()
                break


        self.checksum = checksum.hexdigest()
        self.clean_fds()