#TODO: vm informations gathering !!!

import libvirt
import sys
# we use psutil to get host informations we can't get with
# libvirt
import psutil
import hashlib
import multiprocessing
import os
import re
import subprocess
from interface import *
from exceptions import *
from time import sleep
import xml.dom.minidom


###
# Defined constants
###

##
# Libvirt
##
#
KVM_LIBVIRT_SESSION = 'qemu:///system'
XEN_LIBVIRT_SESSION = 'xen:///'

#########
## States
#########

# Hypervisor

HV_ARCH_MAP = {
    'i686' : 'x86',
    'x86_64' : 'x64',
}

# Virtual Machines

VM_LIBVIRT_STATUS = (
    'No state',
    'Running',
    'Blocked on resource',
    'Paused',
    'Shutting down ...',
    'Shutdown',
    'Crashed'
)

# Maps the libvirt domain state number to the status string exposed
# by LibvirtVm.get_status()
VM_STATUS = {
    0 : 'stopped',
    1 : 'running',
    2 : 'running',
    3 : 'paused',
    4 : 'running',
    5 : 'shutdown',
    6 : 'shutdown',
}

VM_START_STATES = {
    'running': 1,
    'paused' : 0
}

# Storage Pools

POOL_STATE = (
    'Not Running',
    'Initializing pool, not available',
    'Running normally',
    'Running degraded',
    'Running, but not accessible'
)

#########################
## Pool informations tags
#########################

POOL_NAME = 'pool'
POOL_STATUS = 'pstatus'
POOL_TOTAL_SIZE = 'psize'
POOL_FREE_SPACE = 'pfree_space'
POOL_USED_SPACE = 'pused_space'
POOL_NUM_VOLUMES = 'num_vols'
POOL_BACKEND = 'backend'

DEFAULT_VM_START = VM_START_STATES['running']

##
# Pretty size outputs
##

MEGABYTE_DIV = 1024 * 1024
GIGABYTE_DIV = 1024 * 1024 * 1024
KILOBYTE_DIV = 1024


class LibvirtVm(VM):
    '''
    Libvirt api access to Virtual Machine actions
    All libvirt.virDomain calls must stay in this class

    :param domain: a libvirt domain instance
    :type domain: :class:`libvirt.virDomain`
    :param hypervisor: a hypervisor reference object
    :type hypervisor: :class:`LibvirtHypervisor`
    '''

    _print_header = True

    def __init__(self, domain, hypervisor):
        '''
        Binds the vm to its libvirt domain and to its hypervisor

        :raise TypeError: if domain is not a :class:`libvirt.virDomain`
            or if hypervisor is not a :class:`LibvirtHypervisor`
        '''
        super(LibvirtVm, self).__init__()
        if not isinstance(domain, libvirt.virDomain):
            raise TypeError('Need virDomain object given %s' % type(domain))
        self._domain = domain
        self.vm_info = {}
        self._find_pid()
        self.set_hypervisor(hypervisor)

    def __cmp__(self, other):
        '''
        We overload the compare method so that two virtual machines are
        identical if they are bound to the same domain object
        '''
        return cmp(self._domain, other._domain)

    def _find_pid(self):
        '''
        Finds the PID of the current vm and stores it in self._pid
        (None when no process command line contains the domain UUID)
        '''
        result = find_process_id(self._domain.UUIDString())
        if result:
            self._pid = int(result.pop())
        else:
            self._pid = None

    def _get_xml_domain(self):
        '''
        Parses the inactive XML description of the domain and returns
        its <domain> element
        '''
        xml_string = self._domain.XMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE)
        xroot = xml.dom.minidom.parseString(xml_string)
        return xroot.getElementsByTagName('domain').pop()

    def get_pid(self):
        ''' Returns the PID found by the last lookup, or None '''
        return self._pid

    def set_hypervisor(self, hypervisor):
        '''
        Sets the hypervisor this vm belongs to

        :param hypervisor: the owning hypervisor
        :type hypervisor: :class:`LibvirtHypervisor`
        :raise TypeError: if hypervisor is not a LibvirtHypervisor
        '''
        if not isinstance(hypervisor, LibvirtHypervisor):
            raise TypeError('argument type error')
        self._hypervisor = hypervisor

    def get_name(self):
        ''' Returns the libvirt name of the domain '''
        return self._domain.name()

    def get_hv_name(self):
        ''' Returns the name of the hypervisor hosting this vm '''
        return self._hypervisor.get_name()

    def get_mem_used(self):
        '''
        Returns the memory currently used by the domain, in bytes
        (libvirt info()[2], reported in KiB, scaled by KILOBYTE_DIV)
        '''
        return self._domain.info()[2] * KILOBYTE_DIV

    def get_mem(self):
        '''
        Returns the maximum memory of the domain, in bytes
        (libvirt info()[1], reported in KiB, scaled by KILOBYTE_DIV)
        '''
        return self._domain.info()[1] * KILOBYTE_DIV

    def get_mem_free(self):
        '''
        Returns the difference between get_mem() and get_mem_used(),
        in bytes
        '''
        # both getters already return bytes, no further scaling needed
        return self.get_mem() - self.get_mem_used()

    def get_cpu(self):
        ''' Returns the number of virtual cpus of the domain '''
        return self._domain.info()[3]

    def get_cpu_percent(self):
        '''
        Returns the cpu usage of the vm process as an integer capped to
        100, or 0 when the process of the vm cannot be found

        .. note:: this call sleeps 0.7 second before sampling
        '''
        self._find_pid()
        if not self._pid:
            return 0
        p = psutil.Process(self._pid)
        sleep(0.7)
        return min(int(p.get_cpu_percent()), 100)

    def get_disks(self):
        '''
        Returns the list of disks of the vm, parsed from its inactive
        XML description.

        Each disk is a dict; file and block backed disks get a 'path'
        key and, when the path matches a volume of an active storage
        pool of the hypervisor, the 'pool', 'vol' and 'size' keys.
        '''
        result = []
        # attribute of <source> holding the path, by disk type
        source_attr = {'file': 'file', 'block': 'dev'}
        sto = self._hypervisor._storage
        devices = self._get_xml_domain().getElementsByTagName('devices').pop()
        # iter on "disk" devices
        for disk in devices.getElementsByTagName('disk'):
            # skip weird disks
            if disk.getAttribute('device') != 'disk':
                continue
            d = {}
            attr = source_attr.get(disk.getAttribute('type'))
            sources = disk.getElementsByTagName('source')
            # a disk without <source> is reported without any path
            if attr is not None and sources:
                d['path'] = sources[-1].getAttribute(attr)
                # attributes only resolvable in the storage pool
                for pool_name, pool in sto.get_pools().items():
                    for vol_name in sto.get_volume_names(pool):
                        if sto.get_volume_path(pool, vol_name) == d['path']:
                            d['pool'] = pool_name
                            d['vol'] = vol_name
                            d['size'] = sto.get_volume_capacity(pool,
                                                                vol_name)
            result.append(d)
        return result

    def get_nics(self):
        '''
        Returns the list of network interfaces of the vm, parsed from
        its inactive XML description.

        Each interface is a dict with the 'type' key and, when they are
        present in the description, the 'hwaddr' and 'target' keys.
        '''
        result = []
        xdevices = self._get_xml_domain().getElementsByTagName('devices').pop()
        # iter on "interface" devices
        for iface in xdevices.getElementsByTagName('interface'):
            d = {'type': iface.getAttribute('type')}
            # get hardware address
            macs = iface.getElementsByTagName('mac')
            if macs:
                xmac = macs[-1]
                # libvirt stores the address in <mac address='...'/>,
                # the text node is only read as a fallback
                hwaddr = xmac.getAttribute('address')
                if not hwaddr and xmac.firstChild is not None:
                    hwaddr = xmac.firstChild.nodeValue
                if hwaddr:
                    d['hwaddr'] = hwaddr
            # get target interface
            targets = iface.getElementsByTagName('target')
            if targets:
                d['target'] = targets[-1].getAttribute('dev')
            result.append(d)
        return result

    def get_arch(self):
        '''
        Returns the architecture of the vm translated with HV_ARCH_MAP,
        or None if it is unknown or cannot be read
        '''
        try:
            xos = self._get_xml_domain().getElementsByTagName('os').pop()
            xtype = xos.getElementsByTagName('type').pop()
            return HV_ARCH_MAP.get(xtype.getAttribute('arch'))
        except Exception:
            # best effort: any libvirt or parsing failure gives None
            return None

    def get_status(self):
        ''' Returns the status string of the vm, see VM_STATUS '''
        return VM_STATUS[self._domain.info()[0]]

    def shutdown(self):
        '''
        Asks the domain to shut down

        :raise VMError: if libvirt refuses the operation
        '''
        try:
            self._domain.shutdown()
        except libvirt.libvirtError:
            raise VMError('%s is not running !' % self.get_name())

    def force_poweroff(self):
        '''
        Destroys (powers off) the domain

        :raise VMError: if libvirt refuses the operation
        '''
        try:
            self._domain.destroy()
        except libvirt.libvirtError:
            raise VMError('%s is not running !' % self.get_name())

    def start(self):
        '''
        Starts the domain

        :raise VMError: if libvirt refuses the operation
        '''
        try:
            self._domain.create()
        except libvirt.libvirtError:
            raise VMError('%s is already running !' % self.get_name())

    def suspend(self):
        '''
        Suspends the domain

        :raise VMError: if libvirt refuses the operation
        '''
        try:
            self._domain.suspend()
        except libvirt.libvirtError:
            raise VMError('%s is not running !' % self.get_name())

    def resume(self):
        '''
        Resumes the domain

        :raise VMError: if libvirt refuses the operation
        '''
        try:
            self._domain.resume()
        except libvirt.libvirtError:
            raise VMError('%s is not running !' % self.get_name())

    def get_uuid(self):
        '''
        Returns the uuid string of the vm
        '''
        return self._domain.UUIDString()

    def __str__(self):
        '''
        LibvirtVm object string representation
        '''
        # NOTE(review): _fetch_memory_stats, get_cpu_stats, used_memory,
        # vcpu and cpu_use are not defined in this class, presumably
        # they are provided by the VM base class -- to be confirmed
        header = '%-10s | %-10s | %-10s | %-15s | %-10s\n\n' %\
            ('Name', 'UsedMem', 'VCpu', 'CpuUsage', 'Status')
        self._fetch_memory_stats()
        self.get_cpu_stats()
        stats = '%-10s | %-10s | %-10s | %-15s | %-10s' %\
            (self.get_name(), self.used_memory, self.vcpu, self.cpu_use,
             self.get_status())
        return header + stats


class LibvirtHypervisor(Hypervisor):
    '''
    Libvirt api access to Hypervisor actions.
    All libvirt calls to the hypervisor stay in this class

    :param hv_type: This is the hypervisor type, this parameter is
        important as it specifies the hypervisor type used when
        connecting to libvirt.
    :type hv_type: :class:`str`

    .. note::
        only the 'kvm' type is handled for now, any other value raises
        :class:`NotImplementedError`
    '''

    def __init__(self, hv_type):
        '''
        Opens the libvirt connection, then builds the storage and the
        vm objects

        :raise NotImplementedError: if hv_type is not 'kvm'
        :raise HypervisorError: if libvirt cannot connect
        '''
        #FIXME: don't use hard coded hypervisor type in URI
        try:
            if hv_type == 'kvm':
                self._con_handle = libvirt.open(KVM_LIBVIRT_SESSION)
            else:
                raise NotImplementedError('or unknown hypervisor type')
        except libvirt.libvirtError as error:
            raise HypervisorError(error, 'libvirt cannot connect to hypervisor')
        #build storage objs
        self._storage = LibvirtHVStorage(self)
        #build vm objects
        self._build_vm_list()
        self.hv_info = {}
        self.hv_type = hv_type
        self.st_handle = LibvirtHVStorage(self)

    def get_name(self):
        '''
        Returns hypervisor's name
        '''
        return self._con_handle.getHostname()

    def get_hv_type(self):
        ''' Returns the hypervisor type given at construction '''
        return self.hv_type

    def get_hv_version(self):
        ''' Returns the hypervisor version number as a string '''
        return str(self._con_handle.getVersion())

    def get_uname(self):
        ''' Returns the fields of os.uname() joined with spaces '''
        return ' '.join(os.uname())

    def get_uptime(self):
        '''
        Returns the first field of /proc/uptime truncated to an integer,
        as a string, or None if it cannot be read
        '''
        try:
            with open("/proc/uptime") as f:
                data = f.read().split()
            return str(int(float(data[0])))
        except (IOError, OSError, ValueError, IndexError):
            return None

    def get_loadavg(self):
        '''
        Returns the three load averages formatted with two decimals and
        joined with spaces, or None if they are not available
        '''
        try:
            return ' '.join('%.2f' % load for load in os.getloadavg())
        except (OSError, AttributeError):
            return None

    def _build_vm_list(self):
        '''
        Builds the lists of all defined vms (running and offline) in the
        current hypervisor
        '''
        self._vm_list_running = {}
        self._vm_list_offline = {}
        self._doms_running_ids = self._con_handle.listDomainsID()
        self._doms_defined_names = self._con_handle.listDefinedDomains()

        for dom in self._doms_running_ids:
            vm = LibvirtVm(self._con_handle.lookupByID(dom), self)
            self._vm_list_running[vm.get_name()] = vm

        for dom in self._doms_defined_names:
            vm = LibvirtVm(self._con_handle.lookupByName(dom), self)
            self._vm_list_offline[vm.get_name()] = vm

        # merged view, built as a copy so the running list stays intact
        self._vm_list = dict(self._vm_list_running)
        self._vm_list.update(self._vm_list_offline)

    def _lookup_vm(self, vm_name):
        '''
        Returns the vm registered under vm_name

        :raise VMError: if no vm has this name
        '''
        try:
            return self._vm_list[vm_name]
        except KeyError:
            raise VMError('Virtual machine %s not found: ' % vm_name)

    def _storage_space(self, measure, pool=None):
        '''
        Applies measure to the pool named pool, or sums it over all the
        active pools when pool is None
        '''
        pools = self._storage.get_pools()
        if pool is None:
            return sum(measure(p) for p in pools.values())
        return measure(pools[pool])

    def get_arch_type(self):
        '''
        Get architecture type of hypervisor, translated with
        HV_ARCH_MAP (None if the architecture is not in the map)
        '''
        return HV_ARCH_MAP.get(self._con_handle.getInfo()[0])

    def get_nb_cpu(self):
        '''
        Returns number of active cpus in hypervisor
        '''
        return self._con_handle.getInfo()[2]

    def get_cpu_frequency(self):
        '''
        Returns the frequency in MHZ of cpus in the hypervisor
        '''
        return str(self._con_handle.getInfo()[3])

    def get_nb_threads(self):
        '''
        Number of threads per core
        '''
        return self._con_handle.getInfo()[7]

    def get_mem(self):
        '''
        Returns the sum of available and used physical memory reported
        by psutil, as a string
        '''
        return str(psutil.avail_phymem() + psutil.used_phymem())

    def get_mem_free(self):
        ''' Returns the available physical memory as a string '''
        return str(psutil.avail_phymem())

    def get_mem_used(self):
        ''' Returns the used physical memory as a string '''
        return str(psutil.used_phymem())

    def get_cpu_percent(self):
        ''' Returns the host cpu usage truncated to int, as a string '''
        return str(int(psutil.cpu_percent()))

    def get_cpu(self):
        ''' Returns multiprocessing.cpu_count() as a string '''
        return str(multiprocessing.cpu_count())

    def get_disks(self):
        '''
        Returns a dict mapping each sd*/hd* block device found in
        /sys/block to its 'size' entry multiplied by 512. Devices with
        a null size are skipped.

        The scan is best effort: it stops at the first read error and
        returns what has been collected so far.
        '''
        result = {}
        try:
            re_pattern = re.compile(r'([sh]d[a-z]+)')
            found = [bd for bd in os.listdir('/sys/block/')
                     if re_pattern.match(bd)]
            for disk in found:
                fullname = os.path.join('/sys/block', disk, 'size')
                with open(fullname) as f:
                    size = int(f.read())
                if size > 0:
                    result[disk] = size * 512
        except (IOError, OSError, ValueError):
            pass
        return result

    def get_storage_pools(self):
        ''' Returns the names of the active storage pools '''
        return self._storage.get_pools().keys()

    def get_storage_capacity(self, pool=None):
        '''
        Returns the capacity of a pool, or of all active pools, as a
        string

        :param pool: name of the pool, None for all active pools
        :type pool: :class:`str`
        '''
        return str(self._storage_space(self._storage.get_pool_space_total,
                                       pool))

    def get_storage_used(self, pool=None):
        '''
        Returns the used space of a pool, or of all active pools, as a
        string

        :param pool: name of the pool, None for all active pools
        :type pool: :class:`str`
        '''
        return str(self._storage_space(self._storage.get_pool_space_used,
                                       pool))

    def get_storage_free(self, pool=None):
        '''
        Returns the free space of a pool, or of all active pools, as a
        string

        :param pool: name of the pool, None for all active pools
        :type pool: :class:`str`
        '''
        return str(self._storage_space(
            self._storage.get_pool_space_available, pool))

    def get_storage_type(self, pool_name):
        '''
        Returns the 'type' attribute of the pool XML description, or
        None if it cannot be read

        :param pool_name: name of an active pool
        :type pool_name: :class:`str`
        '''
        try:
            pool = self._storage.get_pools()[pool_name]
            xroot = xml.dom.minidom.parseString(pool.XMLDesc(0))
            xpool = xroot.getElementsByTagName('pool').pop()
            return xpool.getAttribute('type')
        except Exception:
            # best effort: unknown pool or unreadable description
            return None

    def get_storage_path(self, pool_name):
        '''
        Returns the target path found in the pool XML description, or
        None if it cannot be read

        :param pool_name: name of an active pool
        :type pool_name: :class:`str`
        '''
        try:
            pool = self._storage.get_pools()[pool_name]
            xroot = xml.dom.minidom.parseString(pool.XMLDesc(0))
            xpool = xroot.getElementsByTagName('pool').pop()
            xtarget = xpool.getElementsByTagName('target').pop()
            xpath = xtarget.getElementsByTagName('path').pop()
            return xpath.firstChild.nodeValue
        except Exception:
            # best effort: unknown pool or description without a path
            return None

    def get_storage_volumes(self, pool_name):
        '''
        Returns the volume names of the pool joined with spaces, or
        None if they cannot be read

        :param pool_name: name of an active pool
        :type pool_name: :class:`str`
        '''
        try:
            pool = self._storage.get_pools()[pool_name]
            return " ".join(self._storage.get_volume_names(pool))
        except Exception:
            # best effort: unknown pool or storage error
            return None

    def get_status(self):
        ''' Not implemented '''
        raise NotImplementedError()

    def get_vm_names(self):
        '''
        Returns the names of all the vms currently defined in the
        hypervisor
        '''
        return self._vm_list.keys()

    def get_vm(self, vm_name):
        '''
        Returns the :class:`LibvirtVm` registered under vm_name

        :raise KeyError: if no vm has this name
        '''
        return self._vm_list[vm_name]

    def get_vm_count(self):
        ''' Returns the number of vms defined in the hypervisor '''
        return len(self._vm_list)

    def start_vm(self, vm_name, start_options=DEFAULT_VM_START):
        '''
        Starts the vm identified by name

        :param vm_name: The name of the virtual machine
        :type vm_name: :class:`str`

        :param start_options: Options flags while starting vm
            (currently not used)
        :type start_options: TODO reference to constants
        :raise VMError: if the vm is unknown or cannot be started
        '''
        self._lookup_vm(vm_name).start()

    def stop_vm(self, vm_name, force=False):
        '''
        Poweroff the specifed vm with the specified options

        :param vm_name: the name of the vm
        :type vm_name: :class:`str`
        :param force: destroy the vm instead of asking it to shut down
        :type force: :class:`bool`
        :raise VMError: if the vm is unknown or cannot be stopped
        '''
        vm = self._lookup_vm(vm_name)
        if force:
            vm.force_poweroff()
        else:
            vm.shutdown()

    def suspend_vm(self, vm_name):
        '''
        Suspends the specifed vm

        :param vm_name: the name of the vm
        :type vm_name: :class:`str`
        :raise VMError: if the vm is unknown or cannot be suspended
        '''
        self._lookup_vm(vm_name).suspend()

    def resume_vm(self, vm_name):
        '''
        Resumes the specifed vm

        :param vm_name: the name of the vm
        :type vm_name: :class:`str`
        :raise VMError: if the vm is unknown or cannot be resumed
        '''
        self._lookup_vm(vm_name).resume()

    def local_execute(self, command):
        '''
        Excecutes the command given in command on the local host
        Returns a tuple with (stdout, stderr) from the process that has
        been executed

        ..warning:: This is a dangerous function as it gives the command
            given in paramter to a shell prompt, anything can then be
            executed locally

        ..warning:: If the command given is a long processing one, this
            may be a deadlock to the node be carefull !

        :param command: the command to execute with it's arguments
        :type command: :class:`str`
        '''
        #FIXME: stop using shell=true and parse arguments with shlex.split()
        p = subprocess.Popen(command, shell=True,
                             bufsize=-1,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        return p.communicate()

    def export_vm(self, name, target, port):
        '''
        Migrates the given vm (name) to (target) hypervisor which is
        already waiting for the migration

        The first disk of the vm is read with dd and piped to a netcat
        connection. The md5 digest of the streamed data is printed once
        the transfer is done.

        :param name: the name of the vm
        :type name: :class:`str`
        :param target: the destination hypervisor
        :type target: :class:`str`
        :param port: the port on the destination to use for the transfer
        :type port: :class:`str`
        :raise VMError: if the vm is unknown or has no disk path
        :raise IOError: if reading from dd or writing to nc fails
        '''
        # We will be using subprocess to pipe the volume using dd in a
        # netcat connection to the destination
        # This algorithm is just a proof of concept of the cold migration
        # process it is likely that it does not anticipates some problems
        buff = 1024 * 1024 * 10 #10 Mo
        vm = self._lookup_vm(name)
        paths = [d['path'] for d in vm.get_disks() if 'path' in d]
        if not paths:
            raise VMError('Virtual machine %s has no disk to export' % name)
        # only the first disk is transferred
        path = paths[0]
        dd_cmd = ['dd', 'if=%s' % path]

        dd_process = subprocess.Popen(dd_cmd, bufsize=-1,
                                      stdout=subprocess.PIPE)
        nc_process = subprocess.Popen(['nc', target, str(port)],
                                      bufsize=-1,
                                      stdin=subprocess.PIPE,
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE)
        #We send dd output to nc
        dd_out = dd_process.stdout
        nc_in = nc_process.stdin
        m = hashlib.md5()
        print('going to pipe loop')
        try:
            read = dd_out.read(buff)
        except Exception as err:
            raise IOError('error while reading dd process output %s' % err)
        try:
            while len(read) != 0:
                print(len(read))
                m.update(read)
                nc_in.write(read)
                nc_in.flush()
                read = dd_out.read(buff)
        except Exception as err:
            raise IOError('Error occured while writing to nc process %s' % err)
        nc_in.close()
        print(m.hexdigest())

    def get_network_conf(self):
        ''' Not implemented '''
        raise NotImplementedError

    def get_storage_stats(self):
        ''' Not implemented '''
        raise NotImplementedError


#TODO: finish storage class, make tests,
class LibvirtHVStorage(HVStorage):
    '''
    Base storage class using libvirt api for storage managment
    Storage pools here are libvirt.virStoragePool instances

    :param hypervisor: the hypervisor owning the libvirt connection
    :type hypervisor: :class:`LibvirtHypervisor`
    '''

    def __init__(self, hypervisor):
        '''
        Binds the storage to the hypervisor and fetches its pools

        :raise TypeError: if hypervisor is not a LibvirtHypervisor
        '''
        if not isinstance(hypervisor, LibvirtHypervisor):
            raise TypeError('Expected %s given %s' % (LibvirtHypervisor,
                                                      hypervisor))
        self.hv_handle = hypervisor
        self._fetch_st_pools()

    def _fetch_st_pools(self):
        '''
        Sets the attribute self._pools, a dict mapping the name of each
        storage pool listed by the hypervisor connection (defined ones
        first, then the other listed ones) to its libvirt pool object
        '''
        con = self.hv_handle._con_handle
        pools = []
        pools.extend(con.listDefinedStoragePools())
        pools.extend(con.listStoragePools())
        self._pools = dict((p, pool_obj(con, p)) for p in pools)

    def get_pools(self):
        '''
        Returns a dict of the active storage pools bound to the host,
        indexed by pool name
        '''
        pools = {}
        for pool_name, pool in self._pools.items():
            if pool.isActive():
                pools[pool_name] = pool
        return pools

    def get_volume_names(self, pool=None):
        '''
        Returns volume names stored in this pool or all pools

        :param pool: the pool to list, None for all known pools
        :type pool: :class:`libvirt.virStoragePool`
        :raise StorageError: if libvirt fails to list the volumes
        '''
        try:
            if pool is not None:
                return pool.listVolumes()
            volumes = []
            for known_pool in self._pools.values():
                volumes.extend(known_pool.listVolumes())
            return volumes
        except libvirt.libvirtError as e:
            raise StorageError("Failed to get volume list (%s)" % e)

    def add_volume(self, pool, name, space):
        '''
        Adds a volume to the specified pool

        :param pool: the name of the pool in which to create the volume
        :type pool: :class:`str`
        :param name: name of the new volume
        :type name: :class:`str`
        :param space: size of the new volume in gigabytes
        :type space: :class:`int`
        :raise StorageError: if libvirt fails to create the volume
        '''
        xml_desc = """
            <volume>
                <name>%(vol_name)s</name>
                <capacity>%(vol_size)u</capacity>
            </volume>
        """ % {
            "vol_name" : name,
            "vol_size" : space * GIGABYTE_DIV,
        }
        try:
            self._pools[pool].createXML(xml_desc, 0)
        except libvirt.libvirtError as e:
            raise StorageError("Failed to create the volume (%s)" % e)

    def del_volume(self, pool, name, wipe=False):
        '''
        Deletes a volume in the specified pool

        :param pool: the pool in which delete the volume, given either
            as the name of a known pool or as a pool object
        :type pool: :class:`str` or :class:`libvirt.virStoragePool`
        :param name: the name of the volume
        :type name: :class:`str`
        :param wipe: wipe the volume before deleting it
        :type wipe: :class:`bool`
        :raise StorageError: if the volume cannot be found, wiped or
            deleted
        '''
        # a known pool name is translated to its pool object, anything
        # else is used as the pool object itself
        pool = self._pools.get(pool, pool)
        try:
            vol = pool.storageVolLookupByName(name)
        except libvirt.libvirtError as e:
            raise StorageError("Volume not found (%s)" % e)
        if wipe:
            try:
                vol.wipe(0)
            except libvirt.libvirtError as e:
                raise StorageError("Failed to wipe volume, data integrity"
                                   " is unknown (%s)" % e)
        try:
            vol.delete(0)
        except libvirt.libvirtError as e:
            raise StorageError("Failed to delete the volume, but it may be"
                               " wiped (%s)" % e)

    def find_space(self, new_space):
        '''
        Tries to find a suitable chunk of space for volume allocation.

        :param new_space: a space size in gigabytes
        :type new_space: :class:`int`
        :return: a :class:`tuple` (pool name, pool object) of the first
            suitable location or :class:`False` if no pool is suitable
        :raise StorageError: if libvirt fails to give pool informations
        '''
        needed = new_space * GIGABYTE_DIV
        for pool_name, pool in self._pools.items():
            try:
                # FIXME, crappy overhead delta
                if needed <= pool.info()[3] - MEGABYTE_DIV:
                    return (pool_name, pool)
            except libvirt.libvirtError as e:
                raise StorageError("Can't get pool informations (%s)" % e)
        return False

    def get_pool_name(self, pool):
        '''
        Returns the name of this pool

        :param pool: the storage pool
        :type pool: libvirt.`virStoragePool`
        :return: :class:`str` name of the pool
        :raise StorageError: if libvirt fails to give the name
        '''
        try:
            return pool.name()
        except libvirt.libvirtError as e:
            raise StorageError("Can't get pool name (%s)" % e)

    def get_pool_state(self, pool):
        '''
        Returns the running state of the pool, see POOL_STATE

        :param pool: the storage pool
        :type pool: libvirt.`virStoragePool`
        :raise StorageError: if libvirt fails to give the state
        '''
        try:
            return POOL_STATE[pool.info()[0]]
        except libvirt.libvirtError as e:
            raise StorageError("Can't get pool state (%s)" % e)

    def get_pool_space_total(self, pool):
        '''
        Returns the storage capacity of this pool in bytes

        :param pool: the pool to get information from
        :type pool: :class:`virStoragePool`
        :return: :class:`int` of capacity in bytes
        :raise StorageError: if libvirt fails to give pool informations
        '''
        try:
            return pool.info()[1]
        except libvirt.libvirtError as e:
            raise StorageError("Can't get pool informations (%s)" % e)

    def get_pool_space_available(self, pool):
        '''
        Returns available space of this storage pool in bytes

        :param pool: the pool to get information from
        :type pool: :class:`virStoragePool`
        :return: :class:`int` of available free space in bytes
        :raise StorageError: if libvirt fails to give pool informations
        '''
        try:
            # pool info is (state, capacity, allocation, available)
            return pool.info()[3]
        except libvirt.libvirtError as e:
            raise StorageError("Can't get pool informations (%s)" % e)

    def get_pool_space_used(self, pool):
        '''
        Returns current storage pool usage in bytes

        :param pool: the pool to get information from
        :type pool: :class:`virStoragePool`
        :return: :class:`int` of used space in bytes
        :raise StorageError: if libvirt fails to give pool informations
        '''
        try:
            # pool info is (state, capacity, allocation, available)
            return pool.info()[2]
        except libvirt.libvirtError as e:
            raise StorageError("Can't get pool informations (%s)" % e)

    def get_volume_path(self, pool, vol_name):
        '''
        Returns the file path to the volume

        :param pool: the storage pool containing this volume
        :type pool: libvirt.`virStoragePool`
        :param vol_name: name of the pool volume
        :type vol_name: :class:`str`
        :return: :class:`str` path to the volume file
        :raise StorageError: if the volume or its path cannot be found
        '''
        try:
            vol = pool.storageVolLookupByName(vol_name)
        except libvirt.libvirtError as e:
            raise StorageError("Can't find volume in pool (%s)" % e)
        try:
            return vol.path()
        except libvirt.libvirtError as e:
            raise StorageError("Volume has no path information (%s)" % e)

    def get_volume_allocation(self, pool, vol_name):
        '''
        Returns the pool space used by this volume in bytes

        :param pool: the pool containing this volume from
        :type pool: :class:`virStoragePool`
        :param vol_name: name of the volume to query
        :type vol_name: :class:`str`
        :return: :class:`int` of allocation in bytes
        :raise StorageError: if the volume or its informations cannot
            be found
        '''
        try:
            vol = pool.storageVolLookupByName(vol_name)
        except libvirt.libvirtError as e:
            raise StorageError("Volume not found (%s)" % e)
        try:
            return vol.info()[2]
        except libvirt.libvirtError as e:
            raise StorageError("Volume has no allocation information (%s)" % e)

    def get_volume_capacity(self, pool, vol_name):
        '''
        Returns the capacity (usable space) of this volume in bytes

        :param pool: the pool containing this volume from
        :type pool: :class:`virStoragePool`
        :param vol_name: name of the volume to query
        :type vol_name: :class:`str`
        :return: :class:`int` of capacity in bytes
        :raise StorageError: if the volume or its informations cannot
            be found
        '''
        try:
            vol = pool.storageVolLookupByName(vol_name)
        except libvirt.libvirtError as e:
            raise StorageError("Volume not found (%s)" % e)
        try:
            return vol.info()[1]
        except libvirt.libvirtError as e:
            raise StorageError("Volume has no capacity information (%s)" % e)


#### Helper functions

def map_process(process):
    '''
    Maps a process object to a (pid, command line) tuple
    '''
    return (process.pid, process.cmdline)


def find_process_id(uuid):
    '''
    Find the process id of a kvm process given an UUID of a virDomain
    object
    Returns the list of the pids of the processes whose command line
    contains the uuid
    '''
    return [p.pid for p in psutil.get_process_list() if uuid in p.cmdline]


def pool_obj(con_handle, pool_name):
    '''
    Creates a libvirt pool object given the name of the pool

    :param con_handle: libvirt connection handle
    :type con_handle: :class:`libvirt.virConnect`
    :param pool_name: name of the storage pool
    :type pool_name: :class:`str`
    '''
    return con_handle.storagePoolLookupByName(pool_name)