import io import os import socket import logging from hashlib import md5 from ccnode.jobs import BaseJob logger = logging.getLogger(__name__) class ImportVolume(BaseJob): """Import volume job. """ BUFFER_LEN = 8192 * 16 HASH = md5 def __init__(self, job_manager, ev_loop, volume): BaseJob.__init__(self, job_manager, ev_loop) self.checksum = None self.volume = volume # where the other node will connect self.port = None # fds self.sock = None self.client_sock = None self.disk = None def clean_fds(self): if self.sock is not None: self.sock.close() self.sock = None if self.client_sock is not None: self.client_sock.close() self.client_sock = None if self.disk is not None: self.disk.close() self.disk = None def pre_job(self): """ :returns: port number the socket is listening on """ # create socket try: self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) except socket.error: logger.exception('Error while creating socket for volume export') self.clean_fds() raise try: self.sock.settimeout(10.) except socket.error: logger.exception('Cannot set timeout on socket for volume export') self.clean_fds() raise try: self.sock.bind(('0.0.0.0', 0)) except socket.error: logger.exception('Error while binding socket for volume export') self.clean_fds() raise try: self.sock.listen(1) except socket.error: logger.exception('Error while listening on socket') self.clean_fds() raise # open local disk try: self.disk = io.open(self.volume.path, 'wb', 0) except IOError: logger.exception('Error while trying to open local disk') self.clean_fds() raise self.port = self.sock.getsockname()[1] return self.port def run_job(self): # FIXME raised exceptions in this functions will be in the context of a # thread that is not running in the sjRPC, therefore these won't be # caught try: self.client_sock, _ = self.sock.accept() except socket.timeout: logger.exception('Error for importing job: client did not connect') self.clean_fds() raise except socket.error: logger.exception('Error while accepting socket') self.clean_fds() raise # close the listening socket self.sock.close() self.sock = None checksum = self.HASH() # start downloading disk image while self.running: try: received = [] # keep a list of received buffers in order to do # only one concatenation in the end total_received = 0 while True: recv_buf = self.client_sock.recv(self.BUFFER_LEN - total_received) # logger.debug('Received %d', len(recv_buf)) if not recv_buf: # EOF # in case received in not empty, we will come back here # once again and it returns EOF one more time break total_received += len(recv_buf) received.append(recv_buf) if total_received == self.BUFFER_LEN: break except socket.error: logger.exception('Error while receiving disk image') self.clean_fds() raise buffer_ = b''.join(received) if not buffer_: logger.debug('Received EOF import job') break checksum.update(buffer_) try: written = 0 # FIXME never write small chuncks # in which case does disk.write would not write all the buffer ? to_send = buffer_ while True: written += self.disk.write(to_send) # logger.debug('Written %s to disk', written) to_send = buffer(buffer_, written) if not to_send: break except IOError: logger.exception('Error while writing image to disk') self.clean_fds() raise # here we could not have received the full disk but we don't consider # this as an error in the import part self.checksum = checksum.hexdigest() # clean the fds self.clean_fds() logger.debug('Volume import done') class ExportVolume(BaseJob): """Export volume job. """ BUFFER_LEN = 8192 * 16 HASH = md5 def __init__(self, job_manager, ev_loop, volume, raddr, rport): """ :param volume: :class:`Volume` instance :param raddr: remote IP address :param rport: remote TCP port """ BaseJob.__init__(self, job_manager, ev_loop) # where to connect to send the volume self.raddr = raddr self.rport = rport self.volume = volume self.checksum = None # fds self.sock = None self.disk = None def clean_fds(self): if self.sock is not None: self.sock.close() self.sock = None if self.disk is not None: self.disk.close() self.disk = None def pre_job(self): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # connect to the remote host try: self.sock.connect((self.raddr, self.rport)) except socket.error as exc: logger.exception('Error while trying to connect to remote host %s', os.strerror(exc.errno)) self.clean_fds() raise # open local volume try: self.disk = io.open(self.volume.path, 'rb', 0) except IOError: logger.exception('Error while opening disk for export job') self.clean_fds() raise def run_job(self): checksum = self.HASH() # sent_count = 0 # do copy while self.running: try: read = self.disk.read(self.BUFFER_LEN) except IOError: logger.exception('Error while reading from disk') self.clean_fds() break # read length may be less than BUFFER_LEN but we don't care as it # will go over TCP if not read: # end of file # logger.debug('EOF, exported %d bytes', sent_count) break # sent_count += len(read) # logger.debug('Read %d from disk', len(read)) checksum.update(read) try: self.sock.sendall(read) except socket.error: logger.exception('Error while sending through socket') self.clean_fds() break self.checksum = checksum.hexdigest() self.clean_fds()