import os import sys import errno import signal import logging import resource import threading import subprocess from collections import deque from functools import wraps import pyev from cloudcontrol.common.client.utils import main_thread logger = logging.getLogger(__name__) def and_(iter_): """Do an and logic condition over the iterable element. :param iterable iter: meat for condition """ for i in iter_: if not i: return False return True def _main_thread(func): """EvPopen constructor decorator.""" @wraps(func) def decorated(self, main_loop, *args, **kwargs): return main_loop.call_in_main_thread(func, self, main_loop, *args, **kwargs) return decorated class EvPopen(subprocess.Popen): @_main_thread def __init__(self, main_loop, *args, **kwargs): """Class that acts as `subprocess.Popen` but uses libev child handling. :param main_loop: `NodeLoop` instance :param \*args: arguments for :py:class:`subprocess.Popen` :param \*\*kwargs: keyword arguments for :py:class:`subprocess.Popen` """ self.main = main_loop try: subprocess.Popen.__init__(self, *args, **kwargs) except OSError as exc: if exc.errno == errno.ECHILD: # in case on some child failure on startup, subprocess will call # waitpid on child, but there is a great chance that, since # libev catchs SIGCHLD, the syscall is performed after libev # waitpid or retried after interrupted, causing it to fail with # ECHILD errno, in that latter case, we ignore the error and # return early raise RemoteExecutionError('Early child death') # check stdout, stderr fileno and create watchers if needed self.stdout_watcher = self.stderr_watcher = None self.child_watcher = self.main.evloop.child(self.pid, False, self.child_cb) self.child_watcher.start() self._stdout_output = list() self._stderr_output = list() # take an optional event for other threads to wait for process # termination self.stdout_done = threading.Event() self.stderr_done = threading.Event() self.process_done = threading.Event() @main_thread def create_std_watchers(self): if self.stdout is not None: self.stdout_watcher = self.main.evloop.io(self.stdout, pyev.EV_READ, self.stdout_cb) self.stdout_watcher.start() else: self.stdout_done.set() if self.stderr is not None and self.stderr.fileno() != self.stdout.fileno(): self.stderr_watcher = self.main.evloop.io(self.stderr, pyev.EV_READ, self.stderr_cb) self.stderr_watcher.start() else: self.stderr_done.set() def stdout_cb(self, watcher, revents): data = os.read(watcher.fd, 1024) if data: self._stdout_output.append(data) else: self.stdout_watcher.stop() self.stdout_watcher = None self.stdout.close() self.stdout = None self.stdout_done.set() def stderr_cb(self, watcher, revents): data = os.read(watcher.fd, 1024) if data: self._stderr_output.append(data) else: self.stderr_watcher.stop() self.stderr_watcher = None self.stderr.close() self.stderr = None self.stderr_done.set() def child_cb(self, watcher, revents): self._handle_exitstatus(self.child_watcher.rstatus) self.child_watcher.stop() self.child_watcher = None self.process_done.set() # overiding parent methods def _internal_poll(self, *args, **kwargs): # ignore all parameters return self.returncode def _communicate(self, stdin=None): self.create_std_watchers() if stdin: if self.stdin is None: logger.warning('Ignoring stdin input for %s', self) else: fd = self.stdin.fileno() while True: count = os.write(fd, stdin) if count == len(stdin): self.stdin.close() self.stdin = None break else: stdin = stdin[:count] self.stdout_done.wait() self.stderr_done.wait() # FIXME handle universal newlines self.process_done.wait() return tuple(map(u''.join, (self._stdout_output, self._stderr_output))) def wait(self): self.process_done.wait() return self.returncode # end overiding def close(self): # stop std* watchers if self.stdout_watcher is not None: self.stdout_watcher.stop() self.stdout_watcher = None if self.stderr_watcher is not None: self.stderr_watcher.stop() self.stderr_watcher = None # close std* file objects if needed if self.stdin is not None: self.stdin.close() self.stdin = None if self.stdout is not None: self.stdout.close() self.stdout = None if self.stderr is not None: self.stderr.close() self.stderr = None if self.child_watcher is not None: self.child_watcher.stop() self.child_watcher = None if self.returncode is None: # we must kill the child self.kill() # libev handles zombies def subproc_call(main_loop, args, stdin=None): """ :param args: arguments for subprocess call :param stdin: stdin data as string """ proc = EvPopen(main_loop, args, bufsize=4096, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) result, _ = proc.communicate(stdin) if proc.returncode != 0: raise subprocess.CalledProcessError(proc.returncode, 'Error while executing command') return result class SocketBuffer(deque): """Holds bytes in a list. This class don't handle maximum size but instead give help like handling count automatically. """ def __init__(self, max_len=8 * 64 * 1024): deque.__init__(self) self.max_len = max_len self.current_len = 0 def append(self, x): deque.append(self, x) self.current_len += len(x) def appendleft(self, x): deque.appendleft(self, x) self.current_len += len(x) def clear(self): deque.clear(self) self.current_len = 0 def extend(self, iterable): raise NotImplementedError def extendleft(self, iterable): raise NotImplementedError def pop(self): elt = deque.pop(self) self.current_len -= len(elt) return elt def popleft(self): elt = deque.popleft(self) self.current_len -= len(elt) return elt def remove(value): raise NotImplementedError def reverse(self): raise NotImplementedError def rotate(self, n): raise NotImplementedError def is_full(self): return self.current_len >= self.max_len def is_empty(self): return self.current_len == 0 class Singleton(type): """Singleton metaclass.""" def __init__(cls, name, bases, dict): super(Singleton, cls).__init__(cls, bases, dict) cls._instance = None def __call__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super(Singleton, cls).__call__(*args, **kwargs) return cls._instance def close_fds(exclude_fds=None, debug=False): """Close all fds uneeded fds in child when using fork. :param exclude_fds: list of file descriptors that should not be closed (0, 1, 2 must not be set here, see debug) :param bool debug: indicates if std in/out should be left open (usually for debuging purpose) """ # get max fd limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1] if limit == resource.RLIM_INFINITY: max_fd = 2048 else: max_fd = limit if exclude_fds is None: exclude_fds = [] if debug: exclude_fds += [0, 1, 2] # debug for fd in xrange(max_fd, -1, -1): if fd in exclude_fds: continue try: os.close(fd) except OSError as exc: if exc.errno != errno.EBADF: raise # wasn't open if not debug: sys.stdin = open(os.devnull) sys.stdout = open(os.devnull) sys.stderr = open(os.devnull) assert sys.stdin.fileno() == 0 assert sys.stdout.fileno() == 1 assert sys.stderr.fileno() == 2 def set_signal_map(map_): """Set signal map in fork children. :param mapping map_: (signal code, handler)... :returns: old handlers as dict """ previous_handlers = dict() for sig, handler in map_.iteritems(): previous_handlers[sig] = signal.signal(sig, handler) return previous_handlers sig_names = dict((k, v) for v, k in signal.__dict__.iteritems() if v.startswith('SIG')) def num_to_sig(num): """Returns signal name. :param num: signal number """ return sig_names.get(num, 'Unknown signal')