Commit 871d1ab3 authored by Anael Beutot's avatar Anael Beutot
Browse files

Implemented BaseIOJob.

Uses a fork and permits to set ionice on child.

/!\ Don't use logging for logging as it would (maybe) end in a deadlock for the
child.
parent ef42c95f
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -36,3 +36,7 @@ class ConsoleAlreadyOpened(CCNodeError):

class VMMigrationError(CCNodeError):
    pass


class JobError(CCNodeError):
    pass
+161 −1
Original line number Diff line number Diff line
import errno
import logging
from threading import Thread
import os
import resource
import signal
import subprocess
import sys
from threading import Thread, Event

from cloudcontrol.node.exc import JobError


logger = logging.getLogger(__name__)
@@ -130,3 +138,155 @@ class BaseThreadedJob(Thread):

    def stop(self):
        self.running = False


class BaseIOJob(object):
    """Job that can set ionice and executes in a fork.

    When inherit, you must define open_fds property that list file descriptors
    that must be kept in the child and closed in the parent.

    .. warning::
        logging should not be used in the child as this would cause a deadlock
        to occur, see http://bugs.python.org/issue6721
    """

    IO_NICE = 7

    def __init__(self, job_manager):
        self.job_manager = job_manager

        #: job id
        self.id = job_manager.job_id.next()

        self.running = False
        # event for other thread to wait for job termination
        self.job_done = Event()

        self.fork_pid = None

    def pre_job(self):
        # overide in sub class
        pass

    def run_job(self):
        # overide in sub class
        pass

    def set_io_nice(self):
        try:
            subprocess.check_call(['ionice', '-n%d' % self.IO_NICE,
                                   '-p%d' % os.getpid()], close_fds=True)
        except subprocess.CalledProcessError as exc:
            sys.stderr.write('Cannot set ionice, return code %s\n' % exc.returncode)

    def close_fds(self):
        """Close all fds uneeded fds in children."""
        # get max fd
        limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
        if limit == resource.RLIM_INFINITY:
            max_fd = 2048
        else:
            max_fd = limit

        exclude_fd = self.open_fds
        if self.job_manager.main.config.debug:
            exclude_fd += [0, 1, 2]  # debug
        for fd in xrange(max_fd, -1, -1):
            if fd in exclude_fd:
                continue
            try:
                os.close(fd)
            except OSError as exc:
                if exc.errno != errno.EBADF:
                    raise
                # wasn't open
        if not self.job_manager.main.config.debug:
            sys.stdin = open(os.devnull)
            sys.stdout = open(os.devnull)
            sys.stderr = open(os.devnull)
            assert sys.stdin.fileno() == 0
            assert sys.stdout.fileno() == 1
            assert sys.stderr.fileno() == 2

    def reset_signal_mask(self):
        signal.signal(signal.SIGTERM, lambda *args: os._exit(1))
        signal.signal(signal.SIGUSR1, signal.SIG_IGN)
        signal.signal(signal.SIGINT, signal.SIG_IGN)

    def run(self):
        try:
            self.fork_pid = os.fork()
        except OSError as exc:
            logger.error('Cannot fork: %s', os.strerror(exc.errno))
            raise
        self.running = True

        if self.fork_pid == 0:
            # child
            self.reset_signal_mask()
            self.close_fds()
            self.set_io_nice()
            try:
                self.run_job()
            except:
                sys.stderr.write('Error during job\n')
                os._exit(1)
            else:
                sys.stderr.write('Job execution went well\n')
                os._exit(0)
        else:
            # close child fds
            for fd in self.open_fds:
                try:
                    os.close(fd)
                except OSError:
                    logger.error('Error while closing fds in parent')
                    raise

    def start(self):
        self.pre_job()
        self.run()

    def stop(self):
        try:
            os.kill(self.fork_pid, signal.SIGKILL)
        except OSError as exc:
            if exc.errno == errno.ESRCH:
                logger.debug('Child already killed')
                return

            logger.error('Cannot kill child for IO job: %s',
                         os.strerror(exc.errno))
            raise

    def notify(self):
        self.job_manager.notify(self)

    def wait(self):
        if self.fork_pid is None:
            return
        try:
            while True:
                try:
                    pid, return_status = os.waitpid(self.fork_pid, 0)
                except OSError as exc:
                    if exc.errno == errno.EINTR:
                        continue

                    logger.error('Error while waiting for child to terminate: %s',
                                 os.strerror(exc.errno))
                    raise
                else:
                    break
            assert pid == self.fork_pid
            if return_status >> 8 != 0:
                raise JobError('Exception during job, returned %s',
                               return_status)
        finally:
            self.fork_pid = None
            self.job_done.set()
            self.notify()

    def join(self):
        self.job_done.wait()