Newer
Older
import os
import sys
import errno
from functools import wraps
import pyev
from cloudcontrol.common.client.utils import main_thread
logger = logging.getLogger(__name__)
"""Do an and logic condition over the iterable element.
:param iterable iter: meat for condition
"""
if not i:
return False
return True
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
def _main_thread(func):
"""EvPopen constructor decorator."""
@wraps(func)
def decorated(self, main_loop, *args, **kwargs):
return main_loop.call_in_main_thread(func, self, main_loop, *args, **kwargs)
return decorated
class EvPopen(subprocess.Popen):
@_main_thread
def __init__(self, main_loop, *args, **kwargs):
"""Class that acts as `subprocess.Popen` but uses libev child handling.
:param main_loop: `NodeLoop` instance
:param \*args: arguments for :py:class:`subprocess.Popen`
:param \*\*kwargs: keyword arguments for :py:class:`subprocess.Popen`
"""
self.main = main_loop
try:
subprocess.Popen.__init__(self, *args, **kwargs)
except OSError as exc:
if exc.errno == errno.ECHILD:
# in case on some child failure on startup, subprocess will call
# waitpid on child, but there is a great chance that, since
# libev catchs SIGCHLD, the syscall is performed after libev
# waitpid or retried after interrupted, causing it to fail with
# ECHILD errno, in that latter case, we ignore the error and
# return early
raise RemoteExecutionError('Early child death')
# check stdout, stderr fileno and create watchers if needed
self.stdout_watcher = self.stderr_watcher = None
self.child_watcher = self.main.evloop.child(self.pid, False,
self.child_cb)
self.child_watcher.start()
self._stdout_output = list()
self._stderr_output = list()
# take an optional event for other threads to wait for process
# termination
self.stdout_done = threading.Event()
self.stderr_done = threading.Event()
self.process_done = threading.Event()
@main_thread
def create_std_watchers(self):
if self.stdout is not None:
self.stdout_watcher = self.main.evloop.io(self.stdout,
pyev.EV_READ,
self.stdout_cb)
self.stdout_watcher.start()
else:
self.stdout_done.set()
if self.stderr is not None and self.stderr.fileno() != self.stdout.fileno():
self.stderr_watcher = self.main.evloop.io(self.stderr,
pyev.EV_READ,
self.stderr_cb)
self.stderr_watcher.start()
else:
self.stderr_done.set()
def stdout_cb(self, watcher, revents):
data = os.read(watcher.fd, 1024)
if data:
self._stdout_output.append(data)
else:
self.stdout_watcher.stop()
self.stdout_watcher = None
self.stdout.close()
self.stdout = None
self.stdout_done.set()
def stderr_cb(self, watcher, revents):
data = os.read(watcher.fd, 1024)
if data:
self._stderr_output.append(data)
else:
self.stderr_watcher.stop()
self.stderr_watcher = None
self.stderr.close()
self.stderr = None
self.stderr_done.set()
def child_cb(self, watcher, revents):
self._handle_exitstatus(self.child_watcher.rstatus)
self.child_watcher.stop()
self.child_watcher = None
self.process_done.set()
# overiding parent methods
def _internal_poll(self, *args, **kwargs):
# ignore all parameters
return self.returncode
def _communicate(self, stdin=None):
self.create_std_watchers()
if stdin:
if self.stdin is None:
logger.warning('Ignoring stdin input for %s', self)
else:
fd = self.stdin.fileno()
while True:
count = os.write(fd, stdin)
if count == len(stdin):
self.stdin.close()
self.stdin = None
break
else:
stdin = stdin[:count]
self.stdout_done.wait()
self.stderr_done.wait()
# FIXME handle universal newlines
self.process_done.wait()
return tuple(map(u''.join, (self._stdout_output, self._stderr_output)))
def wait(self):
self.process_done.wait()
return self.returncode
# end overiding
def close(self):
# stop std* watchers
if self.stdout_watcher is not None:
self.stdout_watcher.stop()
self.stdout_watcher = None
if self.stderr_watcher is not None:
self.stderr_watcher.stop()
self.stderr_watcher = None
# close std* file objects if needed
if self.stdin is not None:
self.stdin.close()
self.stdin = None
if self.stdout is not None:
self.stdout.close()
self.stdout = None
if self.stderr is not None:
self.stderr.close()
self.stderr = None
if self.child_watcher is not None:
self.child_watcher.stop()
self.child_watcher = None
if self.returncode is None:
# we must kill the child
self.kill()
# libev handles zombies
def subproc_call(main_loop, args, stdin=None):
"""
:param args: arguments for subprocess call
:param stdin: stdin data as string
"""
proc = EvPopen(main_loop, args, bufsize=4096, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
close_fds=True)
result, _ = proc.communicate(stdin)
if proc.returncode != 0:
raise subprocess.CalledProcessError(proc.returncode,
'Error while executing command')
return result
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
class SocketBuffer(deque):
"""Holds bytes in a list.
This class don't handle maximum size but instead give help like handling
count automatically.
"""
def __init__(self, max_len=8 * 64 * 1024):
deque.__init__(self)
self.max_len = max_len
self.current_len = 0
def append(self, x):
deque.append(self, x)
self.current_len += len(x)
def appendleft(self, x):
deque.appendleft(self, x)
self.current_len += len(x)
def clear(self):
deque.clear(self)
self.current_len = 0
def extend(self, iterable):
raise NotImplementedError
def extendleft(self, iterable):
raise NotImplementedError
def pop(self):
elt = deque.pop(self)
self.current_len -= len(elt)
return elt
def popleft(self):
elt = deque.popleft(self)
self.current_len -= len(elt)
return elt
def remove(value):
raise NotImplementedError
def reverse(self):
raise NotImplementedError
def rotate(self, n):
raise NotImplementedError
def is_full(self):
return self.current_len >= self.max_len
def is_empty(self):
return self.current_len == 0
class Singleton(type):
"""Singleton metaclass."""
def __init__(cls, name, bases, dict):
super(Singleton, cls).__init__(cls, bases, dict)
cls._instance = None
def __call__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instance
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
def close_fds(exclude_fds=None, debug=False):
"""Close all fds uneeded fds in child when using fork.
:param exclude_fds: list of file descriptors that should not be closed (0,
1, 2 must not be set here, see debug)
:param bool debug: indicates if std in/out should be left open (usually for
debuging purpose)
"""
# get max fd
limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if limit == resource.RLIM_INFINITY:
max_fd = 2048
else:
max_fd = limit
if exclude_fds is None:
exclude_fds = []
if debug:
exclude_fds += [0, 1, 2] # debug
for fd in xrange(max_fd, -1, -1):
if fd in exclude_fds:
continue
try:
os.close(fd)
except OSError as exc:
if exc.errno != errno.EBADF:
raise
# wasn't open
if not debug:
sys.stdin = open(os.devnull)
sys.stdout = open(os.devnull, 'w')
sys.stderr = open(os.devnull, 'w')
assert sys.stdin.fileno() == 0
assert sys.stdout.fileno() == 1
assert sys.stderr.fileno() == 2
def set_signal_map(map_):
"""Set signal map in fork children.
:param mapping map_: (signal code, handler)...
:returns: old handlers as dict
"""
previous_handlers = dict()
for sig, handler in map_.iteritems():
previous_handlers[sig] = signal.signal(sig, handler)
return previous_handlers
sig_names = dict((k, v) for v, k in signal.__dict__.iteritems() if
v.startswith('SIG'))
def num_to_sig(num):
"""Returns signal name.
:param num: signal number
"""
return sig_names.get(num, 'Unknown signal')