Commit f05cde25 authored by Antoine Millet's avatar Antoine Millet
Browse files

Rewrote external command execution

parent 62e4e1ce
Loading
Loading
Loading
Loading
+10 −34
Original line number Diff line number Diff line
@@ -41,7 +41,7 @@ from cloudcontrol.common.client.plugins import Base as BasePlugin, rpc_handler
from cloudcontrol.common.jobs import JobsManager, JobsStore
from cloudcontrol.common.helpers.logger import patch_logging; patch_logging()

from cloudcontrol.node.utils import EvPopen
from cloudcontrol.node.utils import execute
from cloudcontrol.node.exc import RemoteExecutionError
from cloudcontrol.node.host import tags
from cloudcontrol.node.host.jobs import NodeJobsManagerInterface, ScriptJob
@@ -279,43 +279,19 @@ class Handler(BasePlugin):

    @rpc_handler
    def execute_command(self, command, stdin=None):
        """Execute an arbitrary shell command on the host.
        command = ['/bin/sh', '-c', command]
        rcode, output = execute(self.main, command, stdin)

        :param string command: shell command to run
        :param string stdin: string to use as stdin on command
        """
        logger.debug('Executing command %s', command)

        try:
            remote_command = EvPopen(self.main, command, close_fds=True,
                                     shell=True, stdout=PIPE, stdin=PIPE,
                                     stderr=STDOUT)
        except Exception:
            logger.exception('Error while starting subprocess for executing '
                             ' command %s', command)
            raise

        self.commands.add(remote_command)

        try:
            stdout, _ = remote_command.communicate(stdin)
        except Exception:
            logger.exception('Error while communicating with subprocess for'
                             ' command %s', command)
            raise

        self.commands.remove(remote_command)

        if remote_command.returncode != 0:
        if rcode != 0:
            # 127 means command not found, 126 means not executable
            if remote_command.returncode == 127:
                raise RemoteExecutionError('Command not found: %s' % stdout)
            elif remote_command.returncode == 126:
            if rcode == 127:
                raise RemoteExecutionError('Command not found: %s' % output)
            elif rcode == 126:
                raise RemoteExecutionError('Command is not executable')
            else:
                raise RemoteExecutionError('Child exited with non zero status %s' % rcode)

            raise RemoteExecutionError('Child exited with non zero status %s' %
                                       remote_command.returncode)
        return stdout
        return output

    @rpc_handler
    def node_shutdown(self, reboot=True, gracefull=True):
+90 −287
Original line number Diff line number Diff line
@@ -15,24 +15,14 @@


import os
import gc
import sys
import types
import errno
import signal
import pickle
import logging
import resource
import threading
import traceback
import subprocess
from collections import deque
from functools import wraps
from subprocess import _eintr_retry_call

import pyev

from cloudcontrol.common.client.utils import main_thread


logger = logging.getLogger(__name__)
@@ -50,288 +40,101 @@ def and_(iter_):
    return True


def _main_thread(func):
    """EvPopen constructor decorator."""
    @wraps(func)
    def decorated(self, main_loop, *args, **kwargs):
        return main_loop.call_in_main_thread(func, self, main_loop, *args, **kwargs)
    return decorated
def subproc_call(main_loop, args, stdin=None):
    """
    :param args: arguments for subprocess call
    :param stdin: stdin data as string
    """

    rcode, output = execute(args, stdin)

    if rcode != 0:
        raise subprocess.CalledProcessError(rcode, 'Error while executing command')

    return output


def execute(main_loop, args, stdin=None):
    """Execute a command and return error code and command output.

class EvPopen(subprocess.Popen):
    @_main_thread
    def __init__(self, main_loop, *args, **kwargs):
        """Class that acts as `subprocess.Popen` but uses libev child handling.
        :param main_loop: `NodeLoop` instance
        :param \*args: arguments for :py:class:`subprocess.Popen`
        :param \*\*kwargs: keyword arguments for :py:class:`subprocess.Popen`
    Warning: this function will block until the command exits. DO NOT CALL IT
    IN THE MAIN THREAD.

    :param args: list of command arguments. First item is the command itself
    :param stdin: string to pass as command standard input
    """
        self.main = main_loop

        # this could raise but don't worry about zombies, they will be collected
        # by libev
        subprocess.Popen.__init__(self, *args, **kwargs)

        # check stdout, stderr fileno and create watchers if needed
        self.stdout_watcher = self.stderr_watcher = None
        self.child_watcher = self.main.evloop.child(self.pid, False,
                                                    self.child_cb)
        self.child_watcher.start()

        self._stdout_output = list()
        self._stderr_output = list()
        # take an optional event for other threads to wait for process
        # termination
        self.stdout_done = threading.Event()
        self.stderr_done = threading.Event()
        self.process_done = threading.Event()

    @main_thread
    def create_std_watchers(self):
        if self.stdout is not None:
            self.stdout_watcher = self.main.evloop.io(self.stdout,
                                                      pyev.EV_READ,
                                                      self.stdout_cb)
            self.stdout_watcher.start()
        else:
            self.stdout_done.set()
        if self.stderr is not None and self.stderr.fileno() != self.stdout.fileno():
            self.stderr_watcher = self.main.evloop.io(self.stderr,
                                                      pyev.EV_READ,
                                                      self.stderr_cb)
            self.stderr_watcher.start()
        else:
            self.stderr_done.set()

    def stdout_cb(self, watcher, revents):
        data = os.read(watcher.fd, 1024)
        if data:
            self._stdout_output.append(data)
        else:
            self.stdout_watcher.stop()
            self.stdout_watcher = None
            self.stdout.close()
            self.stdout = None
            self.stdout_done.set()

    def stderr_cb(self, watcher, revents):
        data = os.read(watcher.fd, 1024)
        if data:
            self._stderr_output.append(data)
        else:
            self.stderr_watcher.stop()
            self.stderr_watcher = None
            self.stderr.close()
            self.stderr = None
            self.stderr_done.set()

    def child_cb(self, watcher, revents):
        self._handle_exitstatus(self.child_watcher.rstatus)
        self.child_watcher.stop()
        self.child_watcher = None
        self.process_done.set()

    # overiding parent methods
    def _internal_poll(self, *args, **kwargs):
        # ignore all parameters
        return self.returncode

    def _communicate(self, stdin=None):
        self.create_std_watchers()

        if stdin:
            if self.stdin is None:
                logger.warning('Ignoring stdin input for %s', self)
            else:
                fd = self.stdin.fileno()
                while True:
                    count = os.write(fd, stdin)
                    if count == len(stdin):
                        self.stdin.close()
                        self.stdin = None
                        break
                    else:
                        stdin = stdin[:count]

        self.stdout_done.wait()
        self.stderr_done.wait()
        # FIXME handle universal newlines
        self.process_done.wait()

        return tuple(map(u''.join, (self._stdout_output, self._stderr_output)))

    # This is basically a copy-paste from stdlib subprocess module to
    # prevent calling waitpid which would race with libev and would raise
    # ECHILD
    def _execute_child(self, args, executable, preexec_fn, close_fds,
                       cwd, env, universal_newlines,
                       startupinfo, creationflags, shell,
                       p2cread, p2cwrite,
                       c2pread, c2pwrite,
                       errread, errwrite):
        """Execute program (POSIX version)"""

        if isinstance(args, types.StringTypes):
            args = [args]
        else:
            args = list(args)
    # Create pipes to interact with child's standard IOs:
    r_stdin, w_stdin = os.pipe()
    r_stdout, w_stdout = os.pipe()

        if shell:
            args = ["/bin/sh", "-c"] + args
    child_is_terminated = threading.Event()

        if executable is None:
            executable = args[0]
    # Callback to be called by libev when the child terminates:
    def _cb_child_is_terminated(watcher, revents):
        child_is_terminated.set()
        watcher.stop()

        # For transferring possible exec failure from child to parent
        # The first char specifies the exception type: 0 means
        # OSError, 1 means some other error.
        errpipe_read, errpipe_write = os.pipe()
        try:
            try:
                self._set_cloexec_flag(errpipe_write)
    # Part executed in the main thread to allow child watcher to be properly
    # installed: "It is permissible to install a Child watcher after the child
    # has been forked (which implies it might have already exited), as long as
    # the event loop isn't entered" - http://pythonhosted.org/pyev/Child.html
    def _executed_in_main_thread():
        pid = os.fork()

                gc_was_enabled = gc.isenabled()
                # Disable gc to avoid bug where gc -> file_dealloc ->
                # write to stderr -> hang.  http://bugs.python.org/issue1336
                gc.disable()
                try:
                    self.pid = os.fork()
                except:
                    if gc_was_enabled:
                        gc.enable()
                    raise
                self._child_created = True
                if self.pid == 0:
                    # Child
                    try:
                        # Close parent's pipe ends
                        if p2cwrite is not None:
                            os.close(p2cwrite)
                        if c2pread is not None:
                            os.close(c2pread)
                        if errread is not None:
                            os.close(errread)
                        os.close(errpipe_read)

                        # Dup fds for child
                        if p2cread is not None:
                            os.dup2(p2cread, 0)
                        if c2pwrite is not None:
                            os.dup2(c2pwrite, 1)
                        if errwrite is not None:
                            os.dup2(errwrite, 2)

                        # Close pipe fds.  Make sure we don't close the same
                        # fd more than once, or standard fds.
                        if p2cread is not None and p2cread not in (0,):
                            os.close(p2cread)
                        if c2pwrite is not None and c2pwrite not in (p2cread, 1):
                            os.close(c2pwrite)
                        if errwrite is not None and errwrite not in (p2cread, c2pwrite, 2):
                            os.close(errwrite)

                        # Close all other fds, if asked for
                        if close_fds:
                            self._close_fds(but=errpipe_write)

                        if cwd is not None:
                            os.chdir(cwd)

                        if preexec_fn:
                            preexec_fn()

                        if env is None:
                            os.execvp(executable, args)
                        else:
                            os.execvpe(executable, args, env)

                    except:
                        exc_type, exc_value, tb = sys.exc_info()
                        # Save the traceback and attach it to the exception object
                        exc_lines = traceback.format_exception(exc_type,
                                                               exc_value,
                                                               tb)
                        exc_value.child_traceback = ''.join(exc_lines)
                        os.write(errpipe_write, pickle.dumps(exc_value))

                    # This exitcode won't be reported to applications, so it
                    # really doesn't matter what we return.
                    os._exit(255)

                # Parent
                if gc_was_enabled:
                    gc.enable()
            finally:
                # be sure the FD is closed no matter what
                os.close(errpipe_write)

            if p2cread is not None and p2cwrite is not None:
                os.close(p2cread)
            if c2pwrite is not None and c2pread is not None:
                os.close(c2pwrite)
            if errwrite is not None and errread is not None:
                os.close(errwrite)

            # Wait for exec to fail or succeed; possibly raising exception
            # Exception limited to 1M
            data = _eintr_retry_call(os.read, errpipe_read, 1048576)
        finally:
            # be sure the FD is closed no matter what
            os.close(errpipe_read)

        if data != "":
            # _eintr_retry_call(os.waitpid, self.pid, 0)
            child_exception = pickle.loads(data)
            for fd in (p2cwrite, c2pread, errread):
                if fd is not None:
                    os.close(fd)
            raise child_exception

    def wait(self):
        self.process_done.wait()
        return self.returncode
    # end overiding

    def close(self):
        # stop std* watchers
        if self.stdout_watcher is not None:
            self.stdout_watcher.stop()
            self.stdout_watcher = None
        if self.stderr_watcher is not None:
            self.stderr_watcher.stop()
            self.stderr_watcher = None
        # close std* file objects if needed
        if self.stdin is not None:
            self.stdin.close()
            self.stdin = None
        if self.stdout is not None:
            self.stdout.close()
            self.stdout = None
        if self.stderr is not None:
            self.stderr.close()
            self.stderr = None
        if self.child_watcher is not None:
            self.child_watcher.stop()
            self.child_watcher = None
        if self.returncode is None:
            # we must kill the child
            self.kill()
            # libev handles zombies
        if pid == 0:  # Child
            os.dup2(r_stdin, sys.stdin.fileno())
            os.dup2(w_stdout, sys.stdout.fileno())
            os.dup2(w_stdout, sys.stderr.fileno())

            close_fds(debug=True)

def subproc_call(main_loop, args, stdin=None):
    """
    :param args: arguments for subprocess call
    :param stdin: stdin data as string
    """
    proc = EvPopen(main_loop, args, bufsize=4096, stdin=subprocess.PIPE,
                   stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                   close_fds=True)
    result, _ = proc.communicate(stdin)
    if proc.returncode != 0:
        raise subprocess.CalledProcessError(proc.returncode,
                                            'Error while executing command')
    return result
            os.execvp(args[0], args)
        else:  # Parent
            os.close(r_stdin)
            os.close(w_stdout)

            # Write stdin string to the child standard input:
            buf = stdin
            while buf:
                written = os.write(w_stdin, buf)
                buf = buf[written:]

            os.close(w_stdin)

            # Create a watcher to catch child termination and start it:
            child_watcher = main_loop.evloop.child(pid, False, _cb_child_is_terminated)
            child_watcher.start()

            # Returns the created watcher for two reasons:
            #  1. Access to the child exit code and pid
            #  2. Keep a reference on the watcher to prevent it to be garbage collected
            return child_watcher

    child_watcher = main_loop.call_in_main_thread(_executed_in_main_thread)

    logger.debug('Executed command with pid %d: args: %s, stdin: %s',
                 child_watcher.pid, args,
                 '%d bytes' % len(stdin) if stdin is not None else 'no')

    # Read child's stdout:
    stdout = ''
    tmp = True
    while tmp:
        tmp = os.read(r_stdout, 2048)
        stdout += tmp
    os.close(r_stdout)

    child_is_terminated.wait()

    if os.WIFEXITED(child_watcher.rstatus):
        rcode = os.WEXITSTATUS(child_watcher.rstatus)
    else:
        rcode = -os.WTERMSIG(child_watcher.rstatus)

    logger.debug('Command with pid %d returned with code %d',
                 child_watcher.pid, rcode)
    return rcode, stdout


class SocketBuffer(deque):