# utils.py
import errno
import logging
import os
import resource
import signal
import subprocess
import sys
import threading
from collections import deque
from functools import wraps

import pyev

from cloudcontrol.common.client.utils import main_thread


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def and_(iter_):
    """Do an and logic condition over the iterable element.

    Evaluation stops at the first falsy element, so the remainder of a lazy
    iterable is not consumed. An empty iterable yields ``True``.

    :param iterable iter_: meat for condition
    :returns: ``False`` if any element is falsy, ``True`` otherwise
    """
    return all(iter_)
def _main_thread(func):
    """Decorate the EvPopen constructor so it runs through the main loop.

    The wrapped callable must take ``(self, main_loop, ...)``; the call is
    handed to ``main_loop.call_in_main_thread`` together with the original
    arguments and whatever that returns is passed back to the caller.
    """
    def run_through_loop(self, main_loop, *args, **kwargs):
        dispatch = main_loop.call_in_main_thread
        return dispatch(func, self, main_loop, *args, **kwargs)

    # keep the name/docstring of the wrapped callable
    return wraps(func)(run_through_loop)


class EvPopen(subprocess.Popen):
    """`subprocess.Popen` subclass driven by libev watchers.

    Child termination is reported by a libev child watcher and the
    stdout/stderr pipes are drained by libev I/O watchers. Completion is
    published through :class:`threading.Event` objects so that another
    thread can block in :meth:`wait` or ``communicate()``.
    """

    @_main_thread
    def __init__(self, main_loop, *args, **kwargs):
        r"""Class that acts as `subprocess.Popen` but uses libev child handling.

        :param main_loop: `NodeLoop` instance
        :param \*args: arguments for :py:class:`subprocess.Popen`
        :param \*\*kwargs: keyword arguments for :py:class:`subprocess.Popen`
        :raises RemoteExecutionError: when spawning fails with ``ECHILD``
        :raises OSError: for any other failure while spawning the child
        """
        self.main = main_loop

        try:
            subprocess.Popen.__init__(self, *args, **kwargs)
        except OSError as exc:
            if exc.errno == errno.ECHILD:
                # in case of some child failure on startup, subprocess will
                # call waitpid on the child, but there is a great chance
                # that, since libev catches SIGCHLD, the syscall is performed
                # after libev's waitpid (or retried after being interrupted),
                # causing it to fail with ECHILD; report it as an early death
                # NOTE(review): RemoteExecutionError is neither defined nor
                # imported in the visible part of this module -- confirm
                raise RemoteExecutionError('Early child death')
            # any other spawn failure must reach the caller: carrying on
            # would register a watcher for a child that never started
            raise

        # chunks read from the child's pipes, in arrival order
        self._stdout_output = list()
        self._stderr_output = list()
        # events other threads can wait on: one per output stream (set once
        # the stream hit EOF or has no watcher) and one for child termination
        self.stdout_done = threading.Event()
        self.stderr_done = threading.Event()
        self.process_done = threading.Event()

        # std* watchers are created lazily by create_std_watchers()
        self.stdout_watcher = self.stderr_watcher = None
        self.child_watcher = self.main.evloop.child(self.pid, False,
                                                    self.child_cb)
        self.child_watcher.start()

    @main_thread
    def create_std_watchers(self):
        """Start read watchers on the child's stdout/stderr pipes.

        A stream that has no pipe (or stderr sharing stdout's descriptor) is
        immediately flagged as done so that waiters do not block on it.
        """
        if self.stdout is not None:
            self.stdout_watcher = self.main.evloop.io(self.stdout,
                                                      pyev.EV_READ,
                                                      self.stdout_cb)
            self.stdout_watcher.start()
        else:
            self.stdout_done.set()

        # stderr needs its own watcher only when it is a pipe distinct from
        # stdout; stdout may legitimately be None here
        separate_stderr = self.stderr is not None and (
            self.stdout is None or
            self.stderr.fileno() != self.stdout.fileno())
        if separate_stderr:
            self.stderr_watcher = self.main.evloop.io(self.stderr,
                                                      pyev.EV_READ,
                                                      self.stderr_cb)
            self.stderr_watcher.start()
        else:
            self.stderr_done.set()

    def stdout_cb(self, watcher, revents):
        """Watcher callback: store a chunk of stdout or finalize it on EOF."""
        data = os.read(watcher.fd, 1024)
        if data:
            self._stdout_output.append(data)
        else:
            # empty read means EOF: tear the stream down and wake waiters
            self.stdout_watcher.stop()
            self.stdout_watcher = None
            self.stdout.close()
            self.stdout = None
            self.stdout_done.set()

    def stderr_cb(self, watcher, revents):
        """Watcher callback: store a chunk of stderr or finalize it on EOF."""
        data = os.read(watcher.fd, 1024)
        if data:
            self._stderr_output.append(data)
        else:
            # empty read means EOF: tear the stream down and wake waiters
            self.stderr_watcher.stop()
            self.stderr_watcher = None
            self.stderr.close()
            self.stderr = None
            self.stderr_done.set()

    def child_cb(self, watcher, revents):
        """Child watcher callback: record the exit status and wake waiters."""
        self._handle_exitstatus(self.child_watcher.rstatus)
        self.child_watcher.stop()
        self.child_watcher = None
        self.process_done.set()

    # overriding parent methods
    def _internal_poll(self, *args, **kwargs):
        """Return the recorded return code; never polls the OS itself."""
        # ignore all parameters
        return self.returncode

    def _communicate(self, stdin=None):
        """Feed `stdin` to the child, then wait for its output and its exit.

        :param stdin: data to write to the child's stdin, if any
        :returns: ``(stdout, stderr)`` tuple of the joined output chunks
        """
        self.create_std_watchers()

        if stdin:
            if self.stdin is None:
                logger.warning('Ignoring stdin input for %s', self)
            else:
                fd = self.stdin.fileno()
                remaining = stdin
                while remaining:
                    # os.write may be partial: continue with what is left
                    count = os.write(fd, remaining)
                    remaining = remaining[count:]

        # always close our end of stdin, even without input, so that a child
        # reading it sees EOF instead of blocking forever
        if self.stdin is not None:
            self.stdin.close()
            self.stdin = None

        self.stdout_done.wait()
        self.stderr_done.wait()
        # FIXME handle universal newlines
        self.process_done.wait()

        # NOTE(review): chunks come from os.read(); joining them with a
        # unicode separator relies on implicit decoding -- confirm this is
        # what callers expect for non-ASCII output
        return tuple(map(u''.join, (self._stdout_output, self._stderr_output)))

    def wait(self):
        """Block until the child watcher reported termination.

        :returns: the child's return code
        """
        self.process_done.wait()
        return self.returncode
    # end overriding

    def close(self):
        """Stop all watchers, close the pipes and kill a still-running child."""
        # stop std* watchers
        if self.stdout_watcher is not None:
            self.stdout_watcher.stop()
            self.stdout_watcher = None
        if self.stderr_watcher is not None:
            self.stderr_watcher.stop()
            self.stderr_watcher = None
        # close std* file objects if needed
        if self.stdin is not None:
            self.stdin.close()
            self.stdin = None
        if self.stdout is not None:
            self.stdout.close()
            self.stdout = None
        if self.stderr is not None:
            self.stderr.close()
            self.stderr = None
        if self.child_watcher is not None:
            self.child_watcher.stop()
            self.child_watcher = None
        if self.returncode is None:
            # we must kill the child
            self.kill()
            # libev handles zombies


def subproc_call(main_loop, args, stdin=None):
    """Run a command through :class:`EvPopen` and return its output.

    The child gets pipes for stdin and stdout, and its stderr is merged into
    stdout.

    :param main_loop: loop object handed to :class:`EvPopen`
    :param args: arguments for subprocess call
    :param stdin: stdin data as string
    :returns: output of the command (stdout and stderr combined)
    :raises subprocess.CalledProcessError: if the command exits with a
        non-zero status; the exception carries the command and its output
    """
    proc = EvPopen(main_loop, args, bufsize=4096, stdin=subprocess.PIPE,
                   stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                   close_fds=True)
    result, _ = proc.communicate(stdin)
    if proc.returncode != 0:
        # CalledProcessError takes (returncode, cmd, output): report the real
        # command and keep what it printed instead of a fixed message
        raise subprocess.CalledProcessError(proc.returncode, args, result)
    return result


class SocketBuffer(deque):
    """Holds chunks of bytes in a deque and tracks their total length.

    The maximum size is not enforced on insertion; the class only keeps
    ``current_len`` up to date and offers :meth:`is_full` / :meth:`is_empty`
    so callers can apply their own policy. Mutators whose effect on the
    byte count is not tracked raise :class:`NotImplementedError`.
    """
    def __init__(self, max_len=8 * 64 * 1024):
        """
        :param int max_len: byte count from which the buffer reports full
        """
        deque.__init__(self)
        self.max_len = max_len
        self.current_len = 0

    def append(self, x):
        """Add chunk `x` on the right and account for its length."""
        deque.append(self, x)
        self.current_len += len(x)

    def appendleft(self, x):
        """Add chunk `x` on the left and account for its length."""
        deque.appendleft(self, x)
        self.current_len += len(x)

    def clear(self):
        """Drop every chunk and reset the byte count."""
        deque.clear(self)
        self.current_len = 0

    def extend(self, iterable):
        raise NotImplementedError

    def extendleft(self, iterable):
        raise NotImplementedError

    def pop(self):
        """Remove and return the rightmost chunk, updating the byte count."""
        elt = deque.pop(self)
        self.current_len -= len(elt)
        return elt

    def popleft(self):
        """Remove and return the leftmost chunk, updating the byte count."""
        elt = deque.popleft(self)
        self.current_len -= len(elt)
        return elt

    def remove(self, value):
        # `self` was missing, which turned the intended NotImplementedError
        # into a TypeError at call time
        raise NotImplementedError

    def reverse(self):
        raise NotImplementedError

    def rotate(self, n=1):
        # same default as deque.rotate so the call signature stays compatible
        raise NotImplementedError

    def is_full(self):
        """Tell whether the stored byte count reached ``max_len``."""
        return self.current_len >= self.max_len

    def is_empty(self):
        """Tell whether no bytes are stored."""
        return self.current_len == 0


class Singleton(type):
    """Singleton metaclass.

    Every class created with this metaclass gets its own ``_instance`` slot.
    The first call builds the instance; later calls return that same object
    and their arguments are ignored.
    """
    def __init__(cls, name, bases, dict):
        # forward the class name (not the class object) to type.__init__
        super(Singleton, cls).__init__(name, bases, dict)
        cls._instance = None

    def __call__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super(Singleton, cls).__call__(*args, **kwargs)

        return cls._instance


def close_fds(exclude_fds=None, debug=False):
    """Close all unneeded fds in child when using fork.

    Descriptors are closed from the highest down to 0. Unless `debug` is
    set, ``sys.stdin``/``sys.stdout``/``sys.stderr`` are then reopened on
    ``os.devnull``.

    :param exclude_fds: iterable of file descriptors that should not be
        closed (0, 1, 2 must not be set here, see debug); it is not modified
    :param bool debug: indicates if std in/out should be left open (usually for
        debugging purpose)
    :raises OSError: if closing a descriptor fails for a reason other than
        it not being open
    """
    # get max fd
    limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if limit == resource.RLIM_INFINITY:
        max_fd = 2048
    else:
        max_fd = limit

    # work on a private set: the caller's sequence is left untouched, any
    # iterable is accepted and membership tests are O(1)
    keep = set(exclude_fds or ())
    if debug:
        keep.update((0, 1, 2))  # debug

    fd = max_fd
    while fd >= 0:
        if fd not in keep:
            try:
                os.close(fd)
            except OSError as exc:
                if exc.errno != errno.EBADF:
                    raise
                # wasn't open
        fd -= 1

    if not debug:
        # 0, 1 and 2 are free at this point, so the three opens below land
        # on them in order
        sys.stdin = open(os.devnull)
        sys.stdout = open(os.devnull, 'w')
        sys.stderr = open(os.devnull, 'w')
        assert sys.stdin.fileno() == 0
        assert sys.stdout.fileno() == 1
        assert sys.stderr.fileno() == 2


def set_signal_map(map_):
    """Set signal map in fork children.

    Handlers are installed with :func:`signal.signal`, one per entry of the
    mapping.

    :param mapping map_: (signal code, handler)...
    :returns: old handlers as dict, keyed by signal code
    """
    previous_handlers = dict()

    # items() exists on Python 2 and Python 3 mappings alike
    for sig, handler in map_.items():
        previous_handlers[sig] = signal.signal(sig, handler)

    return previous_handlers


# Signal number -> signal name, built from the ``signal`` module.
# ``SIG_*`` constants (SIG_DFL, SIG_IGN, ...) are handler/mask values, not
# signals, and would otherwise collide with real signal numbers. Names are
# visited in reverse alphabetical order so that, for aliased numbers, the
# alphabetically first name is written last and wins deterministically.
sig_names = dict(
    (getattr(signal, name), name)
    for name in sorted(vars(signal), reverse=True)
    if name.startswith('SIG') and not name.startswith('SIG_')
)


def num_to_sig(num):
    """Returns signal name.

    :param num: signal number
    :returns: the ``SIG...`` name, or ``'Unknown signal'`` if the number is
        not a known signal
    """
    return sig_names.get(num, 'Unknown signal')