From b39958ea68042ef55c2045cb08cb278d29195690 Mon Sep 17 00:00:00 2001 From: Anael Beutot Date: Fri, 4 May 2012 17:53:02 +0200 Subject: [PATCH] Hypervisor with new API. Refactored plugins. Updated hypervisor libvirt event handling. Async rpc handling. Minor renaming for libvirt event callback. Remove uneeded method sub_tags (in hypervisor Handler). --- ccnode/hypervisor/__init__.py | 149 ++++++++----- ccnode/hypervisor/lib.py | 408 +++++++++++----------------------- ccnode/node.py | 9 +- ccnode/plugins.py | 8 +- 4 files changed, 236 insertions(+), 338 deletions(-) diff --git a/ccnode/hypervisor/__init__.py b/ccnode/hypervisor/__init__.py index 646d88e..f8814d2 100644 --- a/ccnode/hypervisor/__init__.py +++ b/ccnode/hypervisor/__init__.py @@ -8,7 +8,10 @@ from ccnode.host import Handler as HostHandler from ccnode.tags import Tag, tag_inspector, get_tags from ccnode.hypervisor import tags from ccnode.hypervisor import lib as _libvirt -from ccnode.hypervisor.lib import DOMAIN_STATES, EVENTS, STORAGE_STATES, EventLoop +from ccnode.hypervisor.lib import ( + DOMAIN_STATES, EVENTS, STORAGE_STATES, + EventLoop as VirEventLoop, +) from ccnode.hypervisor.domains import VirtualMachine @@ -18,26 +21,23 @@ logger = logging.getLogger(__name__) class Handler(HostHandler): def __init__(self, *args, **kwargs): """ - :param proxy: sjRpc proxy + :param loop: MainLoop instance :param hypervisor_name: hypervisor name """ + hypervisor_name = kwargs.pop('hypervisor_name') HostHandler.__init__(self, *args, **kwargs) - for t in tag_inspector(tags, self): - self.tags[t.name] = t + #: keep index of asynchronous calls + self.async_calls = dict() # initialize hypervisor instance - global hypervisor - if hypervisor is None: - hypervisor = Hypervisor( - name=kwargs.pop('hypervisor_name', None), - proxy=kwargs['proxy'], - ) - else: - hypervisor.sjproxy = weakref.proxy(kwargs['proxy']) - - self.hypervisor = weakref.proxy(hypervisor) + # FIXME this may block + self.hypervisor = Hypervisor( + name=hypervisor_name, + loop=self.main, + ) + # FIXME this may block # register hypervisor storage tags for name, storage in self.hypervisor.storage.storages.iteritems(): for t in ( @@ -47,33 +47,48 @@ class Handler(HostHandler): Tag('sto%s_used' % name, lambda: storage.capacity - storage.available, 5), ): - self.tags[t.name] = t + self.tag_db['__main__'][t.name] = t # register domains - proxy = kwargs.pop('proxy') - for dom in hypervisor.domains.itervalues(): + for dom in self.hypervisor.domains.itervalues(): name = dom.name - logger.debug('Registered domain %s', name) - proxy.register(name, 'vm') - - def sub_tags(self, sub_id, tags=None, noresolve_tags=None): - """Get subtags.""" - global hypervisor - - domain = hypervisor.domains.get(sub_id) - - if domain is None: - logger.debug('Failed to find domain with name %s.', sub_id) + # proxy.register(name, 'vm') + self.async_calls[self.main.rpc_con.rpc.async_call_cb( + self.register_domain_cb, + 'register', + name, + 'vm', + )] = name + + self.tag_db['__main__'].update(dict( + (t.name, t) for t in tag_inspector(tags, self), + )) + + self.rpc_handler.update(dict( + vm_define=self.vm_define, + vm_undefine=self.vm_undefine, + vm_export=self.vm_export, + vm_stop=self.vm_stop, + vm_start=self.vm_start, + vm_suspend=self.vm_suspend, + vm_resume=self.vm_resume, + )) + + def register_domain_cb(self, call_id, response=None, error=None): + name = self.async_calls.pop(call_id) + if error is not None: + logger.error('Error while registering domain, %s', error) return - logger.debug('Get tags for sub object: %s', sub_id) - return get_tags(domain, tags, noresolve_tags) + logger.debug('Registered domain %s', name) + domain = self.hypervisor.domains[name] + for tag in domain.tags.itervalues(): + self.main.reset_sub_tag(domain.name, tag) def iter_vms(self, vm_names): """Utility function to iterate over VM objects using their names.""" if vm_names is None: return - # get_domain = self.hypervisor.domains.get get_domain = self.hypervisor.domains.get for name in vm_names: dom = get_domain(name) @@ -133,31 +148,33 @@ class Handler(HostHandler): class Hypervisor(object): """Container for all hypervisor related state.""" - def __init__(self, name, proxy): + def __init__(self, name, loop): """ :param str name: name of hypervisor instance - :param proxy: sjRpc proxy + :param loop: MainLoop instance """ - self.sjproxy = weakref.proxy(proxy) + #: parent MainLoop + self.main = weakref.proxy(loop) + self.rpc_con = weakref.proxy(loop.rpc_con) # FIXME do we need weakref ? + self.async_calls = dict() #: hv attributes self.name = name self.type = u'kvm' - self.event_loop = EventLoop() + # libvirt event loop abstraction + self.vir_event_loop = VirEventLoop(self.main.loop) # This tells libvirt what event loop implementation it # should use libvirt.virEventRegisterImpl( - self.event_loop.add_handle, - self.event_loop.update_handle, - self.event_loop.remove_handle, - self.event_loop.add_timer, - self.event_loop.update_timer, - self.event_loop.remove_timer, + self.vir_event_loop.add_handle, + self.vir_event_loop.update_handle, + self.vir_event_loop.remove_handle, + self.vir_event_loop.add_timer, + self.vir_event_loop.update_timer, + self.vir_event_loop.remove_timer, ) - self.event_loop.start() - # TODO cleanup connection on stop _libvirt.connection = libvirt.open('qemu:///system') # currently only support KVM @@ -166,7 +183,6 @@ class Hypervisor(object): logger.debug('Storages: %s', self.storage.paths) - #: domains: vms, containers... self.domains = dict() # find defined domains @@ -180,9 +196,13 @@ class Hypervisor(object): logger.debug('Domains: %s', self.domains) - self.event_loop.register_callbacks(self.callback) + self.vir_event_loop.register_callbacks(self.vir_cb) + + def stop(self): + self.vir_event_loop.stop() + # TODO delet objects - def callback(self, conn, dom, event, detail, opaque): + def vir_cb(self, conn, dom, event, detail, opaque): """Callback for libvirt event loop.""" logger.debug('Received event %s on domain %s, detail %s', event, dom.name(), detail) @@ -192,13 +212,23 @@ class Hypervisor(object): if event == 'Added': vm = VirtualMachine(dom, self) self.domains[vm.name] = vm - self.sjproxy.register(vm.name, 'vm') - logger.info('Add domain: %s (%s)', vm.name, vm.uuid) + # self.sjproxy.register(vm.name, 'vm') + self.async_calls[self.rpc_con.rpc.async_call_cb( + self.register_cb, + 'register', + vm.name, + 'vm', + )] = vm.name elif event == 'Removed': logger.debug('About to remove domain') vm = self.domains.pop(dom.name()) - self.sjproxy.unregister(vm.name) - logger.info('Delete domain: %s (%s)', vm.name, vm.uuid) + # self.sjproxy.unregister(vm.name) + self.async_calls[self.rpc_con.rpc.async_call_cb( + self.register_cb, + 'register', + vm.name, + 'vm', + )] = vm.name elif event in ('Started', 'Suspended', 'Resumed', 'Stopped', 'Saved', 'Restored'): vm = self.domains.get(dom.name()) @@ -210,6 +240,22 @@ class Hypervisor(object): state) vm.state = state + def register_cb(self, call_id, response=None, error=None): + vm = self.domains[self.async_calls.pop(call_id)] + if error is not None: + logger.error('Error while registering domain to server, %s', error) + logger.info('Add domain: %s (%s)', vm.name, vm.uuid) + # add tags + for tag in vm.tags.itervalues(): + self.main.reset_sub_tag(vm.name, tag) + + def unregister_cb(self, call_id, response=None, error=None): + vm = self.domains[self.async_calls.pop(call_id)] + if error is not None: + logger.error('Error while unregistering domain to server, %s', error) + logger.info('Delete domain: %s (%s)', vm.name, vm.uuid) + self.main.remove_sub_object(vm) + def _count_domain(self, filter=lambda d: True): count = 0 @@ -240,9 +286,6 @@ class Hypervisor(object): return self._count_domain() -hypervisor = None - - class StorageIndex(object): """Keep an index of all storage volume paths.""" def __init__(self, lv_con): diff --git a/ccnode/hypervisor/lib.py b/ccnode/hypervisor/lib.py index 04909e7..0adb22d 100644 --- a/ccnode/hypervisor/lib.py +++ b/ccnode/hypervisor/lib.py @@ -1,21 +1,16 @@ """Helpers for libvirt.""" -import sys -import getopt -import os -import select -import errno -import time -import atexit import logging -from threading import Thread +from itertools import chain +import pyev import libvirt logger = logging.getLogger(__name__) +# FIXME bad global variable #: connection to the libvirt connection = None # TODO create a watcher thread for this @@ -44,7 +39,7 @@ STORAGE_STATES = ( ) -#: libvirt envents +#: libvirt events EVENTS = ( 'Added', 'Removed', @@ -58,35 +53,54 @@ EVENTS = ( # following event loop implementation was inspired by libvirt python example +# but updated to work with libev class LoopHandler(object): """This class contains the data we need to track for a single file handle. """ - def __init__(self, handle, fd, events, cb, opaque): + def __init__(self, loop, handle, fd, events, cb, opaque): + # events conversion + self.events_map = { + pyev.EV_READ: libvirt.VIR_EVENT_HANDLE_READABLE, + pyev.EV_WRITE: libvirt.VIR_EVENT_HANDLE_WRITABLE, + } + self.revents_map = { + libvirt.VIR_EVENT_HANDLE_READABLE: pyev.EV_READ, + libvirt.VIR_EVENT_HANDLE_WRITABLE: pyev.EV_WRITE, + } + self.handle = handle self.fd = fd - self.events = events - self.cb = cb + self._events = self.revents_map[events] + self._cb = cb self.opaque = opaque + self.watcher = loop.io(self.fd, self._events, self.ev_cb) + + def _set(self): + self.watcher.stop() + self.watcher.set(self.fd, self._events) + self.watcher.start() - def get_id(self): - return self.handle + @property + def events(self): + return self._events - def get_fd(self): - return self.fd + @events.setter + def events(self, events): + self._events = self.revents_map[events] + self._set() - def get_events(self): - return self.events + def start(self): + self.watcher.start() - def set_events(self, events): - self.events = events + def stop(self): + self.watcher.stop() - def dispatch(self, events): - self.cb(self.handle, - self.fd, - events, - self.opaque[0], - self.opaque[1]) + def ev_cb(self, watcher, revents): + # convert events + events = self.events_map[revents] + self._cb(self.handle, self.watcher.fd, events, self.opaque[0], + self.opaque[1]) class LoopTimer(object): @@ -94,167 +108,67 @@ class LoopTimer(object): timer. """ - def __init__(self, timer, interval, cb, opaque): + def __init__(self, loop, timer, interval, cb, opaque): self.timer = timer - self.interval = interval - self.cb = cb + self._interval = float(interval) + self._cb = cb self.opaque = opaque - self.lastfired = 0 + self.watcher = None + self.loop = loop + self._set() - def get_id(self): - return self.timer + def _set(self): + self.stop() + if self._interval >= 0.: # libvirt sends us interval == -1 + self.watcher = self.loop.timer(self._interval, self._interval, + self.ev_cb) + else: + self.watcher = None + self.start() - def get_interval(self): - return self.interval + @property + def interval(self): + return self._interval - def set_interval(self, interval): - self.interval = interval + @interval.setter + def interval(self, value): + self._interval = float(value) + self._set() - def get_last_fired(self): - return self.lastfired + def start(self): + if self.watcher is not None: + self.watcher.start() - def set_last_fired(self, now): - self.lastfired = now + def stop(self): + if self.watcher is not None: + self.watcher.stop() - def dispatch(self): - self.cb(self.timer, - self.opaque[0], - self.opaque[1]) + def ev_cb(self, *args): + self._cb(self.timer, self.opaque[0], self.opaque[1]) class EventLoop(object): - """This general purpose event loop will support waiting for file handle - I/O and errors events, as well as scheduling repeatable timers with a fixed - interval. - It is a pure python implementation based around the poll() API. + """This class is used as an interface between the libvirt event handling and + the main pyev loop. + It cannot be used from other threads. """ - def __init__(self): - self.poll = select.poll() - self.pipetrick = os.pipe() - self.nextHandleID = 1 - self.nextTimerID = 1 - self.handles = [] - self.timers = [] - self.quit = False - - # The event loop can be used from multiple threads at once. - # Specifically while the main thread is sleeping in poll() - # waiting for events to occur, another thread may come along - # and add/update/remove a file handle, or timer. When this - # happens we need to interrupt the poll() sleep in the other - # thread, so that it'll see the file handle / timer changes. - # - # Using OS level signals for this is very unreliable and - # hard to implement correctly. Thus we use the real classic - # "self pipe" trick. A anonymous pipe, with one end registered - # with the event loop for input events. When we need to force - # the main thread out of a poll() sleep, we simple write a - # single byte of data to the other end of the pipe. - # logger.debug('Self pipe watch %d write %d', self.pipetrick[0], self.pipetrick[1]) - self.poll.register(self.pipetrick[0], select.POLLIN) - - def next_timeout(self): - """Calculate when the next timeout is due to occurr, returning - the absolute timestamp for the next timeout, or 0 if there is - no timeout due. - + def __init__(self, loop): """ - next = 0 - for t in self.timers: - last = t.get_last_fired() - interval = t.get_interval() - if interval < 0: - continue - if next == 0 or (last + interval) < next: - next = last + interval - - return next - - def get_handle_by_fd(self, fd): - """Lookup a LoopHandler object based on file descriptor. - - """ - for h in self.handles: - if h.get_fd() == fd: - return h - return None - - def get_handle_by_id(self, handleID): - """Lookup a LoopHandler object based on its event loop ID. - + :param loop: pyev loop instance """ - for h in self.handles: - if h.get_id() == handleID: - return h - return None + self.loop = loop + def counter(): + count = 0 + while True: + yield count + count += 1 - def run_once(self): - """This is the heart of the event loop, performing one single - iteration. It asks when the next timeout is due, and then - calcuates the maximum amount of time it is able to sleep - for in poll() pending file handle events. - - It then goes into the poll() sleep. - - When poll() returns, there will zero or more file handle - events which need to be dispatched to registered callbacks - It may also be time to fire some periodic timers. - - Due to the coarse granularity of schedular timeslices, if - we ask for a sleep of 500ms in order to satisfy a timer, we - may return upto 1 schedular timeslice early. So even though - our sleep timeout was reached, the registered timer may not - technically be at its expiry point. This leads to us going - back around the loop with a crazy 5ms sleep. So when checking - if timeouts are due, we allow a margin of 20ms, to avoid - these pointless repeated tiny sleeps. - - """ - sleep = -1 - next = self.next_timeout() - # logger.debug('Next timeout due at %d', next) - if next > 0: - now = int(time.time() * 1000) - if now >= next: - sleep = 0 - else: - sleep = next - now - - # logger.debug('Poll with a sleep of %d', sleep) - events = self.poll.poll(sleep) - - # Dispatch any file handle events that occurred - for (fd, revents) in events: - # See if the events was from the self-pipe - # telling us to wakup. if so, then discard - # the data just continue - if fd == self.pipetrick[0]: - data = os.read(fd, 1) - continue - - h = self.get_handle_by_fd(fd) - if h: - # logger.debug('Dispatch fd %d handle %d events %d', fd, h.get_id(), revents) - h.dispatch(self.events_from_poll(revents)) - - now = int(time.time() * 1000) - for t in self.timers: - interval = t.get_interval() - if interval < 0: - continue - - want = t.get_last_fired() + interval - # Deduct 20ms, since schedular timeslice - # means we could be ever so slightly early - if now >= (want-20): - # logger.debug('Dispatch timer %d now %s want %s', t.get_id(), str(now), str(want)) - t.set_last_fired(now) - t.dispatch() - - def interrupt(self): - os.write(self.pipetrick[1], 'c') + self.handle_id = counter() + self.timer_id = counter() + self.handles = dict() + self.timers = dict() def add_handle(self, fd, events, cb, opaque): """Registers a new file handle 'fd', monitoring for 'events' (libvirt @@ -262,19 +176,17 @@ class EventLoop(object): Returns a unique integer identier for this handle, that should be used to later update/remove it. + Note: unlike in the libvirt example, we don't use an interrupt trick as + we run everything in the same thread, furthermore, calling start watcher + method from a different thread could be dangerous. """ - handleID = self.nextHandleID + 1 - self.nextHandleID = self.nextHandleID + 1 + handle_id = self.handle_id.next() - h = LoopHandler(handleID, fd, events, cb, opaque) - self.handles.append(h) - - self.poll.register(fd, self.events_to_poll(events)) - self.interrupt() - - # logger.debug('Add handle %d fd %d events %d', handleID, fd, events) - - return handleID + h = LoopHandler(self.loop, handle_id, fd, events, cb, opaque) + h.start() + self.handles[handle_id] = h + logger.debug('Add handle %d fd %d events %d', handle_id, fd, events) + return handle_id def add_timer(self, interval, cb, opaque): """Registers a new timer with periodic expiry at 'interval' ms, @@ -283,124 +195,64 @@ class EventLoop(object): Returns a unique integer identier for this handle, that should be used to later update/remove it. + Note: same note as for :method:`add_handle` applies here """ - timerID = self.nextTimerID + 1 - self.nextTimerID = self.nextTimerID + 1 - - h = LoopTimer(timerID, interval, cb, opaque) - self.timers.append(h) - self.interrupt() + timer_id = self.timer_id.next() - # logger.debug('Add timer %d interval %d', timerID, interval) + h = LoopTimer(self.loop, timer_id, interval, cb, opaque) + h.start() + self.timers[timer_id] = h + logger.debug('Add timer %d interval %d', timer_id, interval) + return timer_id - return timerID - - def update_handle(self, handleID, events): + def update_handle(self, handle_id, events): """Change the set of events to be monitored on the file handle. """ - h = self.get_handle_by_id(handleID) + h = self.handles.get(handle_id) if h: - h.set_events(events) - self.poll.unregister(h.get_fd()) - self.poll.register(h.get_fd(), self.events_to_poll(events)) - self.interrupt() - - # logger.debug('Update handle %d fd %d events %d', handleID, h.get_fd(), events) + h.events = events + logger.debug('Update handle %d fd %d events %d', handle_id, h.fd, events) - def update_timer(self, timerID, interval): + def update_timer(self, timer_id, interval): """Change the periodic frequency of the timer. """ - for h in self.timers: - if h.get_id() == timerID: - h.set_interval(interval); - self.interrupt() - - # logger.debug('Update timer %d interval %d', timerID, interval) - break + t = self.timers.get(timer_id) + if t: + t.interval = interval - def remove_handle(self, handleID): + def remove_handle(self, handle_id): """Stop monitoring for events on the file handle. """ - handles = [] - for h in self.handles: - if h.get_id() == handleID: - self.poll.unregister(h.get_fd()) - # logger.debug('Remove handle %d fd %d', handleID, h.get_fd()) - else: - handles.append(h) - self.handles = handles - self.interrupt() - - def remove_timer(self, timerID): - """Stop firing the periodic timer. - - """ - timers = [] - for h in self.timers: - if h.get_id() != timerID: - timers.append(h) - # logger.debug('Remove timer %d', timerID) - self.timers = timers - self.interrupt() - - def events_to_poll(self, events): - """Convert from libvirt event constants, to poll() events constants. - - """ - ret = 0 - - if events & libvirt.VIR_EVENT_HANDLE_READABLE: - ret |= select.POLLIN - if events & libvirt.VIR_EVENT_HANDLE_WRITABLE: - ret |= select.POLLOUT - if events & libvirt.VIR_EVENT_HANDLE_ERROR: - ret |= select.POLLERR - if events & libvirt.VIR_EVENT_HANDLE_HANGUP: - ret |= select.POLLHUP - - return ret + h = self.handles.pop(handle_id, None) + if h: + h.stop() + logger.debug('Remove handle %d', handle_id) - def events_from_poll(self, events): - """Convert from poll() event constants, to libvirt events constants. + def remove_timer(self, timer_id): + """Stop firing the periodic timer. """ - ret = 0 - - if events & select.POLLIN: - ret |= libvirt.VIR_EVENT_HANDLE_READABLE - if events & select.POLLOUT: - ret |= libvirt.VIR_EVENT_HANDLE_WRITABLE - if events & select.POLLNVAL: - ret |= libvirt.VIR_EVENT_HANDLE_ERROR - if events & select.POLLERR: - ret |= libvirt.VIR_EVENT_HANDLE_ERROR - if events & select.POLLHUP: - ret |= libvirt.VIR_EVENT_HANDLE_HANGUP - - return ret + t = self.timers.pop(timer_id, None) + if t: + t.stop() + logger.debug('Remove timer %d', timer_id) def register_callbacks(self, *callbacks): """Register a callback.""" global connection for c in callbacks: - connection.domainEventRegister(c, None) # TODO find out args - - def run(self): - """Run the loop.""" - self.quit = False - while not self.quit: - self.run_once() - - def start(self, daemonize=True): - """Run the loop in a background thread.""" - t = Thread(target=self.run) - t.daemon = True - t.start() - - self.thread = t - - return t + try: + connection.domainEventRegister(c, None) # TODO find out args + except: + logger.exception('Can\'t register callback') + + def stop(self): + for handl in chain(self.handles.itervalues(), self.timers.itervalues()): + handl.stop() + + self.handles = dict() + self.timers = dict() diff --git a/ccnode/node.py b/ccnode/node.py index c583287..14abd18 100644 --- a/ccnode/node.py +++ b/ccnode/node.py @@ -100,14 +100,13 @@ class AuthHandler(object): if response == u'host': logger.debug('Role host affected') from ccnode.host import Handler as HostHandler - self.loop.role = HostHandler(loop=self.loop.loop) + self.loop.role = HostHandler(loop=self.loop) elif response == u'hv': logger.debug('Role hypervisor affected') from ccnode.hypervisor import Handler as HypervisorHandler self.loop.role = HypervisorHandler( - proxy=self.loop.proxy, hypervisor_name=self.loop.config.server_user, - loop=self.loop.loop, + loop=self.loop, ) else: logger.error('Failed authentication, role returned: %s', response) @@ -213,6 +212,10 @@ class MainLoop(object): # TODO tag unregister self.tag_db[sub_id].pop(tag_name).stop() + def remove_sub_object(self, sub_id): + for tag in self.tag_db.pop(sub_id, {}).itervalues(): + tag.stop() + def register_plugin(self, plugin): # keep track of registered plugins if plugin in self.registered_plugins: diff --git a/ccnode/plugins.py b/ccnode/plugins.py index acbe038..9d689c4 100644 --- a/ccnode/plugins.py +++ b/ccnode/plugins.py @@ -12,9 +12,9 @@ class Base(object): """ def __init__(self, *args, **kwargs): """ - :param loop: pyev loop instance + :param loop: MainLoop instance """ - self.loop = kwargs.pop('loop') + self.main = kwargs.pop('loop') # plugins may define tags (see :mod:`ccnode.tags`) self.tag_db = dict( @@ -44,10 +44,10 @@ class Base(object): for tag in chain.from_iterable( imap(lambda d: d.itervalues(), self.tag_db.itervalues()), ): - tag.start(self.loop) + tag.start(self.main.loop) def stop(self): """Cleanup for plugins, can be used to clean pyev watchers.""" - self.loop = None + self.main = None # TODO dependencies -- GitLab