Skip to content
__init__.py 14.1 KiB
Newer Older
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl.  If not, see <http://www.gnu.org/licenses/>.


Anael Beutot's avatar
Anael Beutot committed
import errno
Anael Beutot's avatar
Anael Beutot committed
import logging
Anael Beutot's avatar
Anael Beutot committed
import socket
Anael Beutot's avatar
Anael Beutot committed
import weakref
from StringIO import StringIO
from xml.etree import cElementTree as et
Anael Beutot's avatar
Anael Beutot committed
from collections import namedtuple
Anael Beutot's avatar
Anael Beutot committed

Anael Beutot's avatar
Anael Beutot committed
import pyev
import libvirt
Anael Beutot's avatar
Anael Beutot committed
from cloudcontrol.common.client.tags import Tag, tag_inspector
Anael Beutot's avatar
Anael Beutot committed

from cloudcontrol.node.hypervisor import lib as _libvirt
from cloudcontrol.node.hypervisor.lib import DOMAIN_STATES as STATE
from cloudcontrol.node.hypervisor.domains import vm_tags
Anael Beutot's avatar
Anael Beutot committed
from cloudcontrol.node.utils import SocketBuffer
from cloudcontrol.node.exc import ConsoleAlreadyOpened, ConsoleError
Anael Beutot's avatar
Anael Beutot committed


logger = logging.getLogger(__name__)
Anael Beutot's avatar
Anael Beutot committed
NetworkInterface = namedtuple('NetworkInterface', ('source', 'mac', 'model'))


class VirtualMachine(object):
    """Represent a VM instance."""
Anael Beutot's avatar
Anael Beutot committed

    #: buffer size for console connection handling
    BUFFER_LEN = 1024

Anael Beutot's avatar
Anael Beutot committed
    def __init__(self, dom, hypervisor):
        """
        :param dom: libvirt domain instance
Anael Beutot's avatar
Anael Beutot committed
        :param hypervisor: hypervisor where the VM is
        self.hypervisor = weakref.proxy(hypervisor)

        #: UUID string of domain
        self.uuid = dom.UUIDString()
        self.name = dom.name()
        #: state of VM: started, stoped, paused
Anael Beutot's avatar
Anael Beutot committed
        self._state = STATE[dom.info()[0]]
        #: tags for this VM
        # FIXME use a tag db instance
Anael Beutot's avatar
Anael Beutot committed
        self.tags = dict((t.name, t) for t in tag_inspector(vm_tags, self))
Anael Beutot's avatar
Anael Beutot committed
        #: Driver cache behavior for each VM storage, see
        #: http://libvirt.org/formatdomain.html#elementsDisks
        self.cache_behaviour = dict()
        # define dynamic tags
        i = 0
        for v in self.iter_disks():
            for t in (
Anael Beutot's avatar
Anael Beutot committed
                Tag('disk%s_size' % i, v.capacity, 10),
                Tag('disk%s_path' % i, v.path, 10),
                Tag('disk%s_pool' % i, v.storage, 10),  # FIXME: change
                Tag('disk%s_vol' % i, v.name, 10),
Anael Beutot's avatar
Anael Beutot committed
                Tag('disk%s_cache' %i,
                    lambda: self.cache_behaviour.get(v.path), 10)
            ):
                self.tags[t.name] = t
Anael Beutot's avatar
Anael Beutot committed

Anael Beutot's avatar
Anael Beutot committed
        i = 0
        for nic in self.iter_nics():
            for t in (
Anael Beutot's avatar
Anael Beutot committed
                Tag('nic%s_mac' % i, nic.mac, 10),
                Tag('nic%s_source' % i, nic.source, 10),
                Tag('nic%s_model' %i, nic.model, 10),
Anael Beutot's avatar
Anael Beutot committed
            ):
                self.tags[t.name] = t
Anael Beutot's avatar
Anael Beutot committed
        logger.debug('Virtual Machine tags: %s', self.tags)
        #: keep record of CPU stats (libev timestamp, cpu time)
        self.cpu_stats = (hypervisor.handler.main.evloop.now(), dom.info()[4])

Anael Beutot's avatar
Anael Beutot committed
        # attributes related to console handling
        self.stream = None
        self.sock = None  # socketpair endpoint
        self.read_watcher = None  # ev watcher for sock
        self.write_watcher = None  # ev watcher for sock
        self.from_tunnel = None  # buffer
        self.from_stream = None  # buffer
        self.stream_handling = 0  # libvirt stream event mask

Anael Beutot's avatar
Anael Beutot committed
    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        self._state = value
        self.tags['status'].update_value()
        self.tags['vncport'].update_value()
Anael Beutot's avatar
Anael Beutot committed

Anael Beutot's avatar
Anael Beutot committed
    @property
    def lv_dom(self):
        """Libvirt domain instance."""
        return self.hypervisor.vir_con.lookupByUUIDString(self.uuid)
    def start(self):
        self.lv_dom.create()

    def stop(self):
        self.lv_dom.shutdown()
    def suspend(self):
        self.lv_dom.suspend()
    def resume(self):
        self.lv_dom.resume()
    def destroy(self):
        self.lv_dom.destroy()
    def undefine(self):
        self.lv_dom.undefine()

    @property
    def disks(self):
        return list(self.iter_disks())

    def iter_disks(self):
        for d in et.ElementTree().parse(
            StringIO(self.lv_dom.XMLDesc(0))
        ).findall('devices/disk'):
            if d.get('device') != 'disk':
                continue

            type_ = d.get('type')

            if type_ not in ('file', 'block'):
                continue

            path = d.find('source').get(dict(file='file', block='dev')[type_])
Anael Beutot's avatar
Anael Beutot committed

            # update cache behaviour
            driver = d.find('driver')
            if driver is None:
                driver = {}
            self.cache_behaviour[path] = driver.get('cache', 'default')

            volume = self.hypervisor.storage.get_volume(path)
            if volume is None:
                continue

            yield volume
Anael Beutot's avatar
Anael Beutot committed

    @property
    def nics(self):
        return list(self.iter_nics())

    def iter_nics(self):
        for nic in et.ElementTree().parse(
            StringIO(self.lv_dom.XMLDesc(0))
        ).findall('devices/interface'):
            if nic.get('type') == 'bridge':
                try:
                    mac = nic.find('mac').get('address')
                except AttributeError:
                    mac = None
                try:
                    model = nic.find('model').get('type')
                except AttributeError:
                    model = None
                try:
                    source = nic.find('source').get('bridge')
                except AttributeError:
                    source = None
                yield NetworkInterface(
                    mac=mac,
                    source=source,
                    model=model,
                )
Anael Beutot's avatar
Anael Beutot committed

    def open_console(self):
        if self.stream is not None:
            raise ConsoleAlreadyOpened('Console for this VM is already'
                                       ' opened')
        if self.hypervisor.handler.tag_db['libvirtver'].value.startswith('8'):
            raise ConsoleError(
                'Cannot open console, not compatible with this version of libvirt')
Anael Beutot's avatar
Anael Beutot committed
        logger.info('Opening console stream on VM %s', self.name)
        try:
            self.stream = self.hypervisor.vir_con.newStream(
                libvirt.VIR_STREAM_NONBLOCK)
        except libvirt.libvirtError:
            logger.error('Cannot create new stream for console %s', self.name)
            self.close_console()
            raise
        self.stream_handling = libvirt.VIR_STREAM_EVENT_READABLE | (
            libvirt.VIR_STREAM_EVENT_ERROR | libvirt.VIR_STREAM_EVENT_HANGUP)
        try:
            self.lv_dom.openConsole(None, self.stream, 0)
            self.stream.eventAddCallback(self.stream_handling,
                                         self.virt_console_stream_cb,
                                         None)
        except libvirt.libvirtError:
            logger.error('Cannot open console on domain %s', self.name)
            self.close_console()
            raise

        try:
            self.sock, tunnel_endpoint = socket.socketpair()
        except socket.error:
            logger.error('Cannot create socket pair for console on domain %s',
                         self.name)
            self.close_console()
            raise
        try:
            self.sock.setblocking(0)
        except socket.error:
            logger.error('Cannot set socket to non blocking for console on'
                         ' domain %s', self.name)
            self.close_console()
            raise
        self.read_watcher = self.hypervisor.handler.main.evloop.io(
            self.sock, pyev.EV_READ, self.read_from_tun_cb)
        self.read_watcher.start()
        self.write_watcher = self.hypervisor.handler.main.evloop.io(
            self.sock, pyev.EV_WRITE, self.write_to_tun_cb)
        # self.write_watcher.start()

        self.from_tunnel = SocketBuffer(4096)
        self.from_stream = SocketBuffer(4096)

        return tunnel_endpoint

    def virt_console_stream_cb(self, stream, events, opaque):
        """Handles read/write from/to libvirt stream."""

        if events & libvirt.VIR_EVENT_HANDLE_ERROR or (
            events & libvirt.VIR_EVENT_HANDLE_HANGUP):
            # error/hangup
            # logger.debug('Received error on stream')
            self.close_console()
            return

        if events & libvirt.VIR_EVENT_HANDLE_WRITABLE:
            # logger.debug('Write to stream')
            # logger.debug('Event %s', self.stream_handling)
            while True:
                try:
                    to_send = self.from_tunnel.popleft()
                except IndexError:
                    # update libvirt event mask
                    self.stream_handling ^= libvirt.VIR_STREAM_EVENT_WRITABLE
                    self.stream.eventUpdateCallback(self.stream_handling)
                    break
                send_buffer = to_send
                total_sent = 0
                while True:
                    try:
                        written = self.stream.send(send_buffer)
                    except:
                        # libvirt send error
                        logger.exception('Error while writing to stream')
                        self.close_console()
                        return

                    if written == -2:  # equivalent to EAGAIN
                        self.from_tunnel.appendleft(to_send[total_sent:])
                        break
                    elif written == len(send_buffer):
                        break

                    total_sent += written
                    send_buffer = buffer(to_send, total_sent)

            if not self.from_tunnel.is_full():
                self.read_watcher.start()

        if events & libvirt.VIR_EVENT_HANDLE_READABLE:
            # logger.debug('Read from stream')
            # logger.debug('Event %s', self.stream_handling)
            while True:
                try:
                    incoming = self.stream.recv(self.BUFFER_LEN)
                except:
                    pass
                if incoming == -2:
                    # equivalent to EAGAIN
                    break
                elif not incoming:
                    # EOF
                    self.close_console()
                    return

                self.from_stream.append(incoming)
                if self.from_stream.is_full():
                    # update libvirt event mask
                    self.stream_handling ^= libvirt.VIR_STREAM_EVENT_READABLE
                    self.stream.eventUpdateCallback(self.stream_handling)
                    break

            if not self.from_stream.is_empty():
                self.write_watcher.start()

    def read_from_tun_cb(self, watcher, revents):
        """Read data from tunnel and save into buffer."""
        # logger.debug('Read from tunnel')
        # logger.debug('Event %s', self.stream_handling)
        while True:
            try:
                incoming = self.sock.recv(self.BUFFER_LEN)
            except socket.error as exc:
                if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                    break
                logger.exception('Error reading on socket for vm console')
                self.close_console()
                return

            if not incoming:
                # EOF (we could wait before closing console stream)
                self.close_console()
                return
            self.from_tunnel.append(incoming)
            if self.from_tunnel.is_full():
                self.read_watcher.stop()
                break

        if not self.from_tunnel.is_empty():
            # update libvirt event callback
            self.stream_handling |= libvirt.VIR_STREAM_EVENT_WRITABLE
            self.stream.eventUpdateCallback(self.stream_handling)

    def write_to_tun_cb(self, watcher, revents):
        """Write data from buffer to tunnel."""
        # logger.debug('Write to tunnel')
        # logger.debug('Event %s', self.stream_handling)
        while True:
            try:
                to_send = self.from_stream.popleft()
            except IndexError:
                self.write_watcher.stop()
                break
            send_buffer = to_send
            total_sent = 0
            while True:
                try:
                    written = self.sock.send(send_buffer)
                except socket.error as exc:
                    if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                        self.from_stream.appenleft(to_send[total_sent:])
                        break
                    logger.exception('Error writing on socket for vm console')
                    self.close_console()
                    return

                if written == len(send_buffer):
                    break

                total_sent += written
                send_buffer = buffer(to_send, total_sent)

        if not self.from_stream.is_full():
            # update libvirt event callback
            self.stream_handling |= libvirt.VIR_STREAM_EVENT_READABLE
            self.stream.eventUpdateCallback(self.stream_handling)

    def close_console(self):
        logger.info('Closing console stream on VM %s', self.name)
        if self.stream is not None:
            try:
                self.stream.eventRemoveCallback()
            except Exception:
                logger.error('Error while removing callback on stream')
            try:
                self.stream.finish()
            except Exception:
                logger.error('Cannot finnish console stream')
            self.stream = None

        if self.sock is not None:
            try:
                self.sock.close()
            except socket.error:
                logger.error('Cannot close socket')
            self.sock = None

        if self.read_watcher is not None:
            self.read_watcher.stop()
            self.read_watcher = None
        if self.write_watcher is not None:
            self.write_watcher.stop()
            self.write_watcher = None

        self.from_tunnel = None
        self.from_stream = None