"""Helpers for libvirt.""" import sys import getopt import os import select import errno import time import atexit from threading import Thread import libvirt #: connection to the libvirt connection = None # TODO create a watcher thread for this #: corresponding name for enum state of libvirt domains # see http://libvirt.org/html/libvirt-libvirt.html#virDomainState DOMAIN_STATES = ( 'stopped', # 0 no state 'running', # 1 running 'running', # 2 blocked 'paused', # 3 paused 'running', # 4 shutdown 'stopped', # 5 shuttoff 'stopped', # 6 crashed 'unknown', # 7 ?? ) #: libvirt envents EVENTS = ( 'Added', 'Removed', 'Started', 'Suspended', 'Resumed', 'Stopped', 'Saved', 'Restored', ) # following classes are used only for libvirt eventloop class virEventLoopPureHandle(object): """This class contains the data we need to track for a single file handle. """ def __init__(self, handle, fd, events, cb, opaque): self.handle = handle self.fd = fd self.events = events self.cb = cb self.opaque = opaque def get_id(self): return self.handle def get_fd(self): return self.fd def get_events(self): return self.events def set_events(self, events): self.events = events def dispatch(self, events): self.cb(self.handle, self.fd, events, self.opaque[0], self.opaque[1]) class virEventLoopPureTimer(object): """This class contains the data we need to track for a single periodic timer. """ def __init__(self, timer, interval, cb, opaque): self.timer = timer self.interval = interval self.cb = cb self.opaque = opaque self.lastfired = 0 def get_id(self): return self.timer def get_interval(self): return self.interval def set_interval(self, interval): self.interval = interval def get_last_fired(self): return self.lastfired def set_last_fired(self, now): self.lastfired = now def dispatch(self): self.cb(self.timer, self.opaque[0], self.opaque[1]) class EventLoop(object): """This general purpose event loop will support waiting for file handle I/O and errors events, as well as scheduling repeatable timers with a fixed interval. It is a pure python implementation based around the poll() API. """ def __init__(self, debug=False): self.debugOn = debug self.poll = select.poll() self.pipetrick = os.pipe() self.nextHandleID = 1 self.nextTimerID = 1 self.handles = [] self.timers = [] self.quit = False # The event loop can be used from multiple threads at once. # Specifically while the main thread is sleeping in poll() # waiting for events to occur, another thread may come along # and add/update/remove a file handle, or timer. When this # happens we need to interrupt the poll() sleep in the other # thread, so that it'll see the file handle / timer changes. # # Using OS level signals for this is very unreliable and # hard to implement correctly. Thus we use the real classic # "self pipe" trick. A anonymous pipe, with one end registered # with the event loop for input events. When we need to force # the main thread out of a poll() sleep, we simple write a # single byte of data to the other end of the pipe. self.debug("Self pipe watch %d write %d" %(self.pipetrick[0], self.pipetrick[1])) self.poll.register(self.pipetrick[0], select.POLLIN) def debug(self, msg): if self.debugOn: print msg def next_timeout(self): """Calculate when the next timeout is due to occurr, returning the absolute timestamp for the next timeout, or 0 if there is no timeout due. """ next = 0 for t in self.timers: last = t.get_last_fired() interval = t.get_interval() if interval < 0: continue if next == 0 or (last + interval) < next: next = last + interval return next def get_handle_by_fd(self, fd): """Lookup a virEventLoopPureHandle object based on file descriptor. """ for h in self.handles: if h.get_fd() == fd: return h return None def get_handle_by_id(self, handleID): """Lookup a virEventLoopPureHandle object based on its event loop ID. """ for h in self.handles: if h.get_id() == handleID: return h return None def run_once(self): """This is the heart of the event loop, performing one single iteration. It asks when the next timeout is due, and then calcuates the maximum amount of time it is able to sleep for in poll() pending file handle events. It then goes into the poll() sleep. When poll() returns, there will zero or more file handle events which need to be dispatched to registered callbacks It may also be time to fire some periodic timers. Due to the coarse granularity of schedular timeslices, if we ask for a sleep of 500ms in order to satisfy a timer, we may return upto 1 schedular timeslice early. So even though our sleep timeout was reached, the registered timer may not technically be at its expiry point. This leads to us going back around the loop with a crazy 5ms sleep. So when checking if timeouts are due, we allow a margin of 20ms, to avoid these pointless repeated tiny sleeps. """ sleep = -1 next = self.next_timeout() self.debug("Next timeout due at %d" % next) if next > 0: now = int(time.time() * 1000) if now >= next: sleep = 0 else: sleep = next - now self.debug("Poll with a sleep of %d" % sleep) events = self.poll.poll(sleep) # Dispatch any file handle events that occurred for (fd, revents) in events: # See if the events was from the self-pipe # telling us to wakup. if so, then discard # the data just continue if fd == self.pipetrick[0]: data = os.read(fd, 1) continue h = self.get_handle_by_fd(fd) if h: self.debug("Dispatch fd %d handle %d events %d" % (fd, h.get_id(), revents)) h.dispatch(self.events_from_poll(revents)) now = int(time.time() * 1000) for t in self.timers: interval = t.get_interval() if interval < 0: continue want = t.get_last_fired() + interval # Deduct 20ms, since schedular timeslice # means we could be ever so slightly early if now >= (want-20): self.debug("Dispatch timer %d now %s want %s" % (t.get_id(), str(now), str(want))) t.set_last_fired(now) t.dispatch() def interrupt(self): os.write(self.pipetrick[1], 'c') def add_handle(self, fd, events, cb, opaque): """Registers a new file handle 'fd', monitoring for 'events' (libvirt event constants), firing the callback cb() when an event occurs. Returns a unique integer identier for this handle, that should be used to later update/remove it. """ handleID = self.nextHandleID + 1 self.nextHandleID = self.nextHandleID + 1 h = virEventLoopPureHandle(handleID, fd, events, cb, opaque) self.handles.append(h) self.poll.register(fd, self.events_to_poll(events)) self.interrupt() self.debug("Add handle %d fd %d events %d" % (handleID, fd, events)) return handleID def add_timer(self, interval, cb, opaque): """Registers a new timer with periodic expiry at 'interval' ms, firing cb() each time the timer expires. If 'interval' is -1, then the timer is registered, but not enabled. Returns a unique integer identier for this handle, that should be used to later update/remove it. """ timerID = self.nextTimerID + 1 self.nextTimerID = self.nextTimerID + 1 h = virEventLoopPureTimer(timerID, interval, cb, opaque) self.timers.append(h) self.interrupt() self.debug("Add timer %d interval %d" % (timerID, interval)) return timerID def update_handle(self, handleID, events): """Change the set of events to be monitored on the file handle. """ h = self.get_handle_by_id(handleID) if h: h.set_events(events) self.poll.unregister(h.get_fd()) self.poll.register(h.get_fd(), self.events_to_poll(events)) self.interrupt() self.debug("Update handle %d fd %d events %d" % (handleID, h.get_fd(), events)) def update_timer(self, timerID, interval): """Change the periodic frequency of the timer. """ for h in self.timers: if h.get_id() == timerID: h.set_interval(interval); self.interrupt() self.debug("Update timer %d interval %d" % (timerID, interval)) break def remove_handle(self, handleID): """Stop monitoring for events on the file handle. """ handles = [] for h in self.handles: if h.get_id() == handleID: self.poll.unregister(h.get_fd()) self.debug("Remove handle %d fd %d" % (handleID, h.get_fd())) else: handles.append(h) self.handles = handles self.interrupt() def remove_timer(self, timerID): """Stop firing the periodic timer. """ timers = [] for h in self.timers: if h.get_id() != timerID: timers.append(h) self.debug("Remove timer %d" % timerID) self.timers = timers self.interrupt() def events_to_poll(self, events): """Convert from libvirt event constants, to poll() events constants. """ ret = 0 if events & libvirt.VIR_EVENT_HANDLE_READABLE: ret |= select.POLLIN if events & libvirt.VIR_EVENT_HANDLE_WRITABLE: ret |= select.POLLOUT if events & libvirt.VIR_EVENT_HANDLE_ERROR: ret |= select.POLLERR if events & libvirt.VIR_EVENT_HANDLE_HANGUP: ret |= select.POLLHUP return ret def events_from_poll(self, events): """Convert from poll() event constants, to libvirt events constants. """ ret = 0 if events & select.POLLIN: ret |= libvirt.VIR_EVENT_HANDLE_READABLE if events & select.POLLOUT: ret |= libvirt.VIR_EVENT_HANDLE_WRITABLE if events & select.POLLNVAL: ret |= libvirt.VIR_EVENT_HANDLE_ERROR if events & select.POLLERR: ret |= libvirt.VIR_EVENT_HANDLE_ERROR if events & select.POLLHUP: ret |= libvirt.VIR_EVENT_HANDLE_HANGUP return ret def register_callbacks(self, *callbacks): """Register a callback.""" global connection for c in callbacks: connection.domainEventRegister(c, None) # TODO find out args def run(self): """Run the loop.""" self.quit = False while not self.quit: self.run_once() def start(self, daemonize=True): """Run the loop in a background thread.""" t = Thread(target=self.run) t.daemon = True t.start() self.thread = t return t