diff --git a/cloudcontrol/node/host/__init__.py b/cloudcontrol/node/host/__init__.py index f0b16e4ee9e1e796b73b361ee1ff4c1efca40c8c..3b86c3392e831dff2bbddf1d061266dfaf9e1b3c 100644 --- a/cloudcontrol/node/host/__init__.py +++ b/cloudcontrol/node/host/__init__.py @@ -41,7 +41,7 @@ from cloudcontrol.common.client.plugins import Base as BasePlugin, rpc_handler from cloudcontrol.common.jobs import JobsManager, JobsStore from cloudcontrol.common.helpers.logger import patch_logging; patch_logging() -from cloudcontrol.node.utils import EvPopen +from cloudcontrol.node.utils import execute from cloudcontrol.node.exc import RemoteExecutionError from cloudcontrol.node.host import tags from cloudcontrol.node.host.jobs import NodeJobsManagerInterface, ScriptJob @@ -279,43 +279,19 @@ class Handler(BasePlugin): @rpc_handler def execute_command(self, command, stdin=None): - """Execute an arbitrary shell command on the host. + command = ['/bin/sh', '-c', command] + rcode, output = execute(self.main, command, stdin) - :param string command: shell command to run - :param string stdin: string to use as stdin on command - """ - logger.debug('Executing command %s', command) - - try: - remote_command = EvPopen(self.main, command, close_fds=True, - shell=True, stdout=PIPE, stdin=PIPE, - stderr=STDOUT) - except Exception: - logger.exception('Error while starting subprocess for executing ' - ' command %s', command) - raise - - self.commands.add(remote_command) - - try: - stdout, _ = remote_command.communicate(stdin) - except Exception: - logger.exception('Error while communicating with subprocess for' - ' command %s', command) - raise - - self.commands.remove(remote_command) - - if remote_command.returncode != 0: + if rcode != 0: # 127 means command not found, 126 means not executable - if remote_command.returncode == 127: - raise RemoteExecutionError('Command not found: %s' % stdout) - elif remote_command.returncode == 126: + if rcode == 127: + raise RemoteExecutionError('Command not found: %s' % output) + elif rcode == 126: raise RemoteExecutionError('Command is not executable') + else: + raise RemoteExecutionError('Child exited with non zero status %s' % rcode) - raise RemoteExecutionError('Child exited with non zero status %s' % - remote_command.returncode) - return stdout + return output @rpc_handler def node_shutdown(self, reboot=True, gracefull=True): diff --git a/cloudcontrol/node/utils.py b/cloudcontrol/node/utils.py index 7af8de2fe6fb6b2289baece464847bbde4c5c422..c5be080df7b4c4ab599c4ce2f4d46cd0254c3c35 100644 --- a/cloudcontrol/node/utils.py +++ b/cloudcontrol/node/utils.py @@ -15,24 +15,14 @@ import os -import gc import sys -import types import errno import signal -import pickle import logging import resource import threading -import traceback import subprocess from collections import deque -from functools import wraps -from subprocess import _eintr_retry_call - -import pyev - -from cloudcontrol.common.client.utils import main_thread logger = logging.getLogger(__name__) @@ -50,288 +40,101 @@ def and_(iter_): return True -def _main_thread(func): - """EvPopen constructor decorator.""" - @wraps(func) - def decorated(self, main_loop, *args, **kwargs): - return main_loop.call_in_main_thread(func, self, main_loop, *args, **kwargs) - return decorated - - -class EvPopen(subprocess.Popen): - @_main_thread - def __init__(self, main_loop, *args, **kwargs): - """Class that acts as `subprocess.Popen` but uses libev child handling. - :param main_loop: `NodeLoop` instance - :param \*args: arguments for :py:class:`subprocess.Popen` - :param \*\*kwargs: keyword arguments for :py:class:`subprocess.Popen` - """ - self.main = main_loop - - # this could raise but don't worry about zombies, they will be collected - # by libev - subprocess.Popen.__init__(self, *args, **kwargs) - - # check stdout, stderr fileno and create watchers if needed - self.stdout_watcher = self.stderr_watcher = None - self.child_watcher = self.main.evloop.child(self.pid, False, - self.child_cb) - self.child_watcher.start() - - self._stdout_output = list() - self._stderr_output = list() - # take an optional event for other threads to wait for process - # termination - self.stdout_done = threading.Event() - self.stderr_done = threading.Event() - self.process_done = threading.Event() - - @main_thread - def create_std_watchers(self): - if self.stdout is not None: - self.stdout_watcher = self.main.evloop.io(self.stdout, - pyev.EV_READ, - self.stdout_cb) - self.stdout_watcher.start() - else: - self.stdout_done.set() - if self.stderr is not None and self.stderr.fileno() != self.stdout.fileno(): - self.stderr_watcher = self.main.evloop.io(self.stderr, - pyev.EV_READ, - self.stderr_cb) - self.stderr_watcher.start() - else: - self.stderr_done.set() - - def stdout_cb(self, watcher, revents): - data = os.read(watcher.fd, 1024) - if data: - self._stdout_output.append(data) - else: - self.stdout_watcher.stop() - self.stdout_watcher = None - self.stdout.close() - self.stdout = None - self.stdout_done.set() - - def stderr_cb(self, watcher, revents): - data = os.read(watcher.fd, 1024) - if data: - self._stderr_output.append(data) - else: - self.stderr_watcher.stop() - self.stderr_watcher = None - self.stderr.close() - self.stderr = None - self.stderr_done.set() - - def child_cb(self, watcher, revents): - self._handle_exitstatus(self.child_watcher.rstatus) - self.child_watcher.stop() - self.child_watcher = None - self.process_done.set() - - # overiding parent methods - def _internal_poll(self, *args, **kwargs): - # ignore all parameters - return self.returncode - - def _communicate(self, stdin=None): - self.create_std_watchers() - - if stdin: - if self.stdin is None: - logger.warning('Ignoring stdin input for %s', self) - else: - fd = self.stdin.fileno() - while True: - count = os.write(fd, stdin) - if count == len(stdin): - self.stdin.close() - self.stdin = None - break - else: - stdin = stdin[:count] - - self.stdout_done.wait() - self.stderr_done.wait() - # FIXME handle universal newlines - self.process_done.wait() - - return tuple(map(u''.join, (self._stdout_output, self._stderr_output))) - - # This is basically a copy-paste from stdlib subprocess module to - # prevent calling waitpid which would race with libev and would raise - # ECHILD - def _execute_child(self, args, executable, preexec_fn, close_fds, - cwd, env, universal_newlines, - startupinfo, creationflags, shell, - p2cread, p2cwrite, - c2pread, c2pwrite, - errread, errwrite): - """Execute program (POSIX version)""" - - if isinstance(args, types.StringTypes): - args = [args] - else: - args = list(args) - - if shell: - args = ["/bin/sh", "-c"] + args - - if executable is None: - executable = args[0] - - # For transferring possible exec failure from child to parent - # The first char specifies the exception type: 0 means - # OSError, 1 means some other error. - errpipe_read, errpipe_write = os.pipe() - try: - try: - self._set_cloexec_flag(errpipe_write) - - gc_was_enabled = gc.isenabled() - # Disable gc to avoid bug where gc -> file_dealloc -> - # write to stderr -> hang. http://bugs.python.org/issue1336 - gc.disable() - try: - self.pid = os.fork() - except: - if gc_was_enabled: - gc.enable() - raise - self._child_created = True - if self.pid == 0: - # Child - try: - # Close parent's pipe ends - if p2cwrite is not None: - os.close(p2cwrite) - if c2pread is not None: - os.close(c2pread) - if errread is not None: - os.close(errread) - os.close(errpipe_read) - - # Dup fds for child - if p2cread is not None: - os.dup2(p2cread, 0) - if c2pwrite is not None: - os.dup2(c2pwrite, 1) - if errwrite is not None: - os.dup2(errwrite, 2) - - # Close pipe fds. Make sure we don't close the same - # fd more than once, or standard fds. - if p2cread is not None and p2cread not in (0,): - os.close(p2cread) - if c2pwrite is not None and c2pwrite not in (p2cread, 1): - os.close(c2pwrite) - if errwrite is not None and errwrite not in (p2cread, c2pwrite, 2): - os.close(errwrite) - - # Close all other fds, if asked for - if close_fds: - self._close_fds(but=errpipe_write) - - if cwd is not None: - os.chdir(cwd) - - if preexec_fn: - preexec_fn() - - if env is None: - os.execvp(executable, args) - else: - os.execvpe(executable, args, env) - - except: - exc_type, exc_value, tb = sys.exc_info() - # Save the traceback and attach it to the exception object - exc_lines = traceback.format_exception(exc_type, - exc_value, - tb) - exc_value.child_traceback = ''.join(exc_lines) - os.write(errpipe_write, pickle.dumps(exc_value)) - - # This exitcode won't be reported to applications, so it - # really doesn't matter what we return. - os._exit(255) - - # Parent - if gc_was_enabled: - gc.enable() - finally: - # be sure the FD is closed no matter what - os.close(errpipe_write) - - if p2cread is not None and p2cwrite is not None: - os.close(p2cread) - if c2pwrite is not None and c2pread is not None: - os.close(c2pwrite) - if errwrite is not None and errread is not None: - os.close(errwrite) - - # Wait for exec to fail or succeed; possibly raising exception - # Exception limited to 1M - data = _eintr_retry_call(os.read, errpipe_read, 1048576) - finally: - # be sure the FD is closed no matter what - os.close(errpipe_read) - - if data != "": - # _eintr_retry_call(os.waitpid, self.pid, 0) - child_exception = pickle.loads(data) - for fd in (p2cwrite, c2pread, errread): - if fd is not None: - os.close(fd) - raise child_exception - - def wait(self): - self.process_done.wait() - return self.returncode - # end overiding - - def close(self): - # stop std* watchers - if self.stdout_watcher is not None: - self.stdout_watcher.stop() - self.stdout_watcher = None - if self.stderr_watcher is not None: - self.stderr_watcher.stop() - self.stderr_watcher = None - # close std* file objects if needed - if self.stdin is not None: - self.stdin.close() - self.stdin = None - if self.stdout is not None: - self.stdout.close() - self.stdout = None - if self.stderr is not None: - self.stderr.close() - self.stderr = None - if self.child_watcher is not None: - self.child_watcher.stop() - self.child_watcher = None - if self.returncode is None: - # we must kill the child - self.kill() - # libev handles zombies - - def subproc_call(main_loop, args, stdin=None): """ :param args: arguments for subprocess call :param stdin: stdin data as string """ - proc = EvPopen(main_loop, args, bufsize=4096, stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.STDOUT, - close_fds=True) - result, _ = proc.communicate(stdin) - if proc.returncode != 0: - raise subprocess.CalledProcessError(proc.returncode, - 'Error while executing command') - return result + + rcode, output = execute(args, stdin) + + if rcode != 0: + raise subprocess.CalledProcessError(rcode, 'Error while executing command') + + return output + + +def execute(main_loop, args, stdin=None): + """Execute a command and return error code and command output. + + Warning: this function will block until the command exits. DO NOT CALL IT + IN THE MAIN THREAD. + + :param args: list of command arguments. First item is the command itself + :param stdin: string to pass as command standard input + """ + + # Create pipes to interact with child's standard IOs: + r_stdin, w_stdin = os.pipe() + r_stdout, w_stdout = os.pipe() + + child_is_terminated = threading.Event() + + # Callback to be called by libev when the child terminates: + def _cb_child_is_terminated(watcher, revents): + child_is_terminated.set() + watcher.stop() + + # Part executed in the main thread to allow child watcher to be properly + # installed: "It is permissible to install a Child watcher after the child + # has been forked (which implies it might have already exited), as long as + # the event loop isn't entered" - http://pythonhosted.org/pyev/Child.html + def _executed_in_main_thread(): + pid = os.fork() + + if pid == 0: # Child + os.dup2(r_stdin, sys.stdin.fileno()) + os.dup2(w_stdout, sys.stdout.fileno()) + os.dup2(w_stdout, sys.stderr.fileno()) + + close_fds(debug=True) + + os.execvp(args[0], args) + else: # Parent + os.close(r_stdin) + os.close(w_stdout) + + # Write stdin string to the child standard input: + buf = stdin + while buf: + written = os.write(w_stdin, buf) + buf = buf[written:] + + os.close(w_stdin) + + # Create a watcher to catch child termination and start it: + child_watcher = main_loop.evloop.child(pid, False, _cb_child_is_terminated) + child_watcher.start() + + # Returns the created watcher for two reasons: + # 1. Access to the child exit code and pid + # 2. Keep a reference on the watcher to prevent it to be garbage collected + return child_watcher + + child_watcher = main_loop.call_in_main_thread(_executed_in_main_thread) + + logger.debug('Executed command with pid %d: args: %s, stdin: %s', + child_watcher.pid, args, + '%d bytes' % len(stdin) if stdin is not None else 'no') + + # Read child's stdout: + stdout = '' + tmp = True + while tmp: + tmp = os.read(r_stdout, 2048) + stdout += tmp + os.close(r_stdout) + + child_is_terminated.wait() + + if os.WIFEXITED(child_watcher.rstatus): + rcode = os.WEXITSTATUS(child_watcher.rstatus) + else: + rcode = -os.WTERMSIG(child_watcher.rstatus) + + logger.debug('Command with pid %d returned with code %d', + child_watcher.pid, rcode) + return rcode, stdout class SocketBuffer(deque):