Commit 1baa8d96 authored by Anael Beutot's avatar Anael Beutot
Browse files

Replaced remote command execution using EvPopen

parent 842ff9c2
Loading
Loading
Loading
Loading
+5 −0
Original line number Diff line number Diff line
@@ -49,3 +49,8 @@ class VMMigrationError(CCNodeError):
class JobError(CCNodeError):
    """General exception for a job."""
    pass


class RemoteExecutionError(CCNodeError):
    """Thrown when a remote command execution error occurs."""
    pass
+34 −8
Original line number Diff line number Diff line
@@ -14,9 +14,10 @@ import tempfile
import threading
from fcntl import ioctl
import cPickle as pickle
from itertools import imap
from itertools import imap, chain
from subprocess import Popen, PIPE, STDOUT

import pyev
from sjrpc.utils import pass_connection, threadless
from sjrpc.core.exceptions import RpcError
from cloudcontrol.common.client.tags import Tag, tag_inspector, TagDB
@@ -24,6 +25,8 @@ from cloudcontrol.common.client.plugins import Base as BasePlugin
from cloudcontrol.common.jobs import JobsManager, JobsStore
from cloudcontrol.common.helpers.logger import patch_logging; patch_logging()

from cloudcontrol.node.utils import EvPopen
from cloudcontrol.node.exc import RemoteExecutionError
from cloudcontrol.node.host import tags
from cloudcontrol.node.host.jobs import NodeJobsManagerInterface, ScriptJob
from cloudcontrol.node.host.plugins import PluginMethodJob, Plugin
@@ -188,6 +191,8 @@ class Handler(BasePlugin):
        ))
        # running shells
        self.shells = dict()
        # running remote commands
        self.commands = set()

        #: jobs manager (different from MainLoop.jobs_manager)
        try:
@@ -242,10 +247,13 @@ class Handler(BasePlugin):
    def stop(self):
        # remove script cache directory
        shutil.rmtree(self.scripts_dir, ignore_errors=True)
        # kill all currently running shells
        for shell in self.shells.values():
            shell.close()
        # kill all currently running shells and commands
        logger.debug('Kill currently running shells and commands')
        for stuff in chain(self.shells.values(),
                           list(self.commands)):
            stuff.close()
        self.shells.clear()
        self.commands.clear()
        BasePlugin.stop(self)

    def update_plugin_index(self):
@@ -266,14 +274,32 @@ class Handler(BasePlugin):

        :param string command: shell command to run
        """
        # return stdout and stderr mixed in
        logger.debug('Executing command %s', command)

        try:
            remote_command = EvPopen(self.main, command, close_fds=True,
                                     shell=True, stdout=PIPE, stderr=STDOUT)
        except Exception:
            logger.exception('Error while starting subprocess for executing '
                             ' command %s', command)
            raise

        self.commands.add(remote_command)

        try:
            return Popen(command, shell=True, bufsize=-1, stdin=PIPE, stdout=PIPE,
                         stderr=STDOUT).communicate()[0] or None
            stdout, _ = remote_command.communicate()
        except Exception:
            logger.exception('Error while executing a remote command')
            logger.exception('Error while communicating with subprocess for'
                             ' command %s', command)
            raise

        self.commands.remove(remote_command)

        if remote_command.returncode != 0:
            raise RemoteExecutionError('Child exited with non zero status %s' %
                                       remote_command.returncode)
        return stdout

    def node_shutdown(self, reboot=True, gracefull=True):
        """Halt/Reboot the node.