Skip to content
Snippets Groups Projects
__init__.py 8.09 KiB
import logging
import weakref
from functools import partial
from itertools import chain, imap

import libvirt

from ccnode.host import Handler as HostHandler
from ccnode.tags import Tag, tag_inspector, get_tags
from ccnode.hypervisor import tags
from ccnode.hypervisor import lib as _libvirt
from ccnode.hypervisor.lib import DOMAIN_STATES, EVENTS, STORAGE_STATES, EventLoop
from ccnode.hypervisor.domains import VirtualMachine


logger = logging.getLogger(__name__)


class Handler(HostHandler):
    def __init__(self, *args, **kwargs):
        """
        :param proxy: sjRpc proxy
        :param hypervisor_name: hypervisor name
        """
        HostHandler.__init__(self, *args, **kwargs)

        for t in tag_inspector(tags, self):
            self.tags[t.name] = t

        # set tag hv
        # self.tags['hv'] = Tag('hv', )

        # initialize hypervisor instance
        global hypervisor
        if hypervisor is None:
            hypervisor = Hypervisor(
                name=kwargs.pop('hypervisor_name', None),
                proxy=kwargs['proxy'],
            )

        self.hypervisor = weakref.proxy(hypervisor)

        # register hypervisor storage tags
        for name, storage in self.hypervisor.storage.storages.iteritems():
            for t in (
                Tag('sto%s_state' % name, lambda: storage.state, 5),
                Tag('sto%s_size' % name, lambda: storage.capacity, 5),
                Tag('sto%s_free' % name, lambda: storage.available, 5),
                Tag('sto%s_used' % name,
                    lambda: storage.capacity - storage.available, 5),
            ):
                self.tags[t.name] = t

        # register domains
        proxy = kwargs.pop('proxy')
        for dom in hypervisor.domains.itervalues():
            name = dom.name
            logger.debug(u'Registered domain %s' % name)
            proxy.register(name, 'vm')

    def sub_tags(self, sub_id, tags=None, noresolve_tags=None):
        """Get subtags."""
        global hypervisor

        domain = hypervisor.get_domain_by_name(sub_id)

        if domain is None:
            logger.debug(u'Failed to find domain with name %s.' % sub_id)
            return

        return get_tags(domain, tags, noresolve_tags)

    def vm_define(self, name):
        pass

    def vm_undefine(self, name):
        pass

    def vm_export(self, name, format='xml'):
        pass

    def vm_stop(self, vm_names=None, force=False):
        pass

    def vm_start(self, vm_names=None):
        pass

    def vm_suspend(self, vm_names=None):
        pass

    def vm_resume(self, vm_names=None):
        pass


class Hypervisor(object):
    """Container for all hypervisor related state."""
    def __init__(self, name, proxy):
        """
        :param str name: name of hypervisor instance
        :param proxy: sjRpc proxy
        """
        self.sjproxy = weakref.proxy(proxy)

        #: hv attributes
        self.name = name
        self.type = u'kvm'

        self.event_loop = EventLoop()
        # This tells libvirt what event loop implementation it
        # should use
        libvirt.virEventRegisterImpl(
            self.event_loop.add_handle,
            self.event_loop.update_handle,
            self.event_loop.remove_handle,
            self.event_loop.add_timer,
            self.event_loop.update_timer,
            self.event_loop.remove_timer,
        )

        self.event_loop.start()

        # TODO cleanup connection on stop
        _libvirt.connection = libvirt.open('qemu:///system')  # currently only support KVM

        # findout storage
        self.storage = StorageIndex(_libvirt.connection)

        logger.debug('Storages: %s', self.storage.paths)


        #: domains: vms, containers...
        self.domains = dict()
        # find defined domains
        for dom_name in _libvirt.connection.listDefinedDomains():
            dom = _libvirt.connection.lookupByName(dom_name)
            self.domains[dom.UUID()] = VirtualMachine(dom, self)
        # find started domains
        for dom_id in _libvirt.connection.listDomainsID():
            dom = _libvirt.connection.lookupByID(dom_id)
            self.domains[dom.UUID()] = VirtualMachine(dom, self)

        logger.debug('Domains: %s', self.domains)

        self.event_loop.register_callbacks(self.callback)

    def callback(self, conn, dom, event, detail, opaque):
        """Callback for libvirt event loop."""
        event = EVENTS[event]

        if event == 'Added':
            vm = VirtualMachine(dom, self)
            self.domains[vm.uuid] = vm
            self.sjproxy.register(vm.name, 'vm')
            logger.debug('Add domain: %s (%s)', vm.name, vm.uuid)
        elif event == 'Removed':
            vm = self.domains.pop(dom.UUID())
            self.sjproxy.unregister(vm.name)
            logger.debug('Delete domain: %s (%s)', vm.name, vm.uuid)
        elif event in ('Started', 'Suspended', 'Resumed', 'Stopped', 'Saved',
                       'Restored'):
            vm = self.domains.get(dom.UUID())
            # sometimes libvirt sent a start event before a created event so be
            # careful
            if vm is not None:
                state = DOMAIN_STATES[dom.info()[0]]
                logger.debug('Domain change state from %s to %s', vm.state,
                             state)
                vm.state = state

    def get_domain_by_name(self, name):
        """Get a domain by name."""
        for d in self.domains.itervalues():
            if d.name == name:
                return d

        return None

    def _count_domain(self, filter=lambda d: True):
        count = 0

        for dom in self.domains.itervalues():
            if filter(dom):
                count += 1

        return count

    @property
    def vm_started(self):
        """Number of VMs started."""
        return self._count_domain(lambda d: d.state == 'running')

    @property
    def vm_stopped(self):
        """Number of VMs stopped."""
        return self._count_domain(lambda d: d.state == 'stopped')

    @property
    def vm_paused(self):
        """Number of VMs paused."""
        return self._count_domain(lambda d: d.state == 'paused')

    @property
    def vm_total(self):
        """Total number of VMs on the hypervisor."""
        return self._count_domain()


hypervisor = None


class StorageIndex(object):
    """Keep an index of all storage volume paths."""
    def __init__(self, lv_con):
        """
        :param lv_con: Libvirt connection
        """
        self.storages = dict(
            (s.name, s) for s in imap(
                Storage,
                imap(
                    lv_con.storagePoolLookupByName,
                    chain(
                        lv_con.listDefinedStoragePools(),
                        lv_con.listStoragePools(),
                    ),
                ),
            ),
        )

        self.paths = dict(
            (v.path, v) for v in chain.from_iterable(map(
                lambda s: s.volumes,
                self.storages.itervalues(),
            )),
        )

    def get_volume(self, path):
        return self.paths.get(path)

    def get_storage(self, name):
        return self.Storage.get(name)


class Storage(object):
    """Storage abstraction."""
    def __init__(self, lv_storage):
        """
        :param lv_storage: Libvirt pool storage instance
        """
        self.uuid = lv_storage.UUID()
        self.name = lv_storage.name()

        self.state, self.capacity, self.allocation, self.available = lv_storage.info()
        self.state = STORAGE_STATES[self.state]

        self.volumes = map(
            partial(Volume, storage=self),
            (lv_storage.storageVolLookupByName(n) for n in
            lv_storage.listVolumes()),
        )


class Volume(object):
    """Volume abstraction."""
    def __init__(self, lv_volume, storage):
        """
        :param lv_volume: Libvirt volume instance
        :param storage: parent storage instance
        """
        # self.storage = None if storage is None else weakref.proxy(storage)
        self.storage = lv_volume.storagePoolLookupByVolume().name()
        self.path = lv_volume.path()
        self.name = lv_volume.name()
        self.capacity, self.allocation = lv_volume.info()[1:]