# This file is part of CloudControl. # # CloudControl is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # CloudControl is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with CloudControl. If not, see . import re import errno import logging import socket import weakref from StringIO import StringIO from xml.etree import cElementTree as et from collections import namedtuple from itertools import izip, count import pyev import libvirt from cloudcontrol.common.client.tags import Tag, tag_inspector, TagDB from cloudcontrol.node.hypervisor.lib import DOMAIN_STATES as STATE from cloudcontrol.node.hypervisor.domains import vm_tags from cloudcontrol.node.utils import SocketBuffer from cloudcontrol.node.exc import ConsoleAlreadyOpened, ConsoleError logger = logging.getLogger(__name__) REGEX_TAG_NAME = '[a-zA-Z0-9_-]+' REGEX_TAG_VALUE = '.+' REGEX_TAG_IN_DESCRIPTION = '^@(' + REGEX_TAG_NAME + ')[ ]*?=[ ]*?(' + REGEX_TAG_VALUE + ')$' REGEX_TAG_REPLACE = '^@%s[ ]*?=[ ]*?(' + REGEX_TAG_VALUE + ')$' NetworkInterface = namedtuple('NetworkInterface', ('source', 'mac', 'model')) class VirtualMachine(object): """Represent a VM instance.""" #: buffer size for console connection handling BUFFER_LEN = 1024 def __init__(self, dom, hypervisor): """ :param dom: libvirt domain instance :param hypervisor: hypervisor where the VM is """ self.hypervisor = weakref.proxy(hypervisor) #: UUID string of domain self.uuid = dom.UUIDString() self.name = dom.name() # Get title of VM try: self.title = dom.metadata(libvirt.VIR_DOMAIN_METADATA_TITLE, None) except (libvirt.libvirtError, AttributeError): # libvirtError handle the case where the title is not defined on the # vm, AttributeError handle the case where the libvirt is too old # to allow metadata handling self.title = None #: state of VM: started, stoped, paused self._state = STATE[dom.info()[0]] #: tags for this VM self.tag_db = TagDB(tags=tag_inspector(vm_tags, self)) #: Driver cache behavior for each VM storage, see #: http://libvirt.org/formatdomain.html#elementsDisks self.cache_behaviour = dict() # define dynamic tags for i, v in izip(count(), self.iter_disks()): for t in ( Tag('disk%s_size' % i, v.capacity, 10), Tag('disk%s_path' % i, v.path, 10), Tag('disk%s_pool' % i, v.storage, 10), # FIXME: change Tag('disk%s_vol' % i, v.name, 10), Tag('disk%s_cache' % i, lambda: self.cache_behaviour.get(v.path), 10) ): self.tag_db.add_tag(t) for i, nic in izip(count(), self.iter_nics()): for t in ( Tag('nic%s_mac' % i, nic.mac), Tag('nic%s_source' % i, nic.source), Tag('nic%s_model' % i, nic.model), ): self.tag_db.add_tag(t) #: keep record of CPU stats (libev timestamp, cpu time) self.cpu_stats = (hypervisor.handler.main.evloop.now(), dom.info()[4]) # attributes related to console handling self.stream = None self.sock = None # socketpair endpoint self.read_watcher = None # ev watcher for sock self.write_watcher = None # ev watcher for sock self.from_tunnel = None # buffer self.from_stream = None # buffer self.stream_handling = 0 # libvirt stream event mask self.redefine_on_stop = False # for XML update (see KVM class) self._description_tags_db = TagDB(parent_db=self.tag_db) self.sync_description_tags() @property def state(self): return self._state @state.setter def state(self, value): self._state = value self.tag_db['__main__']['status'].update_value() self.tag_db['__main__']['vncport'].update_value() self.sync_description_tags() @property def description(self): xml = self.lv_dom.XMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE) descriptions = et.ElementTree().parse(StringIO(xml)).findall('description') if descriptions: return descriptions[0].text else: return '' @description.setter def description(self, value): try: xml = self.lv_dom.XMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE) except libvirt.libvirtError: logger.exception('Error while getting domain XML from libvirt, %s', self.name) raise xml_tree = et.ElementTree() root = xml_tree.parse(StringIO(xml)) desc = xml_tree.find('description') if desc is None: desc = et.SubElement(root, 'description') desc.text = value # write back the XML tree out = StringIO() xml_tree.write(out) try: self.hypervisor.vir_con.defineXML(out.getvalue()) except libvirt.libvirtError: logger.exception('Cannot update XML file for domain %s', self.name) raise @property def tags(self): return dict(re.findall(REGEX_TAG_IN_DESCRIPTION, self.description, re.MULTILINE)) def set_tag(self, tag, value): if not re.match(REGEX_TAG_NAME + '$', tag): raise RuntimeError('Bad tag name') elif not re.match(REGEX_TAG_VALUE + '$', value): raise RuntimeError('Bad tag value') tags = self.tags if tag in tags: self.description = re.sub(REGEX_TAG_REPLACE % re.escape(tag), '@%s=%s' % (tag, value), self.description, flags=re.MULTILINE) else: self.description += '\n@%s=%s' % (tag, value) def delete_tag(self, tag): tags = self.tags if tag in tags: self.description = re.sub(REGEX_TAG_REPLACE % tag, '', self.description, flags=re.MULTILINE) @property def lv_dom(self): """Libvirt domain instance.""" return self.hypervisor.vir_con.lookupByUUIDString(self.uuid) def start(self, pause=False): flags = 0 if pause: flags |= libvirt.VIR_DOMAIN_START_PAUSED self.lv_dom.createWithFlags(flags) def sync_description_tags(self): tags = dict(re.findall(REGEX_TAG_IN_DESCRIPTION, self.description, re.MULTILINE)) # Add/update static tags: for k, v in tags.iteritems(): tag = self._description_tags_db['__main__'].get(k) if tag is None: self._description_tags_db.add_tag(Tag(k, v, -1)) else: tag.value = v # Purge not more defined static tags: for k in self._description_tags_db['__main__']: if k not in tags: self._description_tags_db.remove_tag(k) def stop(self): self.lv_dom.shutdown() def suspend(self): self.lv_dom.suspend() def resume(self): self.lv_dom.resume() def destroy(self): self.lv_dom.destroy() def undefine(self): self.lv_dom.undefine() @property def disks(self): return list(self.iter_disks()) def iter_disks(self): for d in et.ElementTree().parse( StringIO(self.lv_dom.XMLDesc(0)) ).findall('devices/disk'): if d.get('device') != 'disk': continue type_ = d.get('type') if type_ in ('file', 'block'): path = d.find('source').get(dict(file='file', block='dev')[type_]) volume = self.hypervisor.storage.get_volume(path) if volume is None: continue elif type_ == 'volume': pool = d.find('source').get('pool') vol = d.find('source').get('volume') volume = self.hypervisor.storage.get_volume_by_pool(pool, vol) else: continue # Ignore unknown volumes: if volume is None: logger.warn('Unknown volume for vm %s', self.name) continue path = volume.path # update cache behaviour driver = d.find('driver') if driver is None: driver = {} self.cache_behaviour[path] = driver.get('cache', 'default') yield volume @property def nics(self): return list(self.iter_nics()) def iter_nics(self): for nic in et.ElementTree().parse( StringIO(self.lv_dom.XMLDesc(0)) ).findall('devices/interface'): if nic.get('type') == 'bridge': try: mac = nic.find('mac').get('address') except AttributeError: mac = None try: model = nic.find('model').get('type') except AttributeError: model = None try: source = nic.find('source').get('bridge') except AttributeError: source = None yield NetworkInterface( mac=mac, source=source, model=model, ) def open_console(self): if self.stream is not None: raise ConsoleAlreadyOpened('Console for this VM is already' ' opened') if str( self.hypervisor.handler.tag_db['__main__']['libvirtver'].value, ).startswith('8'): raise ConsoleError( 'Cannot open console, not compatible with this version of libvirt') logger.info('Opening console stream on VM %s', self.name) try: self.stream = self.hypervisor.vir_con.newStream( libvirt.VIR_STREAM_NONBLOCK) except libvirt.libvirtError: logger.error('Cannot create new stream for console %s', self.name) self.close_console() raise self.stream_handling = libvirt.VIR_STREAM_EVENT_READABLE | ( libvirt.VIR_STREAM_EVENT_ERROR | libvirt.VIR_STREAM_EVENT_HANGUP) try: self.lv_dom.openConsole(None, self.stream, 0) self.stream.eventAddCallback(self.stream_handling, self.virt_console_stream_cb, None) except libvirt.libvirtError: logger.error('Cannot open console on domain %s', self.name) self.close_console() raise try: self.sock, tunnel_endpoint = socket.socketpair() except socket.error: logger.error('Cannot create socket pair for console on domain %s', self.name) self.close_console() raise try: self.sock.setblocking(0) except socket.error: logger.error('Cannot set socket to non blocking for console on' ' domain %s', self.name) self.close_console() raise self.read_watcher = self.hypervisor.handler.main.evloop.io( self.sock, pyev.EV_READ, self.read_from_tun_cb) self.read_watcher.start() self.write_watcher = self.hypervisor.handler.main.evloop.io( self.sock, pyev.EV_WRITE, self.write_to_tun_cb) # self.write_watcher.start() self.from_tunnel = SocketBuffer(4096) self.from_stream = SocketBuffer(4096) return tunnel_endpoint def virt_console_stream_cb(self, stream, events, opaque): """Handles read/write from/to libvirt stream.""" if events & libvirt.VIR_EVENT_HANDLE_ERROR or ( events & libvirt.VIR_EVENT_HANDLE_HANGUP): # error/hangup # logger.debug('Received error on stream') self.close_console() return if events & libvirt.VIR_EVENT_HANDLE_WRITABLE: # logger.debug('Write to stream') # logger.debug('Event %s', self.stream_handling) while True: try: to_send = self.from_tunnel.popleft() except IndexError: # update libvirt event mask self.stream_handling ^= libvirt.VIR_STREAM_EVENT_WRITABLE self.stream.eventUpdateCallback(self.stream_handling) break send_buffer = to_send total_sent = 0 while True: try: written = self.stream.send(send_buffer) except: # libvirt send error logger.exception('Error while writing to stream') self.close_console() return if written == -2: # equivalent to EAGAIN self.from_tunnel.appendleft(to_send[total_sent:]) break elif written == len(send_buffer): break total_sent += written send_buffer = buffer(to_send, total_sent) if not self.from_tunnel.is_full(): self.read_watcher.start() if events & libvirt.VIR_EVENT_HANDLE_READABLE: # logger.debug('Read from stream') # logger.debug('Event %s', self.stream_handling) while True: try: incoming = self.stream.recv(self.BUFFER_LEN) except: pass if incoming == -2: # equivalent to EAGAIN break elif not incoming: # EOF self.close_console() return self.from_stream.append(incoming) if self.from_stream.is_full(): # update libvirt event mask self.stream_handling ^= libvirt.VIR_STREAM_EVENT_READABLE self.stream.eventUpdateCallback(self.stream_handling) break if not self.from_stream.is_empty(): self.write_watcher.start() def read_from_tun_cb(self, watcher, revents): """Read data from tunnel and save into buffer.""" # logger.debug('Read from tunnel') # logger.debug('Event %s', self.stream_handling) while True: try: incoming = self.sock.recv(self.BUFFER_LEN) except socket.error as exc: if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK): break logger.exception('Error reading on socket for vm console') self.close_console() return if not incoming: # EOF (we could wait before closing console stream) self.close_console() return self.from_tunnel.append(incoming) if self.from_tunnel.is_full(): self.read_watcher.stop() break if not self.from_tunnel.is_empty(): # update libvirt event callback self.stream_handling |= libvirt.VIR_STREAM_EVENT_WRITABLE self.stream.eventUpdateCallback(self.stream_handling) def write_to_tun_cb(self, watcher, revents): """Write data from buffer to tunnel.""" # logger.debug('Write to tunnel') # logger.debug('Event %s', self.stream_handling) while True: try: to_send = self.from_stream.popleft() except IndexError: self.write_watcher.stop() break send_buffer = to_send total_sent = 0 while True: try: written = self.sock.send(send_buffer) except socket.error as exc: if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK): self.from_stream.appenleft(to_send[total_sent:]) break logger.exception('Error writing on socket for vm console') self.close_console() return if written == len(send_buffer): break total_sent += written send_buffer = buffer(to_send, total_sent) if not self.from_stream.is_full(): # update libvirt event callback self.stream_handling |= libvirt.VIR_STREAM_EVENT_READABLE self.stream.eventUpdateCallback(self.stream_handling) def close_console(self): logger.info('Closing console stream on VM %s', self.name) if self.stream is not None: try: self.stream.eventRemoveCallback() except Exception: logger.error('Error while removing callback on stream') try: self.stream.finish() except Exception: logger.error('Cannot finnish console stream') self.stream = None if self.sock is not None: try: self.sock.close() except socket.error: logger.error('Cannot close socket') self.sock = None if self.read_watcher is not None: self.read_watcher.stop() self.read_watcher = None if self.write_watcher is not None: self.write_watcher.stop() self.write_watcher = None self.from_tunnel = None self.from_stream = None