-
Antoine Millet authoredAntoine Millet authored
utils.py 7.37 KiB
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import errno
import signal
import logging
import resource
import threading
import subprocess
from collections import deque
logger = logging.getLogger(__name__)
def and_(iter_):
"""Do an and logic condition over the iterable element.
:param iterable iter: meat for condition
"""
for i in iter_:
if not i:
return False
return True
def subproc_call(main_loop, args, stdin=None):
"""
:param args: arguments for subprocess call
:param stdin: stdin data as string
"""
rcode, output = execute(args, stdin)
if rcode != 0:
raise subprocess.CalledProcessError(rcode, 'Error while executing command')
return output
def execute(main_loop, args, stdin=None):
"""Execute a command and return error code and command output.
Warning: this function will block until the command exits. DO NOT CALL IT
IN THE MAIN THREAD.
:param args: list of command arguments. First item is the command itself
:param stdin: string to pass as command standard input
"""
# Create pipes to interact with child's standard IOs:
r_stdin, w_stdin = os.pipe()
r_stdout, w_stdout = os.pipe()
child_is_terminated = threading.Event()
# Callback to be called by libev when the child terminates:
def _cb_child_is_terminated(watcher, revents):
child_is_terminated.set()
watcher.stop()
# Part executed in the main thread to allow child watcher to be properly
# installed: "It is permissible to install a Child watcher after the child
# has been forked (which implies it might have already exited), as long as
# the event loop isn't entered" - http://pythonhosted.org/pyev/Child.html
def _executed_in_main_thread():
pid = os.fork()
if pid == 0: # Child
os.dup2(r_stdin, sys.stdin.fileno())
os.dup2(w_stdout, sys.stdout.fileno())
os.dup2(w_stdout, sys.stderr.fileno())
close_fds(debug=True)
os.execvp(args[0], args)
else: # Parent
os.close(r_stdin)
os.close(w_stdout)
# Write stdin string to the child standard input:
buf = stdin
while buf:
written = os.write(w_stdin, buf)
buf = buf[written:]
os.close(w_stdin)
# Create a watcher to catch child termination and start it:
child_watcher = main_loop.evloop.child(pid, False, _cb_child_is_terminated)
child_watcher.start()
# Returns the created watcher for two reasons:
# 1. Access to the child exit code and pid
# 2. Keep a reference on the watcher to prevent it to be garbage collected
return child_watcher
child_watcher = main_loop.call_in_main_thread(_executed_in_main_thread)
logger.debug('Executed command with pid %d: args: %s, stdin: %s',
child_watcher.pid, args,
'%d bytes' % len(stdin) if stdin is not None else 'no')
# Read child's stdout:
stdout = ''
tmp = True
while tmp:
tmp = os.read(r_stdout, 2048)
stdout += tmp
os.close(r_stdout)
child_is_terminated.wait()
if os.WIFEXITED(child_watcher.rstatus):
rcode = os.WEXITSTATUS(child_watcher.rstatus)
else:
rcode = -os.WTERMSIG(child_watcher.rstatus)
logger.debug('Command with pid %d returned with code %d',
child_watcher.pid, rcode)
return rcode, stdout
class SocketBuffer(deque):
"""Holds bytes in a list.
This class don't handle maximum size but instead give help like handling
count automatically.
"""
def __init__(self, max_len=8 * 64 * 1024):
deque.__init__(self)
self.max_len = max_len
self.current_len = 0
def append(self, x):
deque.append(self, x)
self.current_len += len(x)
def appendleft(self, x):
deque.appendleft(self, x)
self.current_len += len(x)
def clear(self):
deque.clear(self)
self.current_len = 0
def extend(self, iterable):
raise NotImplementedError
def extendleft(self, iterable):
raise NotImplementedError
def pop(self):
elt = deque.pop(self)
self.current_len -= len(elt)
return elt
def popleft(self):
elt = deque.popleft(self)
self.current_len -= len(elt)
return elt
def remove(value):
raise NotImplementedError
def reverse(self):
raise NotImplementedError
def rotate(self, n):
raise NotImplementedError
def is_full(self):
return self.current_len >= self.max_len
def is_empty(self):
return self.current_len == 0
class Singleton(type):
"""Singleton metaclass."""
def __init__(cls, name, bases, dict):
super(Singleton, cls).__init__(cls, bases, dict)
cls._instance = None
def __call__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instance
def close_fds(exclude_fds=None, debug=False):
"""Close all fds uneeded fds in child when using fork.
:param exclude_fds: list of file descriptors that should not be closed (0,
1, 2 must not be set here, see debug)
:param bool debug: indicates if std in/out should be left open (usually for
debuging purpose)
"""
# get max fd
limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if limit == resource.RLIM_INFINITY:
max_fd = 2048
else:
max_fd = limit
if exclude_fds is None:
exclude_fds = []
if debug:
exclude_fds += [0, 1, 2] # debug
for fd in xrange(max_fd, -1, -1):
if fd in exclude_fds:
continue
try:
os.close(fd)
except OSError as exc:
if exc.errno != errno.EBADF:
raise
# wasn't open
if not debug:
sys.stdin = open(os.devnull)
sys.stdout = open(os.devnull, 'w')
sys.stderr = open(os.devnull, 'w')
assert sys.stdin.fileno() == 0
assert sys.stdout.fileno() == 1
assert sys.stderr.fileno() == 2
def set_signal_map(map_):
"""Set signal map in fork children.
:param mapping map_: (signal code, handler)...
:returns: old handlers as dict
"""
previous_handlers = dict()
for sig, handler in map_.iteritems():
previous_handlers[sig] = signal.signal(sig, handler)
return previous_handlers
sig_names = dict((k, v) for v, k in signal.__dict__.iteritems() if
v.startswith('SIG'))
def num_to_sig(num):
"""Returns signal name.
:param num: signal number
"""
return sig_names.get(num, 'Unknown signal')