Newer
Older
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import errno
logger = logging.getLogger(__name__)
"""Do an and logic condition over the iterable element.
:param iterable iter: meat for condition
"""
if not i:
return False
return True
def subproc_call(main_loop, args, stdin=None):
"""
:param args: arguments for subprocess call
:param stdin: stdin data as string
"""
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
rcode, output = execute(args, stdin)
if rcode != 0:
raise subprocess.CalledProcessError(rcode, 'Error while executing command')
return output
def execute(main_loop, args, stdin=None):
"""Execute a command and return error code and command output.
Warning: this function will block until the command exits. DO NOT CALL IT
IN THE MAIN THREAD.
:param args: list of command arguments. First item is the command itself
:param stdin: string to pass as command standard input
"""
# Create pipes to interact with child's standard IOs:
r_stdin, w_stdin = os.pipe()
r_stdout, w_stdout = os.pipe()
child_is_terminated = threading.Event()
# Callback to be called by libev when the child terminates:
def _cb_child_is_terminated(watcher, revents):
child_is_terminated.set()
watcher.stop()
# Part executed in the main thread to allow child watcher to be properly
# installed: "It is permissible to install a Child watcher after the child
# has been forked (which implies it might have already exited), as long as
# the event loop isn't entered" - http://pythonhosted.org/pyev/Child.html
def _executed_in_main_thread():
pid = os.fork()
if pid == 0: # Child
os.dup2(r_stdin, sys.stdin.fileno())
os.dup2(w_stdout, sys.stdout.fileno())
os.dup2(w_stdout, sys.stderr.fileno())
close_fds(debug=True)
try:
os.execvp(args[0], args)
except OSError as err:
if err.errno == 2:
os._exit(127)
os._exit(1)
except:
os._exit(1)
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
else: # Parent
os.close(r_stdin)
os.close(w_stdout)
# Write stdin string to the child standard input:
buf = stdin
while buf:
written = os.write(w_stdin, buf)
buf = buf[written:]
os.close(w_stdin)
# Create a watcher to catch child termination and start it:
child_watcher = main_loop.evloop.child(pid, False, _cb_child_is_terminated)
child_watcher.start()
# Returns the created watcher for two reasons:
# 1. Access to the child exit code and pid
# 2. Keep a reference on the watcher to prevent it to be garbage collected
return child_watcher
child_watcher = main_loop.call_in_main_thread(_executed_in_main_thread)
logger.debug('Executed command with pid %d: args: %s, stdin: %s',
child_watcher.pid, args,
'%d bytes' % len(stdin) if stdin is not None else 'no')
# Read child's stdout:
stdout = ''
tmp = True
while tmp:
tmp = os.read(r_stdout, 2048)
stdout += tmp
os.close(r_stdout)
child_is_terminated.wait()
if os.WIFEXITED(child_watcher.rstatus):
rcode = os.WEXITSTATUS(child_watcher.rstatus)
else:
rcode = -os.WTERMSIG(child_watcher.rstatus)
logger.debug('Command with pid %d returned with code %d',
child_watcher.pid, rcode)
return rcode, stdout
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
class SocketBuffer(deque):
"""Holds bytes in a list.
This class don't handle maximum size but instead give help like handling
count automatically.
"""
def __init__(self, max_len=8 * 64 * 1024):
deque.__init__(self)
self.max_len = max_len
self.current_len = 0
def append(self, x):
deque.append(self, x)
self.current_len += len(x)
def appendleft(self, x):
deque.appendleft(self, x)
self.current_len += len(x)
def clear(self):
deque.clear(self)
self.current_len = 0
def extend(self, iterable):
raise NotImplementedError
def extendleft(self, iterable):
raise NotImplementedError
def pop(self):
elt = deque.pop(self)
self.current_len -= len(elt)
return elt
def popleft(self):
elt = deque.popleft(self)
self.current_len -= len(elt)
return elt
def remove(value):
raise NotImplementedError
def reverse(self):
raise NotImplementedError
def rotate(self, n):
raise NotImplementedError
def is_full(self):
return self.current_len >= self.max_len
def is_empty(self):
return self.current_len == 0
class Singleton(type):
"""Singleton metaclass."""
def __init__(cls, name, bases, dict):
super(Singleton, cls).__init__(cls, bases, dict)
cls._instance = None
def __call__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instance
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def close_fds(exclude_fds=None, debug=False):
"""Close all fds uneeded fds in child when using fork.
:param exclude_fds: list of file descriptors that should not be closed (0,
1, 2 must not be set here, see debug)
:param bool debug: indicates if std in/out should be left open (usually for
debuging purpose)
"""
# get max fd
limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if limit == resource.RLIM_INFINITY:
max_fd = 2048
else:
max_fd = limit
if exclude_fds is None:
exclude_fds = []
if debug:
exclude_fds += [0, 1, 2] # debug
for fd in xrange(max_fd, -1, -1):
if fd in exclude_fds:
continue
try:
os.close(fd)
except OSError as exc:
if exc.errno != errno.EBADF:
raise
# wasn't open
if not debug:
sys.stdin = open(os.devnull)
sys.stdout = open(os.devnull, 'w')
sys.stderr = open(os.devnull, 'w')
assert sys.stdin.fileno() == 0
assert sys.stdout.fileno() == 1
assert sys.stderr.fileno() == 2
def set_signal_map(map_):
"""Set signal map in fork children.
:param mapping map_: (signal code, handler)...
:returns: old handlers as dict
"""
previous_handlers = dict()
for sig, handler in map_.iteritems():
previous_handlers[sig] = signal.signal(sig, handler)
return previous_handlers
sig_names = dict((k, v) for v, k in signal.__dict__.iteritems() if
v.startswith('SIG'))
def num_to_sig(num):
"""Returns signal name.
:param num: signal number
"""
return sig_names.get(num, 'Unknown signal')