Skip to content
__init__.py 41.2 KiB
Newer Older
import logging
Anael Beutot's avatar
Anael Beutot committed
import os
Anael Beutot's avatar
Anael Beutot committed
import signal
Anael Beutot's avatar
Anael Beutot committed
import socket
Anael Beutot's avatar
Anael Beutot committed
import time
Anael Beutot's avatar
Anael Beutot committed
import weakref
import traceback
from StringIO import StringIO
from itertools import chain, imap
from xml.etree import cElementTree as et
Anael Beutot's avatar
Anael Beutot committed
from sjrpc.utils import threadless, pass_connection
from cloudcontrol.node.host import Handler as HostHandler
from cloudcontrol.node.tags import Tag, tag_inspector, get_tags
from cloudcontrol.node.hypervisor import tags
from cloudcontrol.node.hypervisor.lib import (
Anael Beutot's avatar
Anael Beutot committed
    DOMAIN_STATES, EVENTS, STORAGE_STATES,
    EventLoop as VirEventLoop,
)
from cloudcontrol.node.hypervisor.domains import VirtualMachine
Anael Beutot's avatar
Anael Beutot committed
from cloudcontrol.node.exc import (
    UndefinedDomain, PoolStorageError, DRBDError, VMMigrationError,
)
Anael Beutot's avatar
Anael Beutot committed
from cloudcontrol.node.hypervisor.jobs import (
    ImportVolume, ExportVolume, TCPTunnel, DRBD,
)
from cloudcontrol.node.utils import close_fds, set_signal_map


logger = logging.getLogger(__name__)


# FIXME find a way to refactor Handler and Hypervisor class
class Handler(HostHandler):
    def __init__(self, *args, **kwargs):
        """Set up the handler state; the libvirt connection is made later.

        The connection itself is attempted from ``virt_connect_cb``, which is
        given as callback to the timer created here.

        :param loop: MainLoop instance
        :param hypervisor_name: hypervisor name, mandatory keyword argument
            (it is removed from ``kwargs`` before they are passed to
            ``HostHandler.__init__``)
        :raises KeyError: if ``hypervisor_name`` is not given
        """
        self.hypervisor_name = kwargs.pop('hypervisor_name')
        HostHandler.__init__(self, *args, **kwargs)

        #: keep index of asynchronous calls
        self.async_calls = {}
        # NOTE(review): presumably "start immediately, repeat every 5
        # seconds" -- confirm against the event loop timer API
        self.timer = self.main.evloop.timer(.0, 5., self.virt_connect_cb)
        # no hypervisor instance until virt_connect_cb succeeds
        self.hypervisor = None
        self._virt_connected = False

        # register tags
        self.tag_db.add_tags(tag_inspector(tags, self))

    @property
    def virt_connected(self):
        """Connection state flag, as last stored through the setter."""
        return self._virt_connected

    @virt_connected.setter
    def virt_connected(self, value):
        """Store the new connection state and refresh a set of main tags.

        :param value: new connection state
        """
        self._virt_connected = value
        # tags of the '__main__' object recomputed on every state change
        refreshed_tags = (
            'vir_status', 'sto', 'nvm', 'vmpaused', 'vmstarted',
            'vmstopped', 'hvver', 'libvirtver', 'hv',
        )
        for tag_name in refreshed_tags:
            self.tag_db['__main__'][tag_name].update_value()

    def start(self):
        """Start the libvirt connection timer, then the parent handler."""
        # the timer callback is virt_connect_cb (see __init__)
        self.timer.start()
        HostHandler.start(self)

    def stop(self):
        """Stop the connection timer, the hypervisor (if any) and the parent.

        The hypervisor instance only exists once a libvirt connection was
        made, hence the ``None`` check.
        """
        self.timer.stop()
        hypervisor = self.hypervisor
        if hypervisor is not None:
            hypervisor.stop()
        HostHandler.stop(self)

    def virt_connect_cb(self, *args):
        """Timer callback: connect to libvirt and register what depends on it.

        When the hypervisor instance cannot be created because of a libvirt
        error, the error is logged and the method returns without stopping
        the timer. Otherwise the storage tags, the domains sub objects and
        the libvirt related handlers are registered and the timer is stopped.

        :param args: arguments given by the caller of the callback, unused
        """
        # initialize hypervisor instance
        try:
            self.hypervisor = Hypervisor(
                name=self.hypervisor_name,
                handler=self,
            )
        except libvirt.libvirtError:
            logger.exception('Error while connecting to libvirt')
            return

        self.virt_connected = True

        def storage_tags(name, storage):
            # Built in a dedicated scope so that every lambda is bound to its
            # own storage: lambdas written directly in the loop body all read
            # the loop variable when called, i.e. the last storage iterated.
            return (
                Tag('sto%s_state' % name, lambda: storage.state, 5, 5),
                Tag('sto%s_size' % name, lambda: storage.capacity, 5, 5),
                Tag('sto%s_free' % name, lambda: storage.available, 5, 5),
                Tag('sto%s_used' % name,
                    lambda: storage.capacity - storage.available, 5, 5),
                Tag('sto%s_type' % name, lambda: storage.type, 5, 5),
            )

        # register hypervisor storage tags
        for name, storage in self.hypervisor.storage.storages.iteritems():
            self.tag_db.add_tags(storage_tags(name, storage))

        # register domains
        for dom in self.hypervisor.domains.itervalues():
            self.tag_db.add_sub_object(dom.name, dom.tags.itervalues())

        # we must refresh those tags only when domains tags are registered to
        # have the calculated values
        for tag in ('cpualloc', 'cpurunning', 'memalloc', 'memrunning'):
            self.tag_db['__main__'][tag].update_value()

        self.rpc_handler.update(dict(
            vm_define=self.vm_define,
            vm_undefine=self.vm_undefine,
            vm_export=self.vm_export,
            vm_stop=self.vm_stop,
            vm_start=self.vm_start,
            vm_suspend=self.vm_suspend,
            vm_resume=self.vm_resume,
        ))

        # handlers related to libvirt, each one is registered under the name
        # of the method implementing it (registration order is kept)
        for handler_name in (
            'vm_define',
            'vm_undefine',
            'vm_export',
            'vm_stop',
            'vm_destroy',
            'vm_start',
            'vm_suspend',
            'vm_resume',
            'vm_migrate_tunneled',
            'vol_create',
            'vol_delete',
            'vol_import',
            'vol_import_wait',
            'vol_export',
            'tun_setup',
            'tun_connect',
            'tun_connect_hv',
            'tun_destroy',
            'drbd_setup',
            'drbd_connect',
            'drbd_role',
            'drbd_takeover',
            'drbd_sync_status',
            'drbd_shutdown',
            'vm_open_console',
            'vm_disable_virtio_cache',
            'vm_set_autostart',
        ):
            self.main.reset_handler(handler_name, getattr(self, handler_name))

        # if everything went fine, unregister the timer
        self.timer.stop()

    def virt_connect_restart(self):
        """Restart libvirt connection.
        This method might be called when libvirt connection is lost.
        """
        if not self.virt_connected:
            return

        logger.error('Connection to libvirt lost, trying to restart')
        # update connection state
        self.virt_connected = False
        # refresh those tags
        for tag in ('cpualloc', 'cpurunning', 'memalloc', 'memrunning'):
            self.tag_db['__main__'][tag].update_value()

        # unregister tags that will be re registered later
        for storage in self.hypervisor.storage.storages:
            self.tag_db.remove_tags((
                'sto%s_state' % storage,
                'sto%s_size' % storage,
                'sto%s_free' % storage,
                'sto%s_used' % storage,
                'sto%s_type' % storage,
        # unregister sub objects (for the same reason)
        for sub_id in self.tag_db.keys():
            if sub_id == '__main__':
                continue
            self.tag_db.remove_sub_object(sub_id)
        # stop and delete hypervisor instance
        self.hypervisor.stop()
        self.hypervisor = None

        # remove handlers related to libvirt
        self.main.remove_handler('vm_define')
        self.main.remove_handler('vm_undefine')
        self.main.remove_handler('vm_export')
        self.main.remove_handler('vm_stop')
        self.main.remove_handler('vm_destroy')
        self.main.remove_handler('vm_start')
        self.main.remove_handler('vm_suspend')
        self.main.remove_handler('vm_resume')
        self.main.remove_handler('vm_migrate_tunneled')
        self.main.remove_handler('vol_create')
        self.main.remove_handler('vol_delete')
        self.main.remove_handler('vol_import')
        self.main.remove_handler('vol_import_wait')
        self.main.remove_handler('vol_export')
        self.main.remove_handler('tun_setup')
Loading full blame...