# utils.py
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl.  If not, see <http://www.gnu.org/licenses/>.


import os
import sys
import errno
import signal
import logging
import resource
import threading
import subprocess
from collections import deque


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


def and_(iter_):
    """Return the logical AND of every element of an iterable.

    Evaluation stops at the first falsy element, so a lazy iterable is only
    consumed up to (and including) that element.

    :param iterable iter_: values to test for truthiness
    :returns: ``False`` if any element is falsy, ``True`` otherwise
        (including for an empty iterable)
    """
    return all(iter_)


def subproc_call(main_loop, args, stdin=None):
    """Run a command through :func:`execute` and return its output.

    Like :func:`execute`, this blocks until the command exits.

    :param main_loop: main loop object, forwarded unchanged to
        :func:`execute`
    :param args: arguments for subprocess call
    :param stdin: stdin data as string
    :returns: what the command wrote on its standard output and error
    :raises subprocess.CalledProcessError: if the command return code is not
        zero; ``returncode`` holds that code and ``cmd`` holds ``args``
    """
    # main_loop must be forwarded: execute() takes it as first argument.
    rcode, output = execute(main_loop, args, stdin)

    if rcode != 0:
        # Second argument of CalledProcessError is the command, not a message.
        raise subprocess.CalledProcessError(rcode, args)

    return output


def execute(main_loop, args, stdin=None):
    """Execute a command and return error code and command output.

    Warning: this function will block until the command exits. DO NOT CALL IT
    IN THE MAIN THREAD.

    :param main_loop: object providing ``call_in_main_thread(func)`` and an
        ``evloop`` attribute used to create the child watcher
    :param args: list of command arguments. First item is the command itself
    :param stdin: string to pass as command standard input
    :returns: ``(rcode, output)`` tuple. ``rcode`` is the exit status of the
        command, or the negated signal number when it did not exit normally
        (127 is used when the command could not be found). ``output`` is
        everything the command wrote on its standard output and error.
    """

    # Create pipes to interact with child's standard IOs:
    r_stdin, w_stdin = os.pipe()
    r_stdout, w_stdout = os.pipe()

    child_is_terminated = threading.Event()

    # Callback to be called by libev when the child terminates:
    def _cb_child_is_terminated(watcher, revents):
        child_is_terminated.set()
        watcher.stop()

    # Part executed in the main thread to allow child watcher to be properly
    # installed: "It is permissible to install a Child watcher after the child
    # has been forked (which implies it might have already exited), as long as
    # the event loop isn't entered" - http://pythonhosted.org/pyev/Child.html
    def _executed_in_main_thread():
        pid = os.fork()

        if pid == 0:  # Child
            # Everything done in the child is guarded: whatever fails, the
            # child must exit here and never fall back into the code of the
            # parent it was forked from.
            try:
                os.dup2(r_stdin, sys.stdin.fileno())
                os.dup2(w_stdout, sys.stdout.fileno())
                os.dup2(w_stdout, sys.stderr.fileno())

                close_fds(debug=True)

                os.execvp(args[0], args)
            except OSError as err:
                if err.errno == errno.ENOENT:
                    os._exit(127)  # command not found
                os._exit(1)
            except BaseException:
                os._exit(1)
        else:  # Parent
            os.close(r_stdin)
            os.close(w_stdout)

            # Write stdin string to the child standard input:
            try:
                buf = stdin
                while buf:
                    written = os.write(w_stdin, buf)
                    buf = buf[written:]
            except OSError as err:
                # EPIPE only means the child exited (or closed its stdin)
                # before reading everything; its exit code is still wanted,
                # so keep going and install the watcher.
                if err.errno != errno.EPIPE:
                    raise
            finally:
                os.close(w_stdin)

            # Create a watcher to catch child termination and start it:
            child_watcher = main_loop.evloop.child(
                pid, False, _cb_child_is_terminated)
            child_watcher.start()

            # Returns the created watcher for two reasons:
            #  1. Access to the child exit code and pid
            #  2. Keep a reference on the watcher to prevent it to be garbage
            #     collected
            return child_watcher

    child_watcher = main_loop.call_in_main_thread(_executed_in_main_thread)

    logger.debug('Executed command with pid %d: args: %s, stdin: %s',
                 child_watcher.pid, args,
                 '%d bytes' % len(stdin) if stdin is not None else 'no')

    # Read child's stdout until EOF; chunks are joined once at the end rather
    # than concatenated on every read.
    chunks = []
    try:
        while True:
            try:
                chunk = os.read(r_stdout, 2048)
            except OSError as err:
                # A signal may interrupt the read (not retried automatically
                # by older interpreters): just try again.
                if err.errno == errno.EINTR:
                    continue
                raise
            if not chunk:
                break
            chunks.append(chunk)
    finally:
        os.close(r_stdout)
    stdout = ''.join(chunks)

    child_is_terminated.wait()

    if os.WIFEXITED(child_watcher.rstatus):
        rcode = os.WEXITSTATUS(child_watcher.rstatus)
    else:
        rcode = -os.WTERMSIG(child_watcher.rstatus)

    logger.debug('Command with pid %d returned with code %d',
                 child_watcher.pid, rcode)
    return rcode, stdout


class SocketBuffer(deque):
    """Queue of byte chunks that keeps track of its total size.

    ``current_len`` is the sum of ``len()`` of every queued chunk and is kept
    up to date by :meth:`append`, :meth:`appendleft`, :meth:`pop`,
    :meth:`popleft` and :meth:`clear`. The maximum size is not enforced:
    ``max_len`` is only used by :meth:`is_full`, it is up to the caller to
    stop adding data.

    Mutators for which the size is not tracked raise ``NotImplementedError``.
    NOTE(review): other inherited ways of changing the content (item
    assignment or deletion, ``+=``) are not overridden here and would leave
    ``current_len`` out of sync.

    :param int max_len: size in bytes from which the buffer reports itself
        as full
    """
    def __init__(self, max_len=8 * 64 * 1024):
        deque.__init__(self)
        self.max_len = max_len
        self.current_len = 0

    def append(self, x):
        """Add chunk ``x`` on the right side."""
        deque.append(self, x)
        self.current_len += len(x)

    def appendleft(self, x):
        """Add chunk ``x`` on the left side."""
        deque.appendleft(self, x)
        self.current_len += len(x)

    def clear(self):
        """Remove every chunk and reset the size."""
        deque.clear(self)
        self.current_len = 0

    def extend(self, iterable):
        raise NotImplementedError

    def extendleft(self, iterable):
        raise NotImplementedError

    def pop(self):
        """Remove and return the rightmost chunk."""
        elt = deque.pop(self)
        self.current_len -= len(elt)
        return elt

    def popleft(self):
        """Remove and return the leftmost chunk."""
        elt = deque.popleft(self)
        self.current_len -= len(elt)
        return elt

    def remove(self, value):
        raise NotImplementedError

    def reverse(self):
        raise NotImplementedError

    def rotate(self, n=1):
        raise NotImplementedError

    def is_full(self):
        """Return True when the queued size reached ``max_len``."""
        return self.current_len >= self.max_len

    def is_empty(self):
        """Return True when no byte is queued."""
        return self.current_len == 0


class Singleton(type):
    """Singleton metaclass.

    A class using this metaclass is instantiated at most once: the first call
    builds the instance (with the arguments of that call) and stores it on
    the class as ``_instance``; every later call returns that same object and
    its arguments are ignored. Each class created with this metaclass,
    subclasses included, gets its own ``_instance`` slot.
    """
    def __init__(cls, name, bases, dict):
        # Forward the class name (not the class object) to type.__init__.
        super(Singleton, cls).__init__(name, bases, dict)
        cls._instance = None

    def __call__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super(Singleton, cls).__call__(*args, **kwargs)

        return cls._instance


def close_fds(exclude_fds=None, debug=False):
    """Close all unneeded file descriptors in a child after a fork.

    Every descriptor from the hard ``RLIMIT_NOFILE`` limit (2048 when that
    limit is infinite) down to 0 is closed, except the excluded ones.
    When ``debug`` is false, ``sys.stdin``, ``sys.stdout`` and ``sys.stderr``
    are then reopened on ``os.devnull``.

    :param exclude_fds: list of file descriptors that should not be closed (0,
        1, 2 must not be set here, see debug). The given list is not modified.
    :param bool debug: indicates if std in/out should be left open (usually for
        debugging purpose)
    :raises OSError: if closing a descriptor fails for another reason than
        the descriptor not being open
    """
    # get max fd
    limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if limit == resource.RLIM_INFINITY:
        max_fd = 2048
    else:
        max_fd = limit

    # Work on a private set: the caller's list must not be altered, and the
    # membership test below runs once per possible descriptor.
    keep_fds = set(exclude_fds or ())
    if debug:
        keep_fds.update((0, 1, 2))  # debug

    for fd in xrange(max_fd, -1, -1):
        if fd in keep_fds:
            continue
        try:
            os.close(fd)
        except OSError as exc:
            if exc.errno != errno.EBADF:
                raise
            # wasn't open

    if not debug:
        # Descriptors 0, 1 and 2 were just closed, so these three opens are
        # expected to get them back in that order (checked right after).
        sys.stdin = open(os.devnull)
        sys.stdout = open(os.devnull, 'w')
        sys.stderr = open(os.devnull, 'w')
        assert sys.stdin.fileno() == 0
        assert sys.stdout.fileno() == 1
        assert sys.stderr.fileno() == 2


def set_signal_map(map_):
    """Install a set of signal handlers (meant for forked children).

    :param mapping map_: handler to install, indexed by signal number
    :returns: dict giving, for each signal of ``map_``, the handler that was
        installed before the call
    """
    # signal.signal() returns the handler it replaces.
    return dict((signum, signal.signal(signum, new_handler))
                for signum, new_handler in map_.iteritems())


# Signal number -> signal name. The ``SIG_*`` constants of the signal module
# (SIG_DFL, SIG_IGN...) are not signals and their values collide with real
# signal numbers, so they are left out. Names are walked in reverse
# alphabetical order so that, for signals having several names, the
# alphabetically first one wins (e.g. SIGABRT over SIGIOT) whatever the
# ordering of the module dictionary is.
sig_names = dict((getattr(signal, name), name)
                 for name in sorted(vars(signal), reverse=True)
                 if name.startswith('SIG') and not name.startswith('SIG_'))


def num_to_sig(num):
    """Translate a signal number into its name.

    :param num: signal number
    :returns: name found in ``sig_names`` for this number, or
        ``'Unknown signal'`` when there is none
    """
    try:
        return sig_names[num]
    except KeyError:
        return 'Unknown signal'