Commit 84920503 authored by Anael Beutot's avatar Anael Beutot
Browse files

Created an EvPopen class to replace Popen calls

It uses pyev child watcher to handles child termination instead of calling
waitpid.
parent 2c0b67c2
Loading
Loading
Loading
Loading
+160 −0
Original line number Diff line number Diff line
@@ -2,9 +2,19 @@ import os
import sys
import errno
import signal
import logging
import resource
import threading
import subprocess
from collections import deque
from functools import wraps

import pyev

from cloudcontrol.common.client.utils import main_thread


logger = logging.getLogger(__name__)


def and_(iter_):
@@ -19,6 +29,156 @@ def and_(iter_):
    return True


def _main_thread(func):
    """EvPopen constructor decorator."""
    @wraps(func)
    def decorated(self, main_loop, *args, **kwargs):
        return main_loop.call_in_main_thread(func, self, main_loop, *args, **kwargs)
    return decorated


class EvPopen(subprocess.Popen):
    @_main_thread
    def __init__(self, main_loop, *args, **kwargs):
        """Class that acts as `subprocess.Popen` but uses libev child handling.
        :param main_loop: `NodeLoop` instance
        :param \*args: arguments for :py:class:`subprocess.Popen`
        :param \*\*kwargs: keyword arguments for :py:class:`subprocess.Popen`
        """
        self.main = main_loop

        try:
            subprocess.Popen.__init__(self, *args, **kwargs)
        except OSError as exc:
            if exc.errno == errno.ECHILD:
                # in case on some child failure on startup, subprocess will call
                # waitpid on child, but there is a great chance that, since
                # libev catchs SIGCHLD, the syscall is performed after libev
                # waitpid or retried after interrupted, causing it to fail with
                # ECHILD errno, in that latter case, we ignore the error and
                # return early
                raise RemoteExecutionError('Early child death')

        # check stdout, stderr fileno and create watchers if needed
        self.stdout_watcher = self.stderr_watcher = None
        self.child_watcher = self.main.evloop.child(self.pid, False,
                                                    self.child_cb)
        self.child_watcher.start()

        self._stdout_output = list()
        self._stderr_output = list()
        # take an optional event for other threads to wait for process
        # termination
        self.stdout_done = threading.Event()
        self.stderr_done = threading.Event()
        self.process_done = threading.Event()

    @main_thread
    def create_std_watchers(self):
        if self.stdout is not None:
            self.stdout_watcher = self.main.evloop.io(self.stdout,
                                                      pyev.EV_READ,
                                                      self.stdout_cb)
            self.stdout_watcher.start()
        else:
            self.stdout_done.set()
        if self.stderr is not None and self.stderr.fileno() != self.stdout.fileno():
            self.stderr_watcher = self.main.evloop.io(self.stderr,
                                                      pyev.EV_READ,
                                                      self.stderr_cb)
            self.stderr_watcher.start()
        else:
            self.stderr_done.set()

    def stdout_cb(self, watcher, revents):
        data = os.read(watcher.fd, 1024)
        if data:
            self._stdout_output.append(data)
        else:
            self.stdout_watcher.stop()
            self.stdout_watcher = None
            self.stdout.close()
            self.stdout = None
            self.stdout_done.set()

    def stderr_cb(self, watcher, revents):
        data = os.read(watcher.fd, 1024)
        if data:
            self._stderr_output.append(data)
        else:
            self.stderr_watcher.stop()
            self.stderr_watcher = None
            self.stderr.close()
            self.stderr = None
            self.stderr_done.set()

    def child_cb(self, watcher, revents):
        self._handle_exitstatus(self.child_watcher.rstatus)
        self.child_watcher.stop()
        self.child_watcher = None
        self.process_done.set()

    # overiding parent methods
    def _internal_poll(self, *args, **kwargs):
        # ignore all parameters
        return self.returncode

    def _communicate(self, stdin=None):
        self.create_std_watchers()

        if stdin:
            if self.stdin is None:
                logger.warning('Ignoring stdin input for %s', self)
            else:
                fd = self.stdin.fileno()
                while True:
                    count = os.write(fd, stdin)
                    if count == len(stdin):
                        self.stdin.close()
                        self.stdin = None
                        break
                    else:
                        stdin = stdin[:count]

        self.stdout_done.wait()
        self.stderr_done.wait()
        # FIXME handle universal newlines
        self.process_done.wait()

        return tuple(map(u''.join, (self._stdout_output, self._stderr_output)))

    def wait(self):
        self.process_done.wait()
        return self.returncode
    # end overiding

    def close(self):
        # stop std* watchers
        if self.stdout_watcher is not None:
            self.stdout_watcher.stop()
            self.stdout_watcher = None
        if self.stderr_watcher is not None:
            self.stderr_watcher.stop()
            self.stderr_watcher = None
        # close std* file objects if needed
        if self.stdin is not None:
            self.stdin.close()
            self.stdin = None
        if self.stdout is not None:
            self.stdout.close()
            self.stdout = None
        if self.stderr is not None:
            self.stderr.close()
            self.stderr = None
        if self.child_watcher is not None:
            self.child_watcher.stop()
            self.child_watcher = None
        if self.returncode is None:
            # we must kill the child
            self.kill()
            # libev handles zombies


def subproc_call(args, stdin=None):
    """
    :param args: arguments for subprocess call