Commit 2ab21b47 authored by Anael Beutot's avatar Anael Beutot
Browse files

Refactored hypervisor/__init__.py module into three modules

hypervisor/__init__.py ->
    hypervisor/__init__.py (Handler)
    hypervisor/kvm.py (Hypervisor -> KVM)
    hypervisor/lib.py (Storage*/Volume)
parent 45772b04
Loading
Loading
Loading
Loading
+4 −375
Original line number Diff line number Diff line
@@ -4,10 +4,8 @@ import sys
import signal
import socket
import time
import weakref
import traceback
from StringIO import StringIO
from itertools import chain, imap
from xml.etree import cElementTree as et

import libvirt
@@ -16,13 +14,9 @@ from sjrpc.utils import threadless, pass_connection
from cloudcontrol.node.host import Handler as HostHandler
from cloudcontrol.node.tags import Tag, tag_inspector, get_tags
from cloudcontrol.node.hypervisor import tags
from cloudcontrol.node.hypervisor.lib import (
    DOMAIN_STATES, EVENTS, STORAGE_STATES,
    EventLoop as VirEventLoop,
)
from cloudcontrol.node.hypervisor.domains import VirtualMachine
from cloudcontrol.node.hypervisor.kvm import KVM
from cloudcontrol.node.exc import (
    UndefinedDomain, PoolStorageError, DRBDError, VMMigrationError,
    UndefinedDomain, DRBDError, VMMigrationError,
)
from cloudcontrol.node.hypervisor.jobs import (
    ImportVolume, ExportVolume, TCPTunnel, DRBD,
@@ -78,7 +72,7 @@ class Handler(HostHandler):
    def virt_connect_cb(self, *args):
        # initialize hypervisor instance
        try:
            self.hypervisor = Hypervisor(
            self.hypervisor = KVM(
                name=self.hypervisor_name,
                handler=self,
            )
@@ -265,7 +259,7 @@ class Handler(HostHandler):
        try:
            self.hypervisor.domains[name].destroy()
        except libvirt.libvirtError as exc:
            # Libvirt raises exception 'domain is not running' event is domain
            # Libvirt raises exception 'domain is not running' even if domain
            # is running, might be a bug in libvirt
            if 'domain is not running' not in str(exc) or (
                self.hypervisor.domains[name].state != 'running'):
@@ -791,368 +785,3 @@ class Handler(HostHandler):

        # remove job from job_manager list
        self.main.job_manager.notify(job)


class Hypervisor(object):
    """Container for all hypervisor related state."""
    def __init__(self, name, handler):
        """
        :param str name: name of hypervisor instance
        :param Handler handler: hypervisor handler
        """
        self.handler = weakref.proxy(handler)

        #: hv attributes
        self.name = name
        self.type = u'kvm'

        # register libvirt error handler
        libvirt.registerErrorHandler(self.vir_error_cb, None)
        # libvirt event loop abstraction
        self.vir_event_loop = VirEventLoop(self.handler.main.evloop)

        self.vir_con = libvirt.open('qemu:///system')  # currently only support KVM

        # findout storage
        self.storage = StorageIndex(handler, self.vir_con)

        logger.debug('Storages: %s', self.storage.paths)

        #: domains: vms, containers...
        self.domains = dict()
        # find defined domains
        for dom_name in self.vir_con.listDefinedDomains():
            dom = self.vir_con.lookupByName(dom_name)
            self.domains[dom.name()] = VirtualMachine(dom, self)
        # find started domains
        for dom_id in self.vir_con.listDomainsID():
            dom = self.vir_con.lookupByID(dom_id)
            self.domains[dom.name()] = VirtualMachine(dom, self)

        logger.debug('Domains: %s', self.domains)

        self.vir_con.domainEventRegister(self.vir_cb, None)  # TODO find out args

    def stop(self):
        self.vir_event_loop.stop()
        # unregister callback
        try:
            self.vir_con.domainEventDeregister(self.vir_cb)
        except libvirt.libvirtError:
            # in case the libvirt connection is broken, it will raise the error
            pass
        ret = self.vir_con.close()
        logger.debug('Libvirt still handling %s ref connections', ret)
        # TODO delet objects

    def vir_error_cb(self, ctxt, err):
        """Libvirt error callback.

        See http://libvirt.org/errors.html for more informations.

        :param ctxt: arbitrary context data (not needed because context is
            givent by self
        :param err: libvirt error code
        """
        logger.error('Libvirt error %s', err)

    def vir_cb(self, conn, dom, event, detail, opaque):
        """Callback for libvirt event loop."""
        logger.debug('Received event %s on domain %s, detail %s', event,
                     dom.name(), detail)

        event = EVENTS[event]

        if event == 'Added':
            # update Storage pools in case VM has volumes that were created
            self.storage.update()
            if dom.name() in self.domains:
                # sometimes libvirt send us the same event multiple times
                # this can be the result of a change in the domain configuration
                # we first remove the old domain
                vm = self.domains.pop(dom.name())
                self.handler.tag_db.remove_sub_object(vm.name)
                logger.debug('Domain %s recreated', dom.name())
            vm = VirtualMachine(dom, self)
            logger.info('Created domain %s', vm.name)
            self.domains[vm.name] = vm
            self.handler.tag_db.add_sub_object(vm.name, vm.tags.itervalues())
            self.update_domain_count()
        elif event == 'Removed':
            vm_name = dom.name()
            self.vm_unregister(vm_name)
            logger.info('Removed domain %s', vm_name)
        elif event in ('Started', 'Suspended', 'Resumed', 'Stopped', 'Saved',
                       'Restored'):
            vm = self.domains.get(dom.name())
            # sometimes libvirt sent a start event before a created event so be
            # careful
            if vm is not None:
                try:
                    state = DOMAIN_STATES[dom.info()[0]]
                except libvirt.libvirtError as exc:
                    # checks that domain was not previously removed
                    # seems to happen only in libvirt 0.8.8
                    if 'Domain not found' in str(exc):
                        self.vm_unregister(dom.name())
                    else:
                        raise
                else:
                    logger.info('Domain change state from %s to %s', vm.state,
                                 state)
                    vm.state = state
                    self.update_domain_count()

    def vm_unregister(self, name):
        """Unregister a VM from the cc-server and remove it from the index."""
        try:
            vm = self.domains.pop(name)
        except KeyError:
            # domain already removed, see hypervisor/domains/vm_tags.py
            # sometimes libvirt send us the remove event too late
            # we still update storage and tag attributes
            pass
        else:
            self.handler.tag_db.remove_sub_object(vm.name)
            # update Storage pools in case VM had volumes that were deleted
            self.storage.update()
            self.update_domain_count()

    def update_domain_count(self):
        """Update domain state count tags."""
        # update domain state counts
        for tag in ('nvm', 'vmpaused', 'vmstarted', 'vmstopped', 'cpualloc',
                    'cpurunning', 'memalloc', 'memrunning'):
            self.handler.tag_db['__main__'][tag].update_value()

    def vm_define(self, xml_desc):
        """Create a VM on the Hypervisor
        :param str xml_desc: XML description in libvirt format
        :return: VM name created
        """
        try:
            return self.vir_con.defineXML(xml_desc).name()
        except libvirt.libvirtError:
            logger.exception('Error while creating domain')
            # reraise exception for the cc-server
            raise

    def _count_domain(self, filter=lambda d: True):
        count = 0

        for dom in self.domains.itervalues():
            if filter(dom):
                count += 1

        return count

    @property
    def vm_started(self):
        """Number of VMs started."""
        return self._count_domain(lambda d: d.state == 'running')

    @property
    def vm_stopped(self):
        """Number of VMs stopped."""
        return self._count_domain(lambda d: d.state == 'stopped')

    @property
    def vm_paused(self):
        """Number of VMs paused."""
        return self._count_domain(lambda d: d.state == 'paused')

    @property
    def vm_total(self):
        """Total number of VMs on the hypervisor."""
        return self._count_domain()


class StorageIndex(object):
    """Keep an index of all storage volume paths."""
    def __init__(self, handler, lv_con):
        """
        :param handler: Hypervisor handler instance
        :param lv_con: Libvirt connection
        """
        self.handler = handler
        self.lv_con = lv_con
        self.storages = dict(
            (s.name, s) for s in imap(
                Storage,
                imap(
                    lv_con.storagePoolLookupByName,
                    chain(
                        lv_con.listDefinedStoragePools(),
                        lv_con.listStoragePools(),
                    ),
                ),
            ),
        )

        self.paths = None
        self.update_path_index()


    def update(self):
        """Update storage pools and volumes."""
        # go through all storage pools and check if it is already in the index
        for lv_storage in imap(
            self.lv_con.storagePoolLookupByName,
            chain(
                self.lv_con.listDefinedStoragePools(),
                self.lv_con.listStoragePools(),
            ),
        ):
            if lv_storage.name() in self.storages:
                # update
                self.storages[lv_storage.name()].update()
            else:
                # add storage pool
                s = Storage(lv_storage)
                self.storages[s.name] = s
                # add tags
                self.handler.tag_db.add_tags((
                    Tag('sto%s_state' % s.name, lambda: s.state, 5, 5),
                    Tag('sto%s_size' % s.name, lambda: s.capacity, 5, 5),
                    Tag('sto%s_free' % s.name, lambda: s.available, 5, 5),
                    Tag('sto%s_used' % s.name,
                        lambda: s.capacity - s.available, 5, 5),
                    Tag('sto%s_type' % s.name, lambda: s.type, 5, 5),
                ))

        self.update_path_index()

    def update_path_index(self):
        self.paths = dict(
            (v.path, v) for v in chain.from_iterable(imap(
                lambda s: s.volumes.itervalues(),
                self.storages.itervalues(),
            )),
        )

    def get_volume(self, path):
        return self.paths.get(path)

    def get_storage(self, name):
        return self.storages.get(name)

    def create_volume(self, pool_name, volume_name, capacity):
        """Create a new volume in the storage pool.

        :param str name: name for the volume
        :param int capacity: size for the volume
        """
        # get volume
        logger.debug('asked pool %s', pool_name)
        logger.debug('Pool state %s', self.storages)
        try:
            pool = self.storages[pool_name]
        except KeyError:
            raise PoolStorageError('Invalid pool name')
        if pool is None:
            raise Exception('Storage pool not found')
        try:
            new_volume = pool.lv_storage.createXML("""<volume>
                                        <name>%s</name>
                                        <capacity>%d</capacity>
                                      </volume>""" % (volume_name, capacity), 0)
        except libvirt.libvirtError:
            logger.exception('Error while creating volume')
            raise

        new_volume = Volume(new_volume)
        # if success add the volume to the index
        self.paths[new_volume.path] = new_volume
        # and also to its storage pool
        self.storages[new_volume.storage].volumes[new_volume.name] = new_volume

        return new_volume

    def delete_volume(self, pool_name, volume_name):
        """Delete a volume in the givent storage pool.

        :param str pool_name: name for the storage pool
        :param str volume_name: name for the volume
        """
        # get volume
        try:
            pool = self.storages[pool_name]
        except KeyError:
            raise PoolStorageError('Invalid pool name')
        try:
            volume = pool.volumes[volume_name]
        except KeyError:
            raise PoolStorageError('Invalid volume name')
        # delete from index
        del self.paths[volume.path]
        del self.storages[pool_name].volumes[volume_name]

        # delete volume
        try:
            volume.lv_volume.delete(0)
        except libvirt.libvirtError:
            logger.exception('Error while deleting volume')
            raise


class Storage(object):
    """Storage abstraction."""
    def __init__(self, lv_storage):
        """
        :param lv_storage: Libvirt pool storage instance
        """
        self.uuid = lv_storage.UUID()
        self.name = lv_storage.name()
        self.lv_storage = lv_storage

        self.state, self.capacity = None, None
        self.allocation, self.available = None, None

        self.type = et.ElementTree().parse(
            StringIO(lv_storage.XMLDesc(0))).get('type')

        self.volumes = dict((v.name, v) for v in imap(
            Volume,
            (lv_storage.storageVolLookupByName(n) for n in
             lv_storage.listVolumes()),
        ))

        self.update_attr()

    def update(self):
        self.update_attr()

        # update volumes
        for vol_name in self.lv_storage.listVolumes():
            if vol_name in self.volumes:
                # update volume
                self.volumes[vol_name].update()
            else:
                # add volume
                v = Volume(self.lv_storage.storageVolLookupByName(vol_name))
                self.volumes[v.name] = v

    def update_attr(self):
        self.state, self.capacity, self.allocation, self.available = self.lv_storage.info()
        self.state = STORAGE_STATES[self.state]

        self.type = et.ElementTree().parse(
            StringIO(self.lv_storage.XMLDesc(0))).get('type')



class Volume(object):
    """Volume abstraction."""
    def __init__(self, lv_volume):
        """
        :param lv_volume: Libvirt volume instance
        """
        self.storage = lv_volume.storagePoolLookupByVolume().name()
        self.path = lv_volume.path()
        self.name = lv_volume.name()
        self.capacity, self.allocation = None, None
        self.lv_volume = lv_volume
        self.update()

    def update(self):
        self.capacity, self.allocation = self.lv_volume.info()[1:]
+191 −0
Original line number Diff line number Diff line
"""KVM hypervisor support."""

import logging
import weakref

import libvirt

from cloudcontrol.node.hypervisor.lib import (
    DOMAIN_STATES, EVENTS,
    EventLoop as VirEventLoop,
    StorageIndex,
)
from cloudcontrol.node.hypervisor.domains import VirtualMachine


logger = logging.getLogger(__name__)


# FIXME create abstract base class for any hypervisor
class KVM(object):
    """Container for all hypervisor related state."""
    def __init__(self, name, handler):
        """
        :param str name: name of hypervisor instance
        :param Handler handler: hypervisor handler
        """
        self.handler = weakref.proxy(handler)

        #: hv attributes
        self.name = name
        self.type = u'kvm'

        # register libvirt error handler
        libvirt.registerErrorHandler(self.vir_error_cb, None)
        # libvirt event loop abstraction
        self.vir_event_loop = VirEventLoop(self.handler.main.evloop)

        self.vir_con = libvirt.open('qemu:///system')  # currently only support KVM

        # findout storage
        self.storage = StorageIndex(handler, self.vir_con)

        logger.debug('Storages: %s', self.storage.paths)

        #: domains: vms, containers...
        self.domains = dict()
        # find defined domains
        for dom_name in self.vir_con.listDefinedDomains():
            dom = self.vir_con.lookupByName(dom_name)
            self.domains[dom.name()] = VirtualMachine(dom, self)
        # find started domains
        for dom_id in self.vir_con.listDomainsID():
            dom = self.vir_con.lookupByID(dom_id)
            self.domains[dom.name()] = VirtualMachine(dom, self)

        logger.debug('Domains: %s', self.domains)

        self.vir_con.domainEventRegister(self.vir_cb, None)  # TODO find out args

    def stop(self):
        self.vir_event_loop.stop()
        # unregister callback
        try:
            self.vir_con.domainEventDeregister(self.vir_cb)
        except libvirt.libvirtError:
            # in case the libvirt connection is broken, it will raise the error
            pass
        ret = self.vir_con.close()
        logger.debug('Libvirt still handling %s ref connections', ret)
        # TODO delet objects

    def vir_error_cb(self, ctxt, err):
        """Libvirt error callback.

        See http://libvirt.org/errors.html for more informations.

        :param ctxt: arbitrary context data (not needed because context is
            givent by self
        :param err: libvirt error code
        """
        logger.error('Libvirt error %s', err)

    def vir_cb(self, conn, dom, event, detail, opaque):
        """Callback for libvirt event loop."""
        logger.debug('Received event %s on domain %s, detail %s', event,
                     dom.name(), detail)

        event = EVENTS[event]

        if event == 'Added':
            # update Storage pools in case VM has volumes that were created
            self.storage.update()
            if dom.name() in self.domains:
                # sometimes libvirt send us the same event multiple times
                # this can be the result of a change in the domain configuration
                # we first remove the old domain
                vm = self.domains.pop(dom.name())
                self.handler.tag_db.remove_sub_object(vm.name)
                logger.debug('Domain %s recreated', dom.name())
            vm = VirtualMachine(dom, self)
            logger.info('Created domain %s', vm.name)
            self.domains[vm.name] = vm
            self.handler.tag_db.add_sub_object(vm.name, vm.tags.itervalues())
            self.update_domain_count()
        elif event == 'Removed':
            vm_name = dom.name()
            self.vm_unregister(vm_name)
            logger.info('Removed domain %s', vm_name)
        elif event in ('Started', 'Suspended', 'Resumed', 'Stopped', 'Saved',
                       'Restored'):
            vm = self.domains.get(dom.name())
            # sometimes libvirt sent a start event before a created event so be
            # careful
            if vm is not None:
                try:
                    state = DOMAIN_STATES[dom.info()[0]]
                except libvirt.libvirtError as exc:
                    # checks that domain was not previously removed
                    # seems to happen only in libvirt 0.8.8
                    if 'Domain not found' in str(exc):
                        self.vm_unregister(dom.name())
                    else:
                        raise
                else:
                    logger.info('Domain change state from %s to %s', vm.state,
                                 state)
                    vm.state = state
                    self.update_domain_count()

    def vm_unregister(self, name):
        """Unregister a VM from the cc-server and remove it from the index."""
        try:
            vm = self.domains.pop(name)
        except KeyError:
            # domain already removed, see hypervisor/domains/vm_tags.py
            # sometimes libvirt send us the remove event too late
            # we still update storage and tag attributes
            pass
        else:
            self.handler.tag_db.remove_sub_object(vm.name)
            # update Storage pools in case VM had volumes that were deleted
            self.storage.update()
            self.update_domain_count()

    def update_domain_count(self):
        """Update domain state count tags."""
        # update domain state counts
        for tag in ('nvm', 'vmpaused', 'vmstarted', 'vmstopped', 'cpualloc',
                    'cpurunning', 'memalloc', 'memrunning'):
            self.handler.tag_db['__main__'][tag].update_value()

    def vm_define(self, xml_desc):
        """Create a VM on the Hypervisor
        :param str xml_desc: XML description in libvirt format
        :return: VM name created
        """
        try:
            return self.vir_con.defineXML(xml_desc).name()
        except libvirt.libvirtError:
            logger.exception('Error while creating domain')
            # reraise exception for the cc-server
            raise

    def _count_domain(self, filter=lambda d: True):
        count = 0

        for dom in self.domains.itervalues():
            if filter(dom):
                count += 1

        return count

    @property
    def vm_started(self):
        """Number of VMs started."""
        return self._count_domain(lambda d: d.state == 'running')

    @property
    def vm_stopped(self):
        """Number of VMs stopped."""
        return self._count_domain(lambda d: d.state == 'stopped')

    @property
    def vm_paused(self):
        """Number of VMs paused."""
        return self._count_domain(lambda d: d.state == 'paused')

    @property
    def vm_total(self):
        """Total number of VMs on the hypervisor."""
        return self._count_domain()
+196 −1

File changed.

Preview size limit exceeded, changes collapsed.