Commit 338462d9 authored by Anael Beutot's avatar Anael Beutot
Browse files

Tag database to abstract tag manipulation.

Support for registering/unregistering sub object to the cc-server.
Support for tag lifecycle handling (libev), was previously managed by plugins.
parent 18686fcd
Loading
Loading
Loading
Loading
+3 −5
Original line number Diff line number Diff line
@@ -32,15 +32,13 @@ class Handler(BasePlugin):
        BasePlugin.__init__(self, *args, **kwargs)

        # add plugin tags
        self.tag_db['__main__'].update(dict(
            (t.name, t) for t in tag_inspector(tags),
        ))
        self.tag_db.add_tags(tag_inspector(tags))

        # disk related tags
        self.tag_db['__main__'].update(dict((t.name, t) for t in imap(
        self.tag_db.add_tags(imap(
            lambda d: Tag('disk%s_size' % d, disk_tag_value(d), 60),
            self.tag_db['__main__']['disk']._calculate_value().split(),
        )))
        ))

        # rpc handler
        self.rpc_handler.update(dict(
+43 −85
Original line number Diff line number Diff line
@@ -32,7 +32,21 @@ class Handler(HostHandler):

        self.timer = self.main.evloop.timer(.0, 5., self.virt_connect_cb)
        self.hypervisor = None
        self.virt_connected = False
        self._virt_connected = False

        # register tags
        self.tag_db.add_tags(tag_inspector(tags, self))

    @property
    def virt_connected(self):
        return self._virt_connected

    @virt_connected.setter
    def virt_connected(self, value):
        self._virt_connected = value
        # update tags
        for tag in ('vir_status', 'hvver', 'libvirtver'):
            self.tag_db['__main__'][tag].calculate_value()

    def start(self):
        self.timer.start()
@@ -59,33 +73,17 @@ class Handler(HostHandler):

        # register hypervisor storage tags
        for name, storage in self.hypervisor.storage.storages.iteritems():
            for t in (
                Tag('sto%s_state' % name, lambda: storage.state, 5),
                Tag('sto%s_size' % name, lambda: storage.capacity, 5),
                Tag('sto%s_free' % name, lambda: storage.available, 5),
            self.tag_db.add_tags((
                Tag('sto%s_state' % name, lambda: storage.state, 5, 5),
                Tag('sto%s_size' % name, lambda: storage.capacity, 5, 5),
                Tag('sto%s_free' % name, lambda: storage.available, 5, 5),
                Tag('sto%s_used' % name,
                    lambda: storage.capacity - storage.available, 5),
            ):
                self.tag_db['__main__'][t.name] = t
                self.main.reset_tag(t)
                    lambda: storage.capacity - storage.available, 5, 5),
            ))

        # register domains
        for dom in self.hypervisor.domains.itervalues():
            name = dom.name
            # proxy.register(name, 'vm')
            self.async_calls[self.main.rpc_con.rpc.async_call_cb(
                self.register_domain_cb,
                'register',
                name,
                'vm',
            )] = name

        hv_tags = dict(
            (t.name, t) for t in tag_inspector(tags, self),
        )
        self.tag_db['__main__'].update(hv_tags)
        for tag in hv_tags.itervalues():
            self.main.reset_tag(tag)
            self.tag_db.add_sub_object(dom.name, dom.tags.itervalues())

        self.rpc_handler.update(dict(
            vm_define=self.vm_define,
@@ -118,42 +116,34 @@ class Handler(HostHandler):
        logger.error('Connection to libvirt lost, trying to restart')
        # update connection state
        self.virt_connected = False
        # unregister tags that will be re register later
        # unregister tags that will be re registered later
        for storage in self.hypervisor.storage.storages:
            self.main.remove_tag('sto%s_state' % storage)
            self.main.remove_tag('sto%s_size' % storage)
            self.main.remove_tag('sto%s_free' % storage)
            self.main.remove_tag('sto%s_used' % storage)
            self.tag_db['__main__'].pop('sto%s_state' % storage)
            self.tag_db['__main__'].pop('sto%s_size' % storage)
            self.tag_db['__main__'].pop('sto%s_free' % storage)
            self.tag_db['__main__'].pop('sto%s_used' % storage)
            self.tag_db.remove_tags((
                'sto%s_state' % storage,
                'sto%s_size' % storage,
                'sto%s_free' % storage,
                'sto%s_used' % storage,
            ))
        # unregister sub objects (for the same reason)
        for sub_id in self.tag_db.keys():
            if sub_id == '__main__':
                continue
            self.main.remove_sub_object(sub_id)
            self.tag_db.pop(sub_id)
            self.tag_db.remove_sub_object(sub_id)
        # stop and delete hypervisor instance
        self.hypervisor.stop()
        self.hypervisor = None

        # remove handlers related to libvirt
        self.main.remove_handler('vm_define')
        self.main.remove_handler('vm_undefine')
        self.main.remove_handler('vm_export')
        self.main.remove_handler('vm_stop')
        self.main.remove_handler('vm_start')
        self.main.remove_handler('vm_suspend')
        self.main.remove_handler('vm_resume')
        # launch connection timer
        self.timer.start()

    # FIXME duplicate code
    def register_domain_cb(self, call_id, response=None, error=None):
        """RPC callback used when registering a domain on the cc-server."""
        name = self.async_calls.pop(call_id)
        if error is not None:
            logger.error('Error while registering domain, %s("%s")',
                         error['exception'], error['message'])
            return

        logger.debug('Registered domain %s', name)
        domain = self.hypervisor.domains[name]
        for tag in domain.tags.itervalues():
            self.main.reset_sub_tag(domain.name, tag)

    def iter_vms(self, vm_names):
        """Utility function to iterate over VM objects using their names."""
        if vm_names is None:
@@ -224,8 +214,6 @@ class Hypervisor(object):
        :param Handler handler: hypervisor handler
        """
        self.handler = weakref.proxy(handler)
        self.rpc_con = handler.main.rpc_con
        self.async_calls = dict()

        #: hv attributes
        self.name = name
@@ -280,23 +268,13 @@ class Hypervisor(object):

        if event == 'Added':
            vm = VirtualMachine(dom, self)
            logger.info('Created domain %s', vm.name)
            self.domains[vm.name] = vm
            # self.sjproxy.register(vm.name, 'vm')
            self.async_calls[self.rpc_con.rpc.async_call_cb(
                self.register_cb,
                'register',
                vm.name,
                'vm',
            )] = vm.name
            self.handler.tag_db.add_sub_object(vm.name, vm.tags.itervalues())
        elif event == 'Removed':
            logger.debug('About to remove domain')
            vm = self.domains.pop(dom.name())
            # self.sjproxy.unregister(vm.name)
            self.async_calls[self.rpc_con.rpc.async_call_cb(
                self.unregister_cb,
                'unregister',
                vm.name,
            )] = vm.name
            logger.info('Removed domain %s', vm.name)
            self.handler.tag_db.remove_sub_object(vm.name)
        elif event in ('Started', 'Suspended', 'Resumed', 'Stopped', 'Saved',
                       'Restored'):
            vm = self.domains.get(dom.name())
@@ -308,26 +286,6 @@ class Hypervisor(object):
                             state)
                vm.state = state

    def register_cb(self, call_id, response=None, error=None):
        """RPC callback for registering domain to cc-server."""
        vm = self.domains[self.async_calls.pop(call_id)]
        if error is not None:
            logger.error('Error while registering domain to server, %s("%s")',
                         error['exception'], error['message'])
        logger.info('Add domain: %s (%s)', vm.name, vm.uuid)
        # add tags
        for tag in vm.tags.itervalues():
            self.handler.main.reset_sub_tag(vm.name, tag)

    def unregister_cb(self, call_id, response=None, error=None):
        """RPC callback for unregistering domain to cc-server."""
        vm_name = self.async_calls.pop(call_id)
        if error is not None:
            logger.error('Error while unregistering domain to server, %s("%s")',
                         error['exception'], error['message'])
        logger.info('Delete domain: %s', vm_name)
        self.handler.main.remove_sub_object(vm_name)

    def vm_define(self, xml_desc):
        """Create a VM on the Hypervisor
        :param str xml_desc: XML description in libvirt format
+1 −0
Original line number Diff line number Diff line
@@ -31,6 +31,7 @@ class VirtualMachine(object):
        #: state of VM: started, stoped, paused
        self._state = STATE[dom.info()[0]]
        #: tags for this VM
        # FIXME use a tag db instance
        self.tags = dict((t.name, t) for t in tag_inspector(vm_tags, self))
        # define dynamic tags
        i = 0
+0 −20
Original line number Diff line number Diff line
@@ -2,7 +2,6 @@

import logging
from itertools import chain
from functools import wraps

import pyev
import libvirt
@@ -48,25 +47,6 @@ EVENTS = (
)


def vir_tag(func):
    """Catches libvirt related exception.

    Decorator used for tag declarations that interacts with libvirt.

    """
    @wraps(func)
    def decorated(handl):
        if not handl.virt_connected:
            return
        try:
            func(handl)
        except libvirt.libvirtError:
            logger.exception('Unexpected libvirt error')
            handl.vir_con_restart()

    return decorated


# following event loop implementation was inspired by libvirt python example
# but updated to work with libev
class LoopHandler(object):
+44 −3
Original line number Diff line number Diff line
import logging
from functools import wraps

import libvirt

from ccnode.utils import and_
from ccnode.hypervisor.lib import vir_tag


logger = logging.getLogger(__name__)


def _virt_tag(func):
    """Catches libvirt related exception.

    Decorator used for tag declarations that interacts with libvirt.

    """
    @wraps(func)
    def decorated(handl):
        if not handl.virt_connected:
            return
        try:
            return func(handl)
        except libvirt.libvirtError:
            logger.exception('Unexpected libvirt error')
            handl.vir_con_restart()

    return decorated


def _check_virt_connected(func):
    """Check is libvirt is connected before caculating tag."""
    @wraps(func)
    def decorated(handl):
        if not handl.virt_connected:
            return
        return func(handl)


def vir_status(handl):
@@ -14,6 +49,7 @@ def htype():
    return u'kvm'


@_check_virt_connected
def hv(handl):
    """Hypervisor name."""
    # What is the point of this tag ? if the information not already in a and id
@@ -44,13 +80,13 @@ def hvm():
    return None


@vir_tag
@_virt_tag
def hvver(handl):
    """Hypervisor version."""
    return handl.hypervisor.vir_con.getVersion()


@vir_tag
@_virt_tag
def libvirtver(handl):
    """Version of running libvirt."""
    return handl.hypervisor.vir_con.getLibVersion()
@@ -62,27 +98,32 @@ def rjobs():


# storage pools
@_check_virt_connected
def sto(handl):
    """Storage pool names."""
    return u' '.join(handl.hypervisor.storage.storages.iterkeys())


# Vm related tags
@_check_virt_connected
def nvm(handl):
    """Number of VMS in the current hypervisor."""
    return handl.hypervisor.vm_total


@_check_virt_connected
def vmpaused(handl):
    """Count of VMs paused."""
    return handl.hypervisor.vm_paused


@_check_virt_connected
def vmstarted(handl):
    """Count of VMs started."""
    return handl.hypervisor.vm_started


@_check_virt_connected
def vmstopped(handl):
    """Count of VMs Stopped."""
    return handl.hypervisor.vm_stopped
Loading