Commit b39958ea authored by Anael Beutot's avatar Anael Beutot
Browse files

Hypervisor with new API.

Refactored plugins.
Updated hypervisor libvirt event handling.
Async rpc handling.
Minor renaming for libvirt event callback.
Remove uneeded method sub_tags (in hypervisor Handler).
parent e9cdcc95
Loading
Loading
Loading
Loading
+96 −53
Original line number Diff line number Diff line
@@ -8,7 +8,10 @@ from ccnode.host import Handler as HostHandler
from ccnode.tags import Tag, tag_inspector, get_tags
from ccnode.hypervisor import tags
from ccnode.hypervisor import lib as _libvirt
from ccnode.hypervisor.lib import DOMAIN_STATES, EVENTS, STORAGE_STATES, EventLoop
from ccnode.hypervisor.lib import (
    DOMAIN_STATES, EVENTS, STORAGE_STATES,
    EventLoop as VirEventLoop,
)
from ccnode.hypervisor.domains import VirtualMachine


@@ -18,26 +21,23 @@ logger = logging.getLogger(__name__)
class Handler(HostHandler):
    def __init__(self, *args, **kwargs):
        """
        :param proxy: sjRpc proxy
        :param loop: MainLoop instance
        :param hypervisor_name: hypervisor name
        """
        hypervisor_name = kwargs.pop('hypervisor_name')
        HostHandler.__init__(self, *args, **kwargs)

        for t in tag_inspector(tags, self):
            self.tags[t.name] = t
        #: keep index of asynchronous calls
        self.async_calls = dict()

        # initialize hypervisor instance
        global hypervisor
        if hypervisor is None:
            hypervisor = Hypervisor(
                name=kwargs.pop('hypervisor_name', None),
                proxy=kwargs['proxy'],
        # FIXME this may block
        self.hypervisor = Hypervisor(
            name=hypervisor_name,
            loop=self.main,
        )
        else:
            hypervisor.sjproxy = weakref.proxy(kwargs['proxy'])

        self.hypervisor = weakref.proxy(hypervisor)

        # FIXME this may block
        # register hypervisor storage tags
        for name, storage in self.hypervisor.storage.storages.iteritems():
            for t in (
@@ -47,33 +47,48 @@ class Handler(HostHandler):
                Tag('sto%s_used' % name,
                    lambda: storage.capacity - storage.available, 5),
            ):
                self.tags[t.name] = t
                self.tag_db['__main__'][t.name] = t

        # register domains
        proxy = kwargs.pop('proxy')
        for dom in hypervisor.domains.itervalues():
        for dom in self.hypervisor.domains.itervalues():
            name = dom.name
            logger.debug('Registered domain %s', name)
            proxy.register(name, 'vm')

    def sub_tags(self, sub_id, tags=None, noresolve_tags=None):
        """Get subtags."""
        global hypervisor

        domain = hypervisor.domains.get(sub_id)

        if domain is None:
            logger.debug('Failed to find domain with name %s.', sub_id)
            # proxy.register(name, 'vm')
            self.async_calls[self.main.rpc_con.rpc.async_call_cb(
                self.register_domain_cb,
                'register',
                name,
                'vm',
            )] = name

        self.tag_db['__main__'].update(dict(
            (t.name, t) for t in tag_inspector(tags, self),
        ))

        self.rpc_handler.update(dict(
            vm_define=self.vm_define,
            vm_undefine=self.vm_undefine,
            vm_export=self.vm_export,
            vm_stop=self.vm_stop,
            vm_start=self.vm_start,
            vm_suspend=self.vm_suspend,
            vm_resume=self.vm_resume,
        ))

    def register_domain_cb(self, call_id, response=None, error=None):
        name = self.async_calls.pop(call_id)
        if error is not None:
            logger.error('Error while registering domain, %s', error)
            return

        logger.debug('Get tags for sub object: %s', sub_id)
        return get_tags(domain, tags, noresolve_tags)
        logger.debug('Registered domain %s', name)
        domain = self.hypervisor.domains[name]
        for tag in domain.tags.itervalues():
            self.main.reset_sub_tag(domain.name, tag)

    def iter_vms(self, vm_names):
        """Utility function to iterate over VM objects using their names."""
        if vm_names is None:
            return
        # get_domain = self.hypervisor.domains.get
        get_domain = self.hypervisor.domains.get
        for name in vm_names:
            dom = get_domain(name)
@@ -133,31 +148,33 @@ class Handler(HostHandler):

class Hypervisor(object):
    """Container for all hypervisor related state."""
    def __init__(self, name, proxy):
    def __init__(self, name, loop):
        """
        :param str name: name of hypervisor instance
        :param proxy: sjRpc proxy
        :param loop: MainLoop instance
        """
        self.sjproxy = weakref.proxy(proxy)
        #: parent MainLoop
        self.main = weakref.proxy(loop)
        self.rpc_con = weakref.proxy(loop.rpc_con)  # FIXME do we need weakref ?
        self.async_calls = dict()

        #: hv attributes
        self.name = name
        self.type = u'kvm'

        self.event_loop = EventLoop()
        # libvirt event loop abstraction
        self.vir_event_loop = VirEventLoop(self.main.loop)
        # This tells libvirt what event loop implementation it
        # should use
        libvirt.virEventRegisterImpl(
            self.event_loop.add_handle,
            self.event_loop.update_handle,
            self.event_loop.remove_handle,
            self.event_loop.add_timer,
            self.event_loop.update_timer,
            self.event_loop.remove_timer,
            self.vir_event_loop.add_handle,
            self.vir_event_loop.update_handle,
            self.vir_event_loop.remove_handle,
            self.vir_event_loop.add_timer,
            self.vir_event_loop.update_timer,
            self.vir_event_loop.remove_timer,
        )

        self.event_loop.start()

        # TODO cleanup connection on stop
        _libvirt.connection = libvirt.open('qemu:///system')  # currently only support KVM

@@ -166,7 +183,6 @@ class Hypervisor(object):

        logger.debug('Storages: %s', self.storage.paths)


        #: domains: vms, containers...
        self.domains = dict()
        # find defined domains
@@ -180,9 +196,13 @@ class Hypervisor(object):

        logger.debug('Domains: %s', self.domains)

        self.event_loop.register_callbacks(self.callback)
        self.vir_event_loop.register_callbacks(self.vir_cb)

    def stop(self):
        self.vir_event_loop.stop()
        # TODO delet objects

    def callback(self, conn, dom, event, detail, opaque):
    def vir_cb(self, conn, dom, event, detail, opaque):
        """Callback for libvirt event loop."""
        logger.debug('Received event %s on domain %s, detail %s', event,
                     dom.name(), detail)
@@ -192,13 +212,23 @@ class Hypervisor(object):
        if event == 'Added':
            vm = VirtualMachine(dom, self)
            self.domains[vm.name] = vm
            self.sjproxy.register(vm.name, 'vm')
            logger.info('Add domain: %s (%s)', vm.name, vm.uuid)
            # self.sjproxy.register(vm.name, 'vm')
            self.async_calls[self.rpc_con.rpc.async_call_cb(
                self.register_cb,
                'register',
                vm.name,
                'vm',
            )] = vm.name
        elif event == 'Removed':
            logger.debug('About to remove domain')
            vm = self.domains.pop(dom.name())
            self.sjproxy.unregister(vm.name)
            logger.info('Delete domain: %s (%s)', vm.name, vm.uuid)
            # self.sjproxy.unregister(vm.name)
            self.async_calls[self.rpc_con.rpc.async_call_cb(
                self.register_cb,
                'register',
                vm.name,
                'vm',
            )] = vm.name
        elif event in ('Started', 'Suspended', 'Resumed', 'Stopped', 'Saved',
                       'Restored'):
            vm = self.domains.get(dom.name())
@@ -210,6 +240,22 @@ class Hypervisor(object):
                             state)
                vm.state = state

    def register_cb(self, call_id, response=None, error=None):
        vm = self.domains[self.async_calls.pop(call_id)]
        if error is not None:
            logger.error('Error while registering domain to server, %s', error)
        logger.info('Add domain: %s (%s)', vm.name, vm.uuid)
        # add tags
        for tag in vm.tags.itervalues():
            self.main.reset_sub_tag(vm.name, tag)

    def unregister_cb(self, call_id, response=None, error=None):
        vm = self.domains[self.async_calls.pop(call_id)]
        if error is not None:
            logger.error('Error while unregistering domain to server, %s', error)
        logger.info('Delete domain: %s (%s)', vm.name, vm.uuid)
        self.main.remove_sub_object(vm)

    def _count_domain(self, filter=lambda d: True):
        count = 0

@@ -240,9 +286,6 @@ class Hypervisor(object):
        return self._count_domain()


hypervisor = None


class StorageIndex(object):
    """Keep an index of all storage volume paths."""
    def __init__(self, lv_con):
+130 −278
Original line number Diff line number Diff line
"""Helpers for libvirt."""

import sys
import getopt
import os
import select
import errno
import time
import atexit
import logging
from threading import Thread
from itertools import chain

import pyev
import libvirt


logger = logging.getLogger(__name__)


# FIXME bad global variable
#: connection to the libvirt
connection = None  # TODO create a watcher thread for this

@@ -44,7 +39,7 @@ STORAGE_STATES = (
)


#: libvirt envents
#: libvirt events
EVENTS = (
    'Added',
    'Removed',
@@ -58,34 +53,53 @@ EVENTS = (


# following event loop implementation was inspired by libvirt python example
# but updated to work with libev
class LoopHandler(object):
    """This class contains the data we need to track for a single file handle.

    """
    def __init__(self, handle, fd, events, cb, opaque):
    def __init__(self, loop, handle, fd, events, cb, opaque):
        # events conversion
        self.events_map = {
            pyev.EV_READ: libvirt.VIR_EVENT_HANDLE_READABLE,
            pyev.EV_WRITE: libvirt.VIR_EVENT_HANDLE_WRITABLE,
        }
        self.revents_map = {
            libvirt.VIR_EVENT_HANDLE_READABLE: pyev.EV_READ,
            libvirt.VIR_EVENT_HANDLE_WRITABLE: pyev.EV_WRITE,
        }

        self.handle = handle
        self.fd = fd
        self.events = events
        self.cb = cb
        self._events = self.revents_map[events]
        self._cb = cb
        self.opaque = opaque
        self.watcher = loop.io(self.fd, self._events, self.ev_cb)

    def _set(self):
        self.watcher.stop()
        self.watcher.set(self.fd, self._events)
        self.watcher.start()

    def get_id(self):
        return self.handle
    @property
    def events(self):
        return self._events

    def get_fd(self):
        return self.fd
    @events.setter
    def events(self, events):
        self._events = self.revents_map[events]
        self._set()

    def get_events(self):
        return self.events
    def start(self):
        self.watcher.start()

    def set_events(self, events):
        self.events = events
    def stop(self):
        self.watcher.stop()

    def dispatch(self, events):
        self.cb(self.handle,
                self.fd,
                events,
                self.opaque[0],
    def ev_cb(self, watcher, revents):
        # convert events
        events = self.events_map[revents]
        self._cb(self.handle, self.watcher.fd, events, self.opaque[0],
                 self.opaque[1])


@@ -94,167 +108,67 @@ class LoopTimer(object):
    timer.

    """
    def __init__(self, timer, interval, cb, opaque):
    def __init__(self, loop, timer, interval, cb, opaque):
        self.timer = timer
        self.interval = interval
        self.cb = cb
        self._interval = float(interval)
        self._cb = cb
        self.opaque = opaque
        self.lastfired = 0

    def get_id(self):
        return self.timer
        self.watcher = None
        self.loop = loop
        self._set()

    def _set(self):
        self.stop()
        if self._interval >= 0.:  # libvirt sends us interval == -1
            self.watcher = self.loop.timer(self._interval, self._interval,
                                           self.ev_cb)
        else:
            self.watcher = None
        self.start()

    def get_interval(self):
        return self.interval
    @property
    def interval(self):
        return self._interval

    def set_interval(self, interval):
        self.interval = interval
    @interval.setter
    def interval(self, value):
        self._interval = float(value)
        self._set()

    def get_last_fired(self):
        return self.lastfired
    def start(self):
        if self.watcher is not None:
            self.watcher.start()

    def set_last_fired(self, now):
        self.lastfired = now
    def stop(self):
        if self.watcher is not None:
            self.watcher.stop()

    def dispatch(self):
        self.cb(self.timer,
                self.opaque[0],
                self.opaque[1])
    def ev_cb(self, *args):
        self._cb(self.timer, self.opaque[0], self.opaque[1])


class EventLoop(object):
    """This general purpose event loop will support waiting for file handle
    I/O and errors events, as well as scheduling repeatable timers with a fixed
    interval.
    It is a pure python implementation based around the poll() API.

    """
    def __init__(self):
        self.poll = select.poll()
        self.pipetrick = os.pipe()
        self.nextHandleID = 1
        self.nextTimerID = 1
        self.handles = []
        self.timers = []
        self.quit = False

        # The event loop can be used from multiple threads at once.
        # Specifically while the main thread is sleeping in poll()
        # waiting for events to occur, another thread may come along
        # and add/update/remove a file handle, or timer. When this
        # happens we need to interrupt the poll() sleep in the other
        # thread, so that it'll see the file handle / timer changes.
        #
        # Using OS level signals for this is very unreliable and
        # hard to implement correctly. Thus we use the real classic
        # "self pipe" trick. A anonymous pipe, with one end registered
        # with the event loop for input events. When we need to force
        # the main thread out of a poll() sleep, we simple write a
        # single byte of data to the other end of the pipe.
        # logger.debug('Self pipe watch %d write %d', self.pipetrick[0], self.pipetrick[1])
        self.poll.register(self.pipetrick[0], select.POLLIN)

    def next_timeout(self):
        """Calculate when the next timeout is due to occurr, returning
        the absolute timestamp for the next timeout, or 0 if there is
        no timeout due.

        """
        next = 0
        for t in self.timers:
            last = t.get_last_fired()
            interval = t.get_interval()
            if interval < 0:
                continue
            if next == 0 or (last + interval) < next:
                next = last + interval

        return next

    def get_handle_by_fd(self, fd):
        """Lookup a LoopHandler object based on file descriptor.
    """This class is used as an interface between the libvirt event handling and
    the main pyev loop.

    It cannot be used from other threads.
    """
        for h in self.handles:
            if h.get_fd() == fd:
                return h
        return None

    def get_handle_by_id(self, handleID):
        """Lookup a LoopHandler object based on its event loop ID.

    def __init__(self, loop):
        """
        for h in self.handles:
            if h.get_id() == handleID:
                return h
        return None


    def run_once(self):
        """This is the heart of the event loop, performing one single
        iteration. It asks when the next timeout is due, and then
        calcuates the maximum amount of time it is able to sleep
        for in poll() pending file handle events.

        It then goes into the poll() sleep.

        When poll() returns, there will zero or more file handle
        events which need to be dispatched to registered callbacks
        It may also be time to fire some periodic timers.

        Due to the coarse granularity of schedular timeslices, if
        we ask for a sleep of 500ms in order to satisfy a timer, we
        may return upto 1 schedular timeslice early. So even though
        our sleep timeout was reached, the registered timer may not
        technically be at its expiry point. This leads to us going
        back around the loop with a crazy 5ms sleep. So when checking
        if timeouts are due, we allow a margin of 20ms, to avoid
        these pointless repeated tiny sleeps.

        :param loop: pyev loop instance
        """
        sleep = -1
        next = self.next_timeout()
        # logger.debug('Next timeout due at %d', next)
        if next > 0:
            now = int(time.time() * 1000)
            if now >= next:
                sleep = 0
            else:
                sleep = next - now
        self.loop = loop

        # logger.debug('Poll with a sleep of %d', sleep)
        events = self.poll.poll(sleep)
        def counter():
            count = 0
            while True:
                yield count
                count += 1

        # Dispatch any file handle events that occurred
        for (fd, revents) in events:
            # See if the events was from the self-pipe
            # telling us to wakup. if so, then discard
            # the data just continue
            if fd == self.pipetrick[0]:
                data = os.read(fd, 1)
                continue

            h = self.get_handle_by_fd(fd)
            if h:
                # logger.debug('Dispatch fd %d handle %d events %d', fd, h.get_id(), revents)
                h.dispatch(self.events_from_poll(revents))

        now = int(time.time() * 1000)
        for t in self.timers:
            interval = t.get_interval()
            if interval < 0:
                continue

            want = t.get_last_fired() + interval
            # Deduct 20ms, since schedular timeslice
            # means we could be ever so slightly early
            if now >= (want-20):
                # logger.debug('Dispatch timer %d now %s want %s', t.get_id(), str(now), str(want))
                t.set_last_fired(now)
                t.dispatch()

    def interrupt(self):
        os.write(self.pipetrick[1], 'c')
        self.handle_id = counter()
        self.timer_id = counter()
        self.handles = dict()
        self.timers = dict()

    def add_handle(self, fd, events, cb, opaque):
        """Registers a new file handle 'fd', monitoring  for 'events' (libvirt
@@ -262,19 +176,17 @@ class EventLoop(object):
        Returns a unique integer identier for this handle, that should be
        used to later update/remove it.

        Note: unlike in the libvirt example, we don't use an interrupt trick as
        we run everything in the same thread, furthermore, calling start watcher
        method from a different thread could be dangerous.
        """
        handleID = self.nextHandleID + 1
        self.nextHandleID = self.nextHandleID + 1
        handle_id = self.handle_id.next()

        h = LoopHandler(handleID, fd, events, cb, opaque)
        self.handles.append(h)

        self.poll.register(fd, self.events_to_poll(events))
        self.interrupt()

        # logger.debug('Add handle %d fd %d events %d', handleID, fd, events)

        return handleID
        h = LoopHandler(self.loop, handle_id, fd, events, cb, opaque)
        h.start()
        self.handles[handle_id] = h
        logger.debug('Add handle %d fd %d events %d', handle_id, fd, events)
        return handle_id

    def add_timer(self, interval, cb, opaque):
        """Registers a new timer with periodic expiry at 'interval' ms,
@@ -283,124 +195,64 @@ class EventLoop(object):
        Returns a unique integer identier for this handle, that should be
        used to later update/remove it.

        Note: same note as for :method:`add_handle` applies here
        """
        timerID = self.nextTimerID + 1
        self.nextTimerID = self.nextTimerID + 1
        timer_id = self.timer_id.next()

        h = LoopTimer(timerID, interval, cb, opaque)
        self.timers.append(h)
        self.interrupt()
        h = LoopTimer(self.loop, timer_id, interval, cb, opaque)
        h.start()
        self.timers[timer_id] = h
        logger.debug('Add timer %d interval %d', timer_id, interval)
        return timer_id

        # logger.debug('Add timer %d interval %d', timerID, interval)

        return timerID

    def update_handle(self, handleID, events):
    def update_handle(self, handle_id, events):
        """Change the set of events to be monitored on the file handle.

        """
        h = self.get_handle_by_id(handleID)
        h = self.handles.get(handle_id)
        if h:
            h.set_events(events)
            self.poll.unregister(h.get_fd())
            self.poll.register(h.get_fd(), self.events_to_poll(events))
            self.interrupt()

            # logger.debug('Update handle %d fd %d events %d', handleID, h.get_fd(), events)
            h.events = events
            logger.debug('Update handle %d fd %d events %d', handle_id, h.fd, events)

    def update_timer(self, timerID, interval):
    def update_timer(self, timer_id, interval):
        """Change the periodic frequency of the timer.

        """
        for h in self.timers:
            if h.get_id() == timerID:
                h.set_interval(interval);
                self.interrupt()
        t = self.timers.get(timer_id)
        if t:
            t.interval = interval

                # logger.debug('Update timer %d interval %d',  timerID, interval)
                break

    def remove_handle(self, handleID):
    def remove_handle(self, handle_id):
        """Stop monitoring for events on the file handle.

        """
        handles = []
        for h in self.handles:
            if h.get_id() == handleID:
                self.poll.unregister(h.get_fd())
                # logger.debug('Remove handle %d fd %d', handleID, h.get_fd())
            else:
                handles.append(h)
        self.handles = handles
        self.interrupt()
        h = self.handles.pop(handle_id, None)
        if h:
            h.stop()
            logger.debug('Remove handle %d', handle_id)

    def remove_timer(self, timerID):
    def remove_timer(self, timer_id):
        """Stop firing the periodic timer.

        """
        timers = []
        for h in self.timers:
            if h.get_id() != timerID:
                timers.append(h)
                # logger.debug('Remove timer %d', timerID)
        self.timers = timers
        self.interrupt()

    def events_to_poll(self, events):
        """Convert from libvirt event constants, to poll() events constants.

        """
        ret = 0

        if events & libvirt.VIR_EVENT_HANDLE_READABLE:
            ret |= select.POLLIN
        if events & libvirt.VIR_EVENT_HANDLE_WRITABLE:
            ret |= select.POLLOUT
        if events & libvirt.VIR_EVENT_HANDLE_ERROR:
            ret |= select.POLLERR
        if events & libvirt.VIR_EVENT_HANDLE_HANGUP:
            ret |= select.POLLHUP

        return ret

    def events_from_poll(self, events):
        """Convert from poll() event constants, to libvirt events constants.

        """
        ret = 0

        if events & select.POLLIN:
            ret |= libvirt.VIR_EVENT_HANDLE_READABLE
        if events & select.POLLOUT:
            ret |= libvirt.VIR_EVENT_HANDLE_WRITABLE
        if events & select.POLLNVAL:
            ret |= libvirt.VIR_EVENT_HANDLE_ERROR
        if events & select.POLLERR:
            ret |= libvirt.VIR_EVENT_HANDLE_ERROR
        if events & select.POLLHUP:
            ret |= libvirt.VIR_EVENT_HANDLE_HANGUP

        return ret
        t = self.timers.pop(timer_id, None)
        if t:
            t.stop()
            logger.debug('Remove timer %d', timer_id)

    def register_callbacks(self, *callbacks):
        """Register a callback."""
        global connection

        for c in callbacks:
            try:
                connection.domainEventRegister(c, None)  # TODO find out args
            except:
                logger.exception('Can\'t register callback')

    def run(self):
        """Run the loop."""
        self.quit = False
        while not self.quit:
            self.run_once()

    def start(self, daemonize=True):
        """Run the loop in a background thread."""
        t = Thread(target=self.run)
        t.daemon = True
        t.start()

        self.thread = t
    def stop(self):
        for handl in chain(self.handles.itervalues(), self.timers.itervalues()):
            handl.stop()

        return t
        self.handles = dict()
        self.timers = dict()
+6 −3

File changed.

Preview size limit exceeded, changes collapsed.

+4 −4
Original line number Diff line number Diff line
@@ -12,9 +12,9 @@ class Base(object):
    """
    def __init__(self, *args, **kwargs):
        """
        :param loop: pyev loop instance
        :param loop: MainLoop instance
        """
        self.loop = kwargs.pop('loop')
        self.main = kwargs.pop('loop')

        # plugins may define tags (see :mod:`ccnode.tags`)
        self.tag_db = dict(
@@ -44,10 +44,10 @@ class Base(object):
        for tag in chain.from_iterable(
            imap(lambda d: d.itervalues(), self.tag_db.itervalues()),
        ):
            tag.start(self.loop)
            tag.start(self.main.loop)

    def stop(self):
        """Cleanup for plugins, can be used to clean pyev watchers."""

        self.loop = None
        self.main = None
        # TODO dependencies