# This file is part of CloudControl. # # CloudControl is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # CloudControl is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with CloudControl. If not, see . import errno import logging import socket import weakref from StringIO import StringIO from xml.etree import cElementTree as et from collections import namedtuple import pyev import libvirt from cloudcontrol.common.client.tags import Tag, tag_inspector from cloudcontrol.node.hypervisor import lib as _libvirt from cloudcontrol.node.hypervisor.lib import DOMAIN_STATES as STATE from cloudcontrol.node.hypervisor.domains import vm_tags from cloudcontrol.node.utils import SocketBuffer from cloudcontrol.node.exc import ConsoleAlreadyOpened, ConsoleError logger = logging.getLogger(__name__) NetworkInterface = namedtuple('NetworkInterface', ('source', 'mac', 'model')) class VirtualMachine(object): """Represent a VM instance.""" #: buffer size for console connection handling BUFFER_LEN = 1024 def __init__(self, dom, hypervisor): """ :param dom: libvirt domain instance :param hypervisor: hypervisor where the VM is """ self.hypervisor = weakref.proxy(hypervisor) #: UUID string of domain self.uuid = dom.UUIDString() self.name = dom.name() #: state of VM: started, stoped, paused self._state = STATE[dom.info()[0]] #: tags for this VM # FIXME use a tag db instance self.tags = dict((t.name, t) for t in tag_inspector(vm_tags, self)) #: Driver cache behavior for each VM storage, see #: http://libvirt.org/formatdomain.html#elementsDisks self.cache_behaviour = dict() # define dynamic tags i = 0 for v in self.iter_disks(): for t in ( Tag('disk%s_size' % i, v.capacity, 10), Tag('disk%s_path' % i, v.path, 10), Tag('disk%s_pool' % i, v.storage, 10), # FIXME: change Tag('disk%s_vol' % i, v.name, 10), Tag('disk%s_cache' %i, lambda: self.cache_behaviour.get(v.path), 10) ): self.tags[t.name] = t i += 1 i = 0 for nic in self.iter_nics(): for t in ( Tag('nic%s_mac' % i, nic.mac, 10), Tag('nic%s_source' % i, nic.source, 10), Tag('nic%s_model' %i, nic.model, 10), ): self.tags[t.name] = t logger.debug('Virtual Machine tags: %s', self.tags) #: keep record of CPU stats (libev timestamp, cpu time) self.cpu_stats = (hypervisor.handler.main.evloop.now(), dom.info()[4]) # attributes related to console handling self.stream = None self.sock = None # socketpair endpoint self.read_watcher = None # ev watcher for sock self.write_watcher = None # ev watcher for sock self.from_tunnel = None # buffer self.from_stream = None # buffer self.stream_handling = 0 # libvirt stream event mask @property def state(self): return self._state @state.setter def state(self, value): self._state = value self.tags['status'].update_value() self.tags['vncport'].update_value() @property def lv_dom(self): """Libvirt domain instance.""" return self.hypervisor.vir_con.lookupByUUIDString(self.uuid) def start(self): self.lv_dom.create() def stop(self): self.lv_dom.shutdown() def suspend(self): self.lv_dom.suspend() def resume(self): self.lv_dom.resume() def destroy(self): self.lv_dom.destroy() def undefine(self): self.lv_dom.undefine() @property def disks(self): return list(self.iter_disks()) def iter_disks(self): for d in et.ElementTree().parse( StringIO(self.lv_dom.XMLDesc(0)) ).findall('devices/disk'): if d.get('device') != 'disk': continue type_ = d.get('type') if type_ not in ('file', 'block'): continue path = d.find('source').get(dict(file='file', block='dev')[type_]) # update cache behaviour driver = d.find('driver') if driver is None: driver = {} self.cache_behaviour[path] = driver.get('cache', 'default') volume = self.hypervisor.storage.get_volume(path) if volume is None: continue yield volume @property def nics(self): return list(self.iter_nics()) def iter_nics(self): for nic in et.ElementTree().parse( StringIO(self.lv_dom.XMLDesc(0)) ).findall('devices/interface'): if nic.get('type') == 'bridge': try: mac = nic.find('mac').get('address') except AttributeError: mac = None try: model = nic.find('model').get('type') except AttributeError: model = None try: source = nic.find('source').get('bridge') except AttributeError: source = None yield NetworkInterface( mac=mac, source=source, model=model, ) def open_console(self): if self.stream is not None: raise ConsoleAlreadyOpened('Console for this VM is already' ' opened') if self.hypervisor.handler.tag_db['libvirtver'].value.startswith('8'): raise ConsoleError( 'Cannot open console, not compatible with this version of libvirt') logger.info('Opening console stream on VM %s', self.name) try: self.stream = self.hypervisor.vir_con.newStream( libvirt.VIR_STREAM_NONBLOCK) except libvirt.libvirtError: logger.error('Cannot create new stream for console %s', self.name) self.close_console() raise self.stream_handling = libvirt.VIR_STREAM_EVENT_READABLE | ( libvirt.VIR_STREAM_EVENT_ERROR | libvirt.VIR_STREAM_EVENT_HANGUP) try: self.lv_dom.openConsole(None, self.stream, 0) self.stream.eventAddCallback(self.stream_handling, self.virt_console_stream_cb, None) except libvirt.libvirtError: logger.error('Cannot open console on domain %s', self.name) self.close_console() raise try: self.sock, tunnel_endpoint = socket.socketpair() except socket.error: logger.error('Cannot create socket pair for console on domain %s', self.name) self.close_console() raise try: self.sock.setblocking(0) except socket.error: logger.error('Cannot set socket to non blocking for console on' ' domain %s', self.name) self.close_console() raise self.read_watcher = self.hypervisor.handler.main.evloop.io( self.sock, pyev.EV_READ, self.read_from_tun_cb) self.read_watcher.start() self.write_watcher = self.hypervisor.handler.main.evloop.io( self.sock, pyev.EV_WRITE, self.write_to_tun_cb) # self.write_watcher.start() self.from_tunnel = SocketBuffer(4096) self.from_stream = SocketBuffer(4096) return tunnel_endpoint def virt_console_stream_cb(self, stream, events, opaque): """Handles read/write from/to libvirt stream.""" if events & libvirt.VIR_EVENT_HANDLE_ERROR or ( events & libvirt.VIR_EVENT_HANDLE_HANGUP): # error/hangup # logger.debug('Received error on stream') self.close_console() return if events & libvirt.VIR_EVENT_HANDLE_WRITABLE: # logger.debug('Write to stream') # logger.debug('Event %s', self.stream_handling) while True: try: to_send = self.from_tunnel.popleft() except IndexError: # update libvirt event mask self.stream_handling ^= libvirt.VIR_STREAM_EVENT_WRITABLE self.stream.eventUpdateCallback(self.stream_handling) break send_buffer = to_send total_sent = 0 while True: try: written = self.stream.send(send_buffer) except: # libvirt send error logger.exception('Error while writing to stream') self.close_console() return if written == -2: # equivalent to EAGAIN self.from_tunnel.appendleft(to_send[total_sent:]) break elif written == len(send_buffer): break total_sent += written send_buffer = buffer(to_send, total_sent) if not self.from_tunnel.is_full(): self.read_watcher.start() if events & libvirt.VIR_EVENT_HANDLE_READABLE: # logger.debug('Read from stream') # logger.debug('Event %s', self.stream_handling) while True: try: incoming = self.stream.recv(self.BUFFER_LEN) except: pass if incoming == -2: # equivalent to EAGAIN break elif not incoming: # EOF self.close_console() return self.from_stream.append(incoming) if self.from_stream.is_full(): # update libvirt event mask self.stream_handling ^= libvirt.VIR_STREAM_EVENT_READABLE self.stream.eventUpdateCallback(self.stream_handling) break if not self.from_stream.is_empty(): self.write_watcher.start() def read_from_tun_cb(self, watcher, revents): """Read data from tunnel and save into buffer.""" # logger.debug('Read from tunnel') # logger.debug('Event %s', self.stream_handling) while True: try: incoming = self.sock.recv(self.BUFFER_LEN) except socket.error as exc: if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK): break logger.exception('Error reading on socket for vm console') self.close_console() return if not incoming: # EOF (we could wait before closing console stream) self.close_console() return self.from_tunnel.append(incoming) if self.from_tunnel.is_full(): self.read_watcher.stop() break if not self.from_tunnel.is_empty(): # update libvirt event callback self.stream_handling |= libvirt.VIR_STREAM_EVENT_WRITABLE self.stream.eventUpdateCallback(self.stream_handling) def write_to_tun_cb(self, watcher, revents): """Write data from buffer to tunnel.""" # logger.debug('Write to tunnel') # logger.debug('Event %s', self.stream_handling) while True: try: to_send = self.from_stream.popleft() except IndexError: self.write_watcher.stop() break send_buffer = to_send total_sent = 0 while True: try: written = self.sock.send(send_buffer) except socket.error as exc: if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK): self.from_stream.appenleft(to_send[total_sent:]) break logger.exception('Error writing on socket for vm console') self.close_console() return if written == len(send_buffer): break total_sent += written send_buffer = buffer(to_send, total_sent) if not self.from_stream.is_full(): # update libvirt event callback self.stream_handling |= libvirt.VIR_STREAM_EVENT_READABLE self.stream.eventUpdateCallback(self.stream_handling) def close_console(self): logger.info('Closing console stream on VM %s', self.name) if self.stream is not None: try: self.stream.eventRemoveCallback() except Exception: logger.error('Error while removing callback on stream') try: self.stream.finish() except Exception: logger.error('Cannot finnish console stream') self.stream = None if self.sock is not None: try: self.sock.close() except socket.error: logger.error('Cannot close socket') self.sock = None if self.read_watcher is not None: self.read_watcher.stop() self.read_watcher = None if self.write_watcher is not None: self.write_watcher.stop() self.write_watcher = None self.from_tunnel = None self.from_stream = None