Newer
Older
import sys
import getopt
import os
import select
import errno
import time
import atexit
from threading import Thread
import libvirt
#: connection to the libvirt
connection = None # TODO create a watcher thread for this
#: corresponding name for enum state of libvirt domains
# see http://libvirt.org/html/libvirt-libvirt.html#virDomainState
DOMAIN_STATES = (
'stopped', # 0 no state
'running', # 1 running
'running', # 2 blocked
'paused', # 3 paused
'running', # 4 shutdown
'stopped', # 5 shuttoff
'stopped', # 6 crashed
'unknown', # 7 ??
)
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
#: libvirt envents
EVENTS = (
'Added',
'Removed',
'Started',
'Suspended',
'Resumed',
'Stopped',
'Saved',
'Restored',
)
# following classes are used only for libvirt eventloop
class virEventLoopPureHandle(object):
"""This class contains the data we need to track for a single file handle.
"""
def __init__(self, handle, fd, events, cb, opaque):
self.handle = handle
self.fd = fd
self.events = events
self.cb = cb
self.opaque = opaque
def get_id(self):
return self.handle
def get_fd(self):
return self.fd
def get_events(self):
return self.events
def set_events(self, events):
self.events = events
def dispatch(self, events):
self.cb(self.handle,
self.fd,
events,
self.opaque[0],
self.opaque[1])
class virEventLoopPureTimer(object):
"""This class contains the data we need to track for a single periodic
timer.
"""
def __init__(self, timer, interval, cb, opaque):
self.timer = timer
self.interval = interval
self.cb = cb
self.opaque = opaque
self.lastfired = 0
def get_id(self):
return self.timer
def get_interval(self):
return self.interval
def set_interval(self, interval):
self.interval = interval
def get_last_fired(self):
return self.lastfired
def set_last_fired(self, now):
self.lastfired = now
def dispatch(self):
self.cb(self.timer,
self.opaque[0],
self.opaque[1])
class EventLoop(object):
"""This general purpose event loop will support waiting for file handle
I/O and errors events, as well as scheduling repeatable timers with a fixed
interval.
It is a pure python implementation based around the poll() API.
"""
def __init__(self, debug=False):
self.debugOn = debug
self.poll = select.poll()
self.pipetrick = os.pipe()
self.nextHandleID = 1
self.nextTimerID = 1
self.handles = []
self.timers = []
self.quit = False
# The event loop can be used from multiple threads at once.
# Specifically while the main thread is sleeping in poll()
# waiting for events to occur, another thread may come along
# and add/update/remove a file handle, or timer. When this
# happens we need to interrupt the poll() sleep in the other
# thread, so that it'll see the file handle / timer changes.
#
# Using OS level signals for this is very unreliable and
# hard to implement correctly. Thus we use the real classic
# "self pipe" trick. A anonymous pipe, with one end registered
# with the event loop for input events. When we need to force
# the main thread out of a poll() sleep, we simple write a
# single byte of data to the other end of the pipe.
self.debug("Self pipe watch %d write %d" %(self.pipetrick[0], self.pipetrick[1]))
self.poll.register(self.pipetrick[0], select.POLLIN)
def debug(self, msg):
if self.debugOn:
print msg
def next_timeout(self):
"""Calculate when the next timeout is due to occurr, returning
the absolute timestamp for the next timeout, or 0 if there is
no timeout due.
"""
next = 0
for t in self.timers:
last = t.get_last_fired()
interval = t.get_interval()
if interval < 0:
continue
if next == 0 or (last + interval) < next:
next = last + interval
return next
def get_handle_by_fd(self, fd):
"""Lookup a virEventLoopPureHandle object based on file descriptor.
"""
for h in self.handles:
if h.get_fd() == fd:
return h
return None
def get_handle_by_id(self, handleID):
"""Lookup a virEventLoopPureHandle object based on its event loop ID.
"""
for h in self.handles:
if h.get_id() == handleID:
return h
return None
def run_once(self):
"""This is the heart of the event loop, performing one single
iteration. It asks when the next timeout is due, and then
calcuates the maximum amount of time it is able to sleep
for in poll() pending file handle events.
It then goes into the poll() sleep.
When poll() returns, there will zero or more file handle
events which need to be dispatched to registered callbacks
It may also be time to fire some periodic timers.
Due to the coarse granularity of schedular timeslices, if
we ask for a sleep of 500ms in order to satisfy a timer, we
may return upto 1 schedular timeslice early. So even though
our sleep timeout was reached, the registered timer may not
technically be at its expiry point. This leads to us going
back around the loop with a crazy 5ms sleep. So when checking
if timeouts are due, we allow a margin of 20ms, to avoid
these pointless repeated tiny sleeps.
"""
sleep = -1
next = self.next_timeout()
self.debug("Next timeout due at %d" % next)
if next > 0:
now = int(time.time() * 1000)
if now >= next:
sleep = 0
else:
sleep = next - now
self.debug("Poll with a sleep of %d" % sleep)
events = self.poll.poll(sleep)
# Dispatch any file handle events that occurred
for (fd, revents) in events:
# See if the events was from the self-pipe
# telling us to wakup. if so, then discard
# the data just continue
if fd == self.pipetrick[0]:
data = os.read(fd, 1)
continue
h = self.get_handle_by_fd(fd)
if h:
self.debug("Dispatch fd %d handle %d events %d" % (fd, h.get_id(), revents))
h.dispatch(self.events_from_poll(revents))
now = int(time.time() * 1000)
for t in self.timers:
interval = t.get_interval()
if interval < 0:
continue
want = t.get_last_fired() + interval
# Deduct 20ms, since schedular timeslice
# means we could be ever so slightly early
if now >= (want-20):
self.debug("Dispatch timer %d now %s want %s" % (t.get_id(), str(now), str(want)))
t.set_last_fired(now)
t.dispatch()
def interrupt(self):
os.write(self.pipetrick[1], 'c')
def add_handle(self, fd, events, cb, opaque):
"""Registers a new file handle 'fd', monitoring for 'events' (libvirt
event constants), firing the callback cb() when an event occurs.
Returns a unique integer identier for this handle, that should be
used to later update/remove it.
"""
handleID = self.nextHandleID + 1
self.nextHandleID = self.nextHandleID + 1
h = virEventLoopPureHandle(handleID, fd, events, cb, opaque)
self.handles.append(h)
self.poll.register(fd, self.events_to_poll(events))
self.interrupt()
self.debug("Add handle %d fd %d events %d" % (handleID, fd, events))
return handleID
def add_timer(self, interval, cb, opaque):
"""Registers a new timer with periodic expiry at 'interval' ms,
firing cb() each time the timer expires. If 'interval' is -1,
then the timer is registered, but not enabled.
Returns a unique integer identier for this handle, that should be
used to later update/remove it.
"""
timerID = self.nextTimerID + 1
self.nextTimerID = self.nextTimerID + 1
h = virEventLoopPureTimer(timerID, interval, cb, opaque)
self.timers.append(h)
self.interrupt()
self.debug("Add timer %d interval %d" % (timerID, interval))
return timerID
def update_handle(self, handleID, events):
"""Change the set of events to be monitored on the file handle.
"""
h = self.get_handle_by_id(handleID)
if h:
h.set_events(events)
self.poll.unregister(h.get_fd())
self.poll.register(h.get_fd(), self.events_to_poll(events))
self.interrupt()
self.debug("Update handle %d fd %d events %d" % (handleID, h.get_fd(), events))
def update_timer(self, timerID, interval):
"""Change the periodic frequency of the timer.
"""
for h in self.timers:
if h.get_id() == timerID:
h.set_interval(interval);
self.interrupt()
self.debug("Update timer %d interval %d" % (timerID, interval))
break
def remove_handle(self, handleID):
"""Stop monitoring for events on the file handle.
"""
handles = []
for h in self.handles:
if h.get_id() == handleID:
self.poll.unregister(h.get_fd())
self.debug("Remove handle %d fd %d" % (handleID, h.get_fd()))
else:
handles.append(h)
self.handles = handles
self.interrupt()
def remove_timer(self, timerID):
"""Stop firing the periodic timer.
"""
timers = []
for h in self.timers:
if h.get_id() != timerID:
timers.append(h)
self.debug("Remove timer %d" % timerID)
self.timers = timers
self.interrupt()
def events_to_poll(self, events):
"""Convert from libvirt event constants, to poll() events constants.
"""
ret = 0
if events & libvirt.VIR_EVENT_HANDLE_READABLE:
ret |= select.POLLIN
if events & libvirt.VIR_EVENT_HANDLE_WRITABLE:
ret |= select.POLLOUT
if events & libvirt.VIR_EVENT_HANDLE_ERROR:
ret |= select.POLLERR
if events & libvirt.VIR_EVENT_HANDLE_HANGUP:
ret |= select.POLLHUP
return ret
def events_from_poll(self, events):
"""Convert from poll() event constants, to libvirt events constants.
"""
ret = 0
if events & select.POLLIN:
ret |= libvirt.VIR_EVENT_HANDLE_READABLE
if events & select.POLLOUT:
ret |= libvirt.VIR_EVENT_HANDLE_WRITABLE
if events & select.POLLNVAL:
ret |= libvirt.VIR_EVENT_HANDLE_ERROR
if events & select.POLLERR:
ret |= libvirt.VIR_EVENT_HANDLE_ERROR
if events & select.POLLHUP:
ret |= libvirt.VIR_EVENT_HANDLE_HANGUP
return ret
def register_callbacks(self, *callbacks):
"""Register a callback."""
global connection
for c in callbacks:
connection.domainEventRegister(c, None) # TODO find out args
def run(self):
"""Run the loop."""
self.quit = False
while not self.quit:
self.run_once()
def start(self, daemonize=True):
"""Run the loop in a background thread."""
t = Thread(target=self.run)
t.daemon = True
t.start()
self.thread = t
return t