# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl.  If not, see <http://www.gnu.org/licenses/>.

import os
import sys
import errno
import signal
import logging
import resource
import threading
import subprocess
from collections import deque


logger = logging.getLogger(__name__)


def and_(iter_):
    """Do an and logic condition over the iterable elements.

    :param iterable iter_: meat for condition
    :returns: False as soon as a falsy element is met, True otherwise
        (including for an empty iterable)
    """
    return all(iter_)


def subproc_call(main_loop, args, stdin=None):
    """Run a command and return its output, raising if it fails.

    :param main_loop: main loop object, passed through to :func:`execute`
    :param args: arguments for subprocess call
    :param stdin: stdin data as string
    :returns: the output captured by :func:`execute`
    :raises subprocess.CalledProcessError: if the return code is not zero
    """
    rcode, output = execute(main_loop, args, stdin)
    if rcode != 0:
        raise subprocess.CalledProcessError(rcode,
                                            'Error while executing command')
    return output


def execute(main_loop, args, stdin=None):
    """Execute a command and return error code and command output.

    Warning: this function will block until the command exits.
    DO NOT CALL IT IN THE MAIN THREAD.

    :param main_loop: object providing ``call_in_main_thread(callable)`` and
        an ``evloop`` attribute whose ``child()`` method creates a child
        watcher
    :param args: list of command arguments. First item is the command itself
    :param stdin: string to pass as command standard input
    :returns: tuple ``(rcode, stdout)``; ``rcode`` is the exit status when
        the child exited, or the negated signal number otherwise; ``stdout``
        holds everything the child wrote on stdout and stderr
    """
    # Create pipes to interact with child's standard IOs:
    r_stdin, w_stdin = os.pipe()
    r_stdout, w_stdout = os.pipe()

    child_is_terminated = threading.Event()

    # Callback to be called by libev when the child terminates:
    def _cb_child_is_terminated(watcher, revents):
        child_is_terminated.set()
        watcher.stop()

    # Part executed in the main thread to allow child watcher to be properly
    # installed: "It is permissible to install a Child watcher after the child
    # has been forked (which implies it might have already exited), as long as
    # the event loop isn't entered" - http://pythonhosted.org/pyev/Child.html
    def _executed_in_main_thread():
        pid = os.fork()
        if pid == 0:
            # Child. Nothing may escape this branch: an exception leaking
            # out would make the forked process run the parent's code.
            try:
                os.dup2(r_stdin, sys.stdin.fileno())
                os.dup2(w_stdout, sys.stdout.fileno())
                os.dup2(w_stdout, sys.stderr.fileno())
                close_fds(debug=True)
                os.execvp(args[0], args)
            except OSError as err:
                # 127 when the command does not exist, 1 for other errors
                os._exit(127 if err.errno == errno.ENOENT else 1)
            except BaseException:
                os._exit(1)

        # Parent
        os.close(r_stdin)
        os.close(w_stdout)

        # Write stdin string to the child standard input.
        # NOTE(review): this write happens before anything reads the child's
        # output, so it could block if the child fills its output pipe
        # before consuming its input -- confirm the payloads stay small.
        try:
            buf = stdin
            while buf:
                written = os.write(w_stdin, buf)
                buf = buf[written:]
        finally:
            os.close(w_stdin)

        # Create a watcher to catch child termination and start it:
        child_watcher = main_loop.evloop.child(pid, False,
                                               _cb_child_is_terminated)
        child_watcher.start()

        # Returns the created watcher for two reasons:
        # 1. Access to the child exit code and pid
        # 2. Keep a reference on the watcher to prevent it from being
        #    garbage collected
        return child_watcher

    child_watcher = main_loop.call_in_main_thread(_executed_in_main_thread)
    logger.debug('Executed command with pid %d: args: %s, stdin: %s',
                 child_watcher.pid, args,
                 '%d bytes' % len(stdin) if stdin is not None else 'no')

    # Read child's stdout until end of file; chunks are joined once at the
    # end instead of growing a string at each read:
    chunks = []
    try:
        while True:
            chunk = os.read(r_stdout, 2048)
            if not chunk:
                break
            chunks.append(chunk)
    finally:
        os.close(r_stdout)
    stdout = b''.join(chunks)

    child_is_terminated.wait()

    if os.WIFEXITED(child_watcher.rstatus):
        rcode = os.WEXITSTATUS(child_watcher.rstatus)
    else:
        rcode = -os.WTERMSIG(child_watcher.rstatus)

    logger.debug('Command with pid %d returned with code %d',
                 child_watcher.pid, rcode)

    return rcode, stdout


class SocketBuffer(deque):
    """Holds bytes in a list.

    This class does not enforce a maximum size; instead it helps by keeping
    the total length of the stored elements up to date automatically.
    Operations for which the count is not maintained raise
    NotImplementedError.
    """

    def __init__(self, max_len=8 * 64 * 1024):
        """
        :param max_len: total length from which :meth:`is_full` is true
        """
        deque.__init__(self)
        self.max_len = max_len
        self.current_len = 0

    def append(self, x):
        deque.append(self, x)
        self.current_len += len(x)

    def appendleft(self, x):
        deque.appendleft(self, x)
        self.current_len += len(x)

    def clear(self):
        deque.clear(self)
        self.current_len = 0

    def extend(self, iterable):
        raise NotImplementedError

    def extendleft(self, iterable):
        raise NotImplementedError

    def pop(self):
        elt = deque.pop(self)
        self.current_len -= len(elt)
        return elt

    def popleft(self):
        elt = deque.popleft(self)
        self.current_len -= len(elt)
        return elt

    def remove(self, value):
        raise NotImplementedError

    def reverse(self):
        raise NotImplementedError

    def rotate(self, n):
        raise NotImplementedError

    def is_full(self):
        """Tell if the stored length reached ``max_len``."""
        return self.current_len >= self.max_len

    def is_empty(self):
        """Tell if the stored length is zero."""
        return self.current_len == 0


class Singleton(type):
    """Singleton metaclass.

    The first call to a class using this metaclass creates the instance,
    every later call returns that same instance (arguments of later calls
    are ignored).
    """

    def __init__(cls, name, bases, dict):
        super(Singleton, cls).__init__(name, bases, dict)
        cls._instance = None

    def __call__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instance


def _candidate_fds(max_fd):
    """Yield, in decreasing order, the file descriptors worth closing.

    The open descriptors are read from ``/proc/self/fd`` when that directory
    can be listed; otherwise every number from `max_fd` down to 0 is tried.
    """
    try:
        names = os.listdir('/proc/self/fd')
    except OSError:
        names = ()
    # The list is built before anything gets closed. It may contain the
    # descriptor used for the listing itself, which is already closed.
    open_fds = sorted((int(name) for name in names if name.isdigit()),
                      reverse=True)
    if open_fds:
        for fd in open_fds:
            yield fd
        return

    fd = max_fd
    while fd >= 0:
        yield fd
        fd -= 1


def close_fds(exclude_fds=None, debug=False):
    """Close all unneeded fds in child when using fork.

    :param exclude_fds: list of file descriptors that should not be closed
        (0, 1, 2 must not be set here, see debug); it is left unmodified
    :param bool debug: indicates if std in/out should be left open (usually
        for debugging purpose)
    :raises OSError: if closing a descriptor fails for another reason than
        the descriptor not being open
    """
    # get max fd
    limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if limit == resource.RLIM_INFINITY:
        max_fd = 2048
    else:
        max_fd = limit

    # Work on a private set so that the caller's list is never altered
    excluded = set(exclude_fds or ())
    if debug:
        excluded.update((0, 1, 2))  # debug

    for fd in _candidate_fds(max_fd):
        if fd in excluded:
            continue
        try:
            os.close(fd)
        except OSError as exc:
            if exc.errno != errno.EBADF:
                raise
            # wasn't open

    if not debug:
        # Descriptors 0, 1 and 2 were just closed, hence the three files
        # opened here are expected to get these numbers back
        sys.stdin = open(os.devnull)
        sys.stdout = open(os.devnull, 'w')
        sys.stderr = open(os.devnull, 'w')
        assert sys.stdin.fileno() == 0
        assert sys.stdout.fileno() == 1
        assert sys.stderr.fileno() == 2


def set_signal_map(map_):
    """Set signal map in fork children.

    :param mapping map_: (signal code, handler)...
    :returns: old handlers as dict
    """
    previous_handlers = dict()
    for sig, handler in map_.items():
        previous_handlers[sig] = signal.signal(sig, handler)
    return previous_handlers


def _build_sig_names():
    """Build the signal number -> signal name mapping.

    Only ``SIG*`` names are kept; the ``SIG_*`` constants (``SIG_DFL``,
    ``SIG_IGN``...) are not signals and their values collide with real
    signal numbers. When several names share a number, the first one in
    alphabetical order is kept, which makes the result deterministic.
    """
    names = {}
    for name in sorted(vars(signal)):
        if not name.startswith('SIG') or name.startswith('SIG_'):
            continue
        number = getattr(signal, name)
        if isinstance(number, int):
            names.setdefault(int(number), name)
    return names


sig_names = _build_sig_names()


def num_to_sig(num):
    """Returns signal name.

    :param num: signal number
    :returns: the signal name, or 'Unknown signal' if the number is unknown
    """
    return sig_names.get(num, 'Unknown signal')