Newer
Older
#TODO: vm informations gathering !!!
import libvirt
import sys
# we use psutil to get host information we can't get with libvirt
import multiprocessing
import os
import re
from interface import *
from exceptions import *
from time import sleep
###
# Defined constants
###
##
# Libvirt
##
#
# libvirt connection URIs, one per supported hypervisor flavour
KVM_LIBVIRT_SESSION = 'qemu:///system'
XEN_LIBVIRT_SESSION = 'xen:///'
#########
## States
#########
# Hypervisor
# Maps an architecture string (domain XML `arch` attribute or the first
# field of getInfo()) to the short name exposed by this module
HV_ARCH_MAP = {
'i686' : 'x86',
'x86_64' : 'x64',
}
# Human readable state labels; not referenced in this part of the file.
# NOTE(review): presumably indexed by the state code of virDomain.info()[0]
VM_LIBVIRT_STATUS = (
'No state',
'Running',
'Blocked on resource',
'Paused',
'Shutting down ...',
'Shutdown',
'Crashed'
)
# Simplified vm status keyed by the state code read from
# virDomain.info()[0] (see LibvirtVm.get_status)
VM_STATUS = {
0 : 'stopped',
1 : 'running',
2 : 'running',
3 : 'paused',
4 : 'running',
5 : 'shutdown',
6 : 'shutdown',
}
# Flags selectable when starting a vm
VM_START_STATES = {
'running': 1,
'paused' : 0
}
# Storage Pools
# Pool state labels indexed by the state code read from
# virStoragePool.info()[0] (see LibvirtHVStorage.get_pool_state)
POOL_STATE = (
'Not Running',
'Initializing pool, not available',
'Running normally',
'Running degraded',
'Running, but not accessible'
)
########################
## Pool information tags
########################
POOL_NAME = 'pool'
POOL_STATUS = 'pstatus'
POOL_TOTAL_SIZE = 'psize'
POOL_FREE_SPACE = 'pfree_space'
POOL_USED_SPACE = 'pused_space'
POOL_NUM_VOLUMES = 'num_vols'
POOL_BACKEND = 'backend'
# Default value of the start_options argument of start_vm
DEFAULT_VM_START = VM_START_STATES['running']
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# Pretty size outputs
##
# Multipliers/divisors (in bytes) used to convert between size units
MEGABYTE_DIV = 1024 * 1024
GIGABYTE_DIV = 1024 * 1024 * 1024
KILOBYTE_DIV = 1024
class LibvirtVm(VM):
'''
Libvirt api access to Virtual Machine actions
All libvirt.virDomain calls must stay in this class
:param domain: a libvirt domain instance
:type domain: :class:`libvirt.virDomain`
:param hypervisor: a hypervisor reference object
:type hypervisor: :class:`LibvirtHypervisor`
'''
_print_header = True
def __init__(self, domain, hypervisor):
    '''
    Binds this object to a libvirt domain and to its owning hypervisor

    :param domain: a libvirt domain instance
    :type domain: :class:`libvirt.virDomain`
    :param hypervisor: a hypervisor reference object
    :type hypervisor: :class:`LibvirtHypervisor`
    :raises TypeError: when domain is not a :class:`libvirt.virDomain`
    '''
    super(LibvirtVm, self).__init__()
    # guard clause: refuse anything that is not a libvirt domain
    if not isinstance(domain, libvirt.virDomain):
        raise TypeError('Need virDomain object given %s' % type(domain))
    self._domain = domain
    self.vm_info = {}
    self._find_pid()
    self.set_hypervisor(hypervisor)
def __cmp__(self, other):
    '''
    Compares two virtual machines through the domain objects they wrap,
    so that two vms bound to the same domain compare as identical
    '''
    mine = self._domain
    theirs = other._domain
    return cmp(mine, theirs)
def _find_pid(self):
    '''
    Looks up the process whose command line contains the domain UUID and
    caches its pid in self._pid (None when no such process is found)
    '''
    matches = find_process_id(self._domain.UUIDString())
    self._pid = int(matches.pop()) if matches else None
def get_pid(self):
    '''
    Returns the pid cached by the last :meth:`_find_pid` call, or None
    when no process was found for this vm
    '''
    pid = self._pid
    return pid
def set_hypervisor(self, hypervisor):
    '''
    Attaches this vm to its hypervisor object

    :param hypervisor: the hypervisor owning this vm
    :type hypervisor: :class:`LibvirtHypervisor`
    :raises TypeError: when hypervisor is not a :class:`LibvirtHypervisor`
    '''
    if not isinstance(hypervisor, LibvirtHypervisor):
        raise TypeError('argument type error')
    self._hypervisor = hypervisor
def get_name(self):
    '''
    Returns the name of the underlying libvirt domain
    '''
    domain = self._domain
    return domain.name()
def get_hv_name(self):
    '''
    Returns the name of the hypervisor this vm is attached to
    '''
    hypervisor = self._hypervisor
    return hypervisor.get_name()
def get_mem_used(self):
    '''
    Returns the third field of the domain info scaled by KILOBYTE_DIV
    NOTE(review): per libvirt this field is the memory currently used by
    the domain in KiB, which makes the result a byte count - confirm
    '''
    info = self._domain.info()
    return info[2] * KILOBYTE_DIV
def get_mem(self):
    '''
    Returns the second field of the domain info scaled by KILOBYTE_DIV
    NOTE(review): per libvirt this field is the maximum memory allowed
    for the domain in KiB, which makes the result a byte count - confirm
    '''
    info = self._domain.info()
    return info[1] * KILOBYTE_DIV
def get_mem_free(self):
    '''
    Returns the difference between the memory allocated to the vm and
    the memory it currently uses

    Relies on :meth:`get_mem` and :meth:`get_mem_used`, the accessors this
    class actually defines. Both already apply the KILOBYTE_DIV scaling,
    so the difference is returned as is.
    '''
    return self.get_mem() - self.get_mem_used()
def get_cpu(self):
def get_cpu_percent(self):
    '''
    Returns the cpu usage of the vm process as an integer capped at 100,
    or 0 when no process is found for this vm

    The pid is refreshed on every call; the call sleeps 0.7s before
    sampling the process.
    '''
    self._find_pid()
    if not self._pid:
        return 0
    proc = psutil.Process(self._pid)
    sleep(0.7)
    usage = int(proc.get_cpu_percent())
    return min(usage, 100)
def get_disks(self):
    '''
    Lists the disks declared in the inactive XML description of the vm

    :return: a list of dicts, one per ``<disk device='disk'>`` element.
        Each dict holds ``path`` when the disk is file or block backed,
        plus ``pool``, ``vol`` and ``size`` when the path matches a volume
        of one of the hypervisor storage pools.
    '''
    result = []
    # we parse xml description of the vm to get disk list
    xml_string = self._domain.XMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE)
    root = xml.dom.minidom.parseString(xml_string)
    domain = root.getElementsByTagName('domain').pop()
    devices = domain.getElementsByTagName('devices').pop()
    sto = self._hypervisor._storage
    for disk in devices.getElementsByTagName('disk'):
        if disk.getAttribute('device') != 'disk':
            continue
        d = {}
        # get type
        dtype = disk.getAttribute('type')
        # get disk path: the attribute holding it depends on the disk type
        sources = disk.getElementsByTagName('source')
        if sources:
            if dtype == 'file':
                d['path'] = sources.pop().getAttribute('file')
            elif dtype == 'block':
                d['path'] = sources.pop().getAttribute('dev')
        # attributes only resolvable in the storage pool
        if 'path' in d:
            for pool_name, pool in sto.get_pools().items():
                for vol_name in sto.get_volume_names(pool):
                    if sto.get_volume_path(pool, vol_name) == d['path']:
                        d['pool'] = pool_name
                        d['vol'] = vol_name
                        d['size'] = sto.get_volume_capacity(pool, vol_name)
        result.append(d)
    return result
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def get_nics(self):
    '''
    Lists the network interfaces declared in the inactive XML description
    of the vm

    :return: a list of dicts, one per ``<interface>`` element, holding
        ``type`` and, when the matching child element exists, ``hwaddr``
        (``address`` attribute of ``<mac>``) and ``target`` (``dev``
        attribute of ``<target>``)
    '''
    result = []
    xml_string = self._domain.XMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE)
    xroot = xml.dom.minidom.parseString(xml_string)
    xdomain = xroot.getElementsByTagName('domain').pop()
    xdevices = xdomain.getElementsByTagName('devices').pop()
    # iter on "interface" devices
    for iface in xdevices.getElementsByTagName('interface'):
        d = {}
        # get type
        d['type'] = iface.getAttribute('type')
        # get hardware address: libvirt stores it in the "address"
        # attribute of <mac/>, the element has no text child
        xmacs = iface.getElementsByTagName('mac')
        if xmacs:
            d['hwaddr'] = xmacs[-1].getAttribute('address')
        # get target interface
        xtargets = iface.getElementsByTagName('target')
        if xtargets:
            d['target'] = xtargets[-1].getAttribute('dev')
        result.append(d)
    return result
def get_arch(self):
    '''
    Returns the architecture of the vm as a HV_ARCH_MAP value

    The value is read from the ``arch`` attribute of ``<os><type>`` in the
    inactive XML description of the domain.

    :return: the mapped architecture name, or None when it is unknown or
        the description cannot be read (best effort)
    '''
    result = None
    try:
        xml_string = self._domain.XMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE)
        xroot = xml.dom.minidom.parseString(xml_string)
        xdomain = xroot.getElementsByTagName('domain').pop()
        xos = xdomain.getElementsByTagName('os').pop()
        xtype = xos.getElementsByTagName('type').pop()
        arch = xtype.getAttribute('arch')
        if arch in HV_ARCH_MAP:
            result = HV_ARCH_MAP[arch]
    except Exception:
        # best effort: keep None, but let SystemExit/KeyboardInterrupt out
        pass
    return result
def get_status(self):
    '''
    Returns the VM_STATUS label matching the state code found in the
    first field of the domain info
    '''
    state_code = self._domain.info()[0]
    return VM_STATUS[state_code]
def shutdown(self):
    '''
    Asks libvirt to shut the domain down

    :raises VMError: when libvirt reports an error
    '''
    domain = self._domain
    try:
        domain.shutdown()
    except libvirt.libvirtError:
        raise VMError('%s is not running !' % self.get_name())
def force_poweroff(self):
    '''
    Asks libvirt to destroy the domain

    :raises VMError: when libvirt reports an error
    '''
    domain = self._domain
    try:
        domain.destroy()
    except libvirt.libvirtError:
        raise VMError('%s is not running !' % self.get_name())
def start(self):
    '''
    Starts the domain

    :raises VMError: when libvirt refuses to create the domain
    '''
    try:
        self._domain.create()
    except libvirt.libvirtError:
        raise VMError('%s is already running !' % self.get_name())

def suspend(self):
    '''
    Suspends the domain

    :raises VMError: when libvirt refuses to suspend the domain
    '''
    try:
        self._domain.suspend()
    except libvirt.libvirtError:
        raise VMError('%s is not running !' % self.get_name())

def resume(self):
    '''
    Resumes the domain

    :raises VMError: when libvirt refuses to resume the domain
    '''
    try:
        self._domain.resume()
    except libvirt.libvirtError:
        raise VMError('%s is not running !' % self.get_name())
def get_uuid(self):
    '''
    Returns the UUID of the underlying domain in its string form
    '''
    domain = self._domain
    return domain.UUIDString()
def __str__(self):
    '''
    Renders the vm as a two row table: a header row followed by the
    name, used memory, vcpu, cpu usage and status of the vm

    NOTE(review): _fetch_memory_stats, get_cpu_stats and the used_memory,
    vcpu and cpu_use attributes are not defined in this part of the file
    - confirm they are provided elsewhere.
    '''
    row_format = '%-10s | %-10s | %-10s | %-15s | %-10s'
    columns = ('Name', 'UsedMem', 'VCpu', 'CpuUsage', 'Status')
    header = (row_format + '\n\n') % columns
    # refresh the figures before they are read below
    self._fetch_memory_stats()
    self.get_cpu_stats()
    values = (self.get_name(), self.used_memory, self.vcpu, self.cpu_use,
              self.get_status())
    return header + row_format % values
class LibvirtHypervisor(Hypervisor):
'''
Libvirt api access to Hypervisor actions.
All libvirt calls to the hypervisor stay in this class
:param hv_type: This is the hypervisor type, this parameter is
important as it specifies the hypervisor type used when
connecting to libvirt.
:type hv_type: :class:`str`
.. note::
hv_type can be any of kvm, xen, ... , other hypervisors, however only
one connection at a time is allowed
'''
def __init__(self, hv_type):
    '''
    Opens the libvirt connection, then builds the storage and vm objects

    :param hv_type: the hypervisor type; only 'kvm' is handled here
    :type hv_type: :class:`str`
    :raises NotImplementedError: when hv_type is not 'kvm'
    :raises HypervisorError: when libvirt cannot open the connection
    '''
    #FIXME: don't use hard coded hypervisor type in URI
    if hv_type != 'kvm':
        # NotImplemented is a constant, not an exception: calling it would
        # raise an unrelated TypeError
        raise NotImplementedError('or unknown hypervisor type')
    try:
        self._con_handle = libvirt.open(KVM_LIBVIRT_SESSION)
    except libvirt.libvirtError as error:
        raise HypervisorError(error, 'libvirt cannot connect to hypervisor')
    #build storage objs
    self._storage = LibvirtHVStorage(self)
    #build vm objects
    self._build_vm_list()
    self.hv_info = {}
    self.hv_type = hv_type
    self.st_handle = LibvirtHVStorage(self)
def get_name(self):
    '''
    Returns the hostname reported by the libvirt connection
    '''
    connection = self._con_handle
    return connection.getHostname()
def get_hv_type(self):
    '''
    Returns the hypervisor type given when this object was built
    '''
    hv_type = self.hv_type
    return hv_type
def get_hv_version(self):
    '''
    Returns the version reported by the libvirt connection as a string
    '''
    version = self._con_handle.getVersion()
    return str(version)
def get_uname(self):
    '''
    Returns the fields of os.uname() joined with single spaces
    '''
    fields = os.uname()
    return ' '.join(fields)
def get_uptime(self):
    '''
    Returns the first field of /proc/uptime truncated to an integer

    :return: the value as a :class:`str`, or None when /proc/uptime
        cannot be read or parsed
    '''
    try:
        # the context manager closes the file even when parsing fails
        with open("/proc/uptime") as f:
            data = f.read().split()
        return str(int(float(data[0])))
    except (IOError, OSError, IndexError, ValueError):
        return None
def get_loadavg(self):
    '''
    Returns the values of os.getloadavg() formatted with two decimals
    and joined with spaces

    :return: the formatted string, or None when the load average cannot
        be obtained
    '''
    try:
        return ' '.join('%.2f' % load for load in os.getloadavg())
    except (OSError, AttributeError):
        # OSError: load average unobtainable; AttributeError: platform
        # without os.getloadavg
        return None
def _build_vm_list(self):
    '''
    Builds the dicts of all defined vms (running and offline) in the
    current hypervisor, keyed by vm name:

    - self._vm_list_running: vms built from listDomainsID()
    - self._vm_list_offline: vms built from listDefinedDomains()
    - self._vm_list: the union of both
    '''
    self._vm_list_running = {}
    self._vm_list_offline = {}
    self._doms_running_ids = self._con_handle.listDomainsID()
    self._doms_defined_names = self._con_handle.listDefinedDomains()
    for dom in self._doms_running_ids:
        vm = LibvirtVm(self._con_handle.lookupByID(dom), self)
        self._vm_list_running[vm.get_name()] = vm
    for dom in self._doms_defined_names:
        vm = LibvirtVm(self._con_handle.lookupByName(dom), self)
        self._vm_list_offline[vm.get_name()] = vm
    # copy, so that merging the offline vms does not alter the running dict
    self._vm_list = dict(self._vm_list_running)
    self._vm_list.update(self._vm_list_offline)
def get_arch_type(self):
    '''
    Get architecture type of hypervisor

    :return: the HV_ARCH_MAP value matching the first field of the
        connection info, or None when that field is not in the map
    '''
    arch = self._con_handle.getInfo()[0]
    return HV_ARCH_MAP[arch] if arch in HV_ARCH_MAP else None
def get_nb_cpu(self):
    '''
    Returns number of active cpus in hypervisor (third field of the
    connection info)
    '''
    node_info = self._con_handle.getInfo()
    return node_info[2]
def get_cpu_frequency(self):
    '''
    Returns the frequency in MHz of cpus in the hypervisor, as a string
    (fourth field of the connection info)
    '''
    node_info = self._con_handle.getInfo()
    return str(node_info[3])
def get_nb_threads(self):
    '''
    Number of threads per core (eighth field of the connection info)
    '''
    node_info = self._con_handle.getInfo()
    return node_info[7]
return str(psutil.avail_phymem() + psutil.used_phymem())
return str(psutil.avail_phymem())
return str(psutil.used_phymem())
return str(int(psutil.cpu_percent()))
def get_cpu(self):
    '''
    Returns multiprocessing.cpu_count() as a string
    '''
    count = multiprocessing.cpu_count()
    return str(count)
def get_disks(self):
    '''
    Lists the block devices of the host whose name starts with sd or hd
    followed by letters

    :return: a dict mapping the device name to the content of
        /sys/block/<name>/size multiplied by 512; devices whose size is
        zero or cannot be read are left out
    '''
    result = {}
    re_pattern = re.compile(r'([sh]d[a-z]+)')
    try:
        found = [bd for bd in os.listdir('/sys/block/')
                 if re_pattern.match(bd)]
    except OSError:
        # no /sys/block on this host: nothing to report
        return result
    for disk in found:
        fullname = os.path.join('/sys/block', disk, 'size')
        try:
            with open(fullname) as size_file:
                size = int(size_file.read())
        except (IOError, OSError, ValueError):
            # skip this device only, keep scanning the others
            continue
        if size > 0:
            result[disk] = size * 512
    return result
def get_storage_pools(self):
    '''
    Returns the names of the pools reported by the storage object
    '''
    pools = self._storage.get_pools()
    return pools.keys()
def get_storage_capacity(self, pool=None):
    '''
    Returns the total space of one pool, or of all pools together

    :param pool: name of the pool to query, None for the sum over all
        the pools reported by the storage object
    :type pool: :class:`str`
    :return: the capacity as a :class:`str`
    :raises KeyError: when the named pool is not reported by the storage
    '''
    storage = self._storage
    if pool is not None:
        # get the capacity of a specific pool
        return str(storage.get_pool_space_total(storage.get_pools()[pool]))
    # for all pool available; the sum starts from zero so that a host
    # without any pool reports '0'
    capacity = 0
    for pool_handle in storage.get_pools().values():
        capacity = capacity + storage.get_pool_space_total(pool_handle)
    return str(capacity)
def get_storage_used(self, pool=None):
    '''
    Returns the used space of one pool, or of all pools together

    :param pool: name of the pool to query, None for the sum over all
        the pools reported by the storage object
    :type pool: :class:`str`
    :return: the used space as a :class:`str`
    '''
    storage = self._storage
    if pool is not None:
        # get the used space of a specific pool
        return str(storage.get_pool_space_used(storage.get_pools()[pool]))
    # for all pool available
    total = 0
    for pool_handle in storage.get_pools().values():
        total += storage.get_pool_space_used(pool_handle)
    return str(total)
def get_storage_free(self, pool=None):
    '''
    Returns the free space of one pool, or of all pools together

    Both branches use the storage helper get_pool_space_available, the
    one the single pool branch already relied on.

    :param pool: name of the pool to query, None for the sum over all
        the pools reported by the storage object
    :type pool: :class:`str`
    :return: the free space as a :class:`str`
    '''
    storage = self._storage
    if pool is not None:
        # get the free space of a specific pool
        return str(storage.get_pool_space_available(
            storage.get_pools()[pool]))
    # for all pool available
    free = 0
    for pool_handle in storage.get_pools().values():
        free = free + storage.get_pool_space_available(pool_handle)
    return str(free)
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
def get_storage_type(self, pool_name):
    '''
    Returns the ``type`` attribute of the ``<pool>`` element found in
    the XML description of the named pool

    :param pool_name: name of the pool to query
    :type pool_name: :class:`str`
    :return: the type, or None when the pool is unknown or its
        description cannot be read (best effort)
    '''
    try:
        pool = self._storage.get_pools()[pool_name]
        xroot = xml.dom.minidom.parseString(pool.XMLDesc(0))
        xpool = xroot.getElementsByTagName('pool').pop()
        return xpool.getAttribute('type')
    except Exception:
        # best effort, but SystemExit/KeyboardInterrupt still propagate
        return None
def get_storage_path(self, pool_name):
    '''
    Returns the text of ``<target><path>`` found in the XML description
    of the named pool

    :param pool_name: name of the pool to query
    :type pool_name: :class:`str`
    :return: the path, or None when the pool is unknown or its
        description holds no such element (best effort)
    '''
    try:
        pool = self._storage.get_pools()[pool_name]
        xroot = xml.dom.minidom.parseString(pool.XMLDesc(0))
        xpool = xroot.getElementsByTagName('pool').pop()
        xtarget = xpool.getElementsByTagName('target').pop()
        xpath = xtarget.getElementsByTagName('path').pop()
        return xpath.firstChild.nodeValue
    except Exception:
        # best effort, but SystemExit/KeyboardInterrupt still propagate
        return None
def get_storage_volumes(self, pool_name):
    '''
    Returns the volume names of the named pool joined with spaces

    :param pool_name: name of the pool to query
    :type pool_name: :class:`str`
    :return: the names as one :class:`str`, or None when the pool is
        unknown or its volumes cannot be listed (best effort)
    '''
    try:
        pool = self._storage.get_pools()[pool_name]
        return " ".join(self._storage.get_volume_names(pool))
    except Exception:
        # best effort, but SystemExit/KeyboardInterrupt still propagate
        return None
def get_status(self):
    '''
    Not implemented for this hypervisor class

    :raises NotImplementedError: always
    '''
    raise NotImplementedError()
def get_vm_names(self):
    '''
    Returns the names of all the vms known to this hypervisor object
    (the keys of the vm dict built by :meth:`_build_vm_list`)
    '''
    known_vms = self._vm_list
    return known_vms.keys()
def get_vm(self, vm_name):
    '''
    Returns the vm object registered under vm_name

    :param vm_name: the name of the vm
    :type vm_name: :class:`str`
    :raises KeyError: when no vm is registered under that name
    '''
    vm = self._vm_list[vm_name]
    return vm
def get_vm_count(self):
    '''
    Returns the number of vms known to this hypervisor object
    '''
    known_vms = self._vm_list
    return len(known_vms)
def start_vm(self, vm_name, start_options=DEFAULT_VM_START):
    '''
    Starts the vm identified by name

    :param vm_name: The name of the virtual machine
    :type vm_name: :class:`str`
    :param start_options: Options flags while starting vm (currently
        not used by this method)
    :type start_options: TODO reference to constants
    :raises VMError: when no vm is registered under vm_name
    '''
    # only the lookup is guarded: a KeyError raised while starting the vm
    # must not be reported as "not found"
    try:
        vm = self._vm_list[vm_name]
    except KeyError:
        raise VMError('Virtual machine %s not found: ' % vm_name)
    vm.start()
def stop_vm(self, vm_name, force=False):
    '''
    Poweroff the specified vm with the specified options

    :param vm_name: the name of the vm
    :type vm_name: :class:`str`
    :param force: when true the vm is powered off with force_poweroff(),
        otherwise shutdown() is used
    :type force: :class:`bool`
    :raises VMError: when no vm is registered under vm_name
    '''
    # only the lookup is guarded: a KeyError raised while stopping the vm
    # must not be reported as "not found"
    try:
        vm = self._vm_list[vm_name]
    except KeyError:
        raise VMError('Virtual machine %s not found: ' % vm_name)
    if force:
        vm.force_poweroff()
    else:
        vm.shutdown()
def suspend_vm(self, vm_name):
    '''
    Suspends the specified vm

    :param vm_name: the name of the vm
    :type vm_name: :class:`str`
    :raises VMError: when no vm is registered under vm_name
    '''
    # only the lookup is guarded: a KeyError raised while suspending the
    # vm must not be reported as "not found"
    try:
        vm = self._vm_list[vm_name]
    except KeyError:
        raise VMError('Virtual machine %s not found: ' % vm_name)
    vm.suspend()
def resume_vm(self, vm_name):
    '''
    Resumes the specified vm

    :param vm_name: the name of the vm
    :type vm_name: :class:`str`
    :raises VMError: when no vm is registered under vm_name
    '''
    # only the lookup is guarded: a KeyError raised while resuming the vm
    # must not be reported as "not found"
    try:
        vm = self._vm_list[vm_name]
    except KeyError:
        raise VMError('Virtual machine %s not found: ' % vm_name)
    vm.resume()
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
def local_execute(self, command):
    '''
    Runs the given command line on the local host through a shell and
    waits for it to finish

    ..warning:: The command is handed to a shell as is, so anything can
        be executed locally through this function
    ..warning:: The call blocks until the command ends; a long running
        command stalls the caller for that long

    :param command: the command to execute with its arguments
    :type command: :class:`str`
    :return: the (stdout, stderr) tuple collected from the process
    '''
    #FIXME: stop using shell=true and parse arguments with shlex.split()
    popen_options = dict(shell=True,
                         bufsize=-1,
                         stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    process = subprocess.Popen(command, **popen_options)
    return process.communicate()
def export_vm(self, name, target, port):
    '''
    Streams the disk of the given vm (name) to the (target) host, which
    must already be listening for the transfer

    The disk is read with ``dd`` and piped into ``nc target port`` in
    chunks of 10 MiB. An md5 of the streamed data is computed on the fly
    and printed once the transfer is over.

    :param name: the name of the vm
    :type name: :class:`str`
    :param target: the destination hypervisor
    :type target: :class:`str`
    :param port: the port on the destination to use for the transfer
    :type port: :class:`str`
    :raises IndexError: when no vm is registered under name
    :raises IOError: when reading from dd or writing to nc fails
    '''
    # This algorithm is just a proof of concept of the cold migration
    # process, it is likely that it does not anticipate some problems
    buff = 1024 * 1024 * 10 #10 Mo
    # _vm_list is a dict keyed by vm name: walk the vm objects, not the keys
    # NOTE(review): get_disk_path() is not defined on LibvirtVm in this
    # part of the file - confirm it is provided elsewhere
    path = [vm.get_disk_path() for vm in self._vm_list.values()
            if vm.get_name() == name].pop()
    dd_cmd = ['dd', 'if=%s' % path]
    dd_process = subprocess.Popen(dd_cmd, bufsize=-1,
                                  stdout=subprocess.PIPE)
    nc_process = subprocess.Popen(['nc', target, port],
                                  bufsize=-1,
                                  stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE)
    #We send dd output to nc
    dd_out = dd_process.stdout
    nc_in = nc_process.stdin
    m = hashlib.md5()
    sys.stdout.write('going to pipe loop\n')
    try:
        try:
            read = dd_out.read(buff)
        except Exception as err:
            raise IOError('error while reading dd process output %s' % err)
        try:
            while len(read) != 0:
                sys.stdout.write('%d\n' % len(read))
                m.update(read)
                nc_in.write(read)
                nc_in.flush()
                read = dd_out.read(buff)
        except Exception as err:
            raise IOError('Error occured while writing to nc process %s' % err)
    finally:
        # always release the pipes, even when the transfer failed
        nc_in.close()
        dd_out.close()
    sys.stdout.write('%s\n' % m.hexdigest())
def get_network_conf(self):
    '''
    Not implemented for this hypervisor class

    :raises NotImplementedError: always
    '''
    raise NotImplementedError
def get_storage_stats(self):
    '''
    Not implemented for this hypervisor class

    :raises NotImplementedError: always
    '''
    raise NotImplementedError
#TODO: finish storage class, make tests,
class LibvirtHVStorage(HVStorage):
'''
Base storage class using libvirt api for storage managment
Storage pools here are libvirt.virStoragePool instances
'''
def __init__(self, hypervisor):
    '''
    Binds the storage object to its hypervisor and loads its pools

    :param hypervisor: the hypervisor owning the storage pools
    :type hypervisor: :class:`LibvirtHypervisor`
    :raises TypeError: when hypervisor is not a :class:`LibvirtHypervisor`
    '''
    if not isinstance(hypervisor, LibvirtHypervisor):
        raise TypeError('Expected %s given %s' % (LibvirtHypervisor,
                                                  hypervisor))
    self.hv_handle = hypervisor
    self._fetch_st_pools()
def _fetch_st_pools(self):
    '''
    Sets self._pools to a dict mapping each pool name returned by
    listDefinedStoragePools() and listStoragePools() to its libvirt
    pool object
    '''
    connection = self.hv_handle._con_handle
    names = list(connection.listDefinedStoragePools())
    names.extend(self.hv_handle._con_handle.listStoragePools())
    self._pools = dict((name, pool_obj(self.hv_handle._con_handle, name))
                       for name in names)
def get_pools(self):
    '''
    Returns a dict of storage pools bound to the host

    Only the pools reporting themselves as active are returned.

    :return: a :class:`dict` mapping pool names to pool objects
    '''
    pools = {}
    for pool_name, pool in self._pools.items():
        if pool.isActive():
            pools[pool_name] = pool
    return pools
def get_volume_names(self, pool=None):
    '''
    Returns volume names stored in this pool or all pools

    :param pool: the pool to list, None to list every known pool
    :type pool: :class:`virStoragePool`
    :return: a :class:`list` of volume names
    :raises StorageError: when libvirt fails to list the volumes
    '''
    volumes = []
    try:
        if pool is None:
            for each_pool in self._pools.values():
                volumes.extend(each_pool.listVolumes())
        else:
            volumes = pool.listVolumes()
    except libvirt.libvirtError as e:
        raise StorageError("Failed to get volume list (%s)" % e)
    return volumes
def add_volume(self, pool, name, space):
    '''
    Adds a volume to the specified pool

    :param pool: name of the pool in which to create the volume
    :type pool: :class:`str`
    :param name: name of the new volume
    :type name: :class:`str`
    :param space: size of the new volume in gigabytes
    :type space: :class:`int`
    :raises StorageError: when libvirt fails to create the volume
    :raises KeyError: when no pool is known under that name
    '''
    from xml.sax.saxutils import escape
    # the name is escaped so that characters such as "<" or "&" cannot
    # break the XML description
    xml_desc = """
<volume>
<name>%(vol_name)s</name>
<capacity>%(vol_size)u</capacity>
</volume>
""" % {
        "vol_name" : escape(name),
        "vol_size" : space * GIGABYTE_DIV,
    }
    try:
        self._pools[pool].createXML(xml_desc, 0)
    except libvirt.libvirtError as e:
        raise StorageError("Failed to create the volume (%s)" % e)
def del_volume(self, pool, name, wipe=False):
    '''
    Deletes a volume in the specified pool

    :param pool: the pool object holding the volume (the volume is
        looked up with its storageVolLookupByName method)
    :param name: the name of the volume
    :type name: :class:`str`
    :param wipe: when true the volume is wiped before being deleted
    :type wipe: :class:`bool`
    :raises StorageError: when the volume is not found, or when the wipe
        or the deletion fails
    '''
    try:
        volume = pool.storageVolLookupByName(name)
    except libvirt.libvirtError as err:
        raise StorageError("Volume not found (%s)" % err)
    if wipe:
        try:
            volume.wipe(0)
        except libvirt.libvirtError as err:
            raise StorageError("Failed to wipe volume, data integrity"
                               " is unknown (%s)" % err)
    try:
        volume.delete(0)
    except libvirt.libvirtError as err:
        raise StorageError("Failed to delete the volume, but it may be"
                           " wiped (%s)" % err)
def find_space(self, new_space):
    '''
    Tries to find a suitable chunk of space for volume allocation.

    NOTE(review): the loop, try and return statements were rebuilt from
    the documented contract and the ``pool[1]`` access of the surviving
    lines - confirm against the project history.

    :param new_space: a space size in gigabytes
    :type new_space: :class:`int`
    :return: a :class:`tuple` (pool name, pool object) of the first pool
        with enough room, or :class:`False` if no pool is suitable
    :raises StorageError: when libvirt fails to report pool information
    '''
    for pool in self._pools.items():
        try:
            # FIXME, crappy overhead delta
            if new_space * GIGABYTE_DIV <= pool[1].info()[3] - MEGABYTE_DIV:
                return pool
        except libvirt.libvirtError as e:
            raise StorageError("Can't get pool informations (%s)" % e)
    return False
def get_pool_name(self, pool):
    '''
    Returns the name of this pool

    :param pool: the storage pool object
    :type pool: libvirt.`virStoragePool`
    :return: :class:`str` name of the pool
    :raises StorageError: when libvirt fails to report the name
    '''
    try:
        name = pool.name()
    except libvirt.libvirtError as err:
        raise StorageError("Can't get pool name (%s)" % err)
    return name
def get_pool_state(self, pool):
    '''
    Returns the running state of the pool as a POOL_STATE label

    :param pool: the storage pool object
    :type pool: libvirt.`virStoragePool`
    :raises StorageError: when libvirt fails to report pool information
    '''
    try:
        state_code = pool.info()[0]
        return POOL_STATE[state_code]
    except libvirt.libvirtError as err:
        raise StorageError("Can't get pool state (%s)" % err)
def get_pool_space_total(self, pool):
    '''
    Returns the storage capacity of this pool in bytes

    NOTE(review): the method header was lost in this copy of the file;
    the name is the one the hypervisor class calls - confirm.

    :param pool: the pool to get information from
    :type pool: :class:`virStoragePool`
    :return: :class:`int` of capacity in bytes
    :raises StorageError: when libvirt fails to report pool information
    '''
    try:
        return pool.info()[1]
    except libvirt.libvirtError as e:
        raise StorageError("Can't get pool informations (%s)" % e)
def get_pool_space_available(self, pool):
    '''
    Returns available space of this storage pool in bytes

    NOTE(review): the method header was lost in this copy of the file;
    the name is the one the hypervisor class calls - confirm.

    :param pool: the pool to get information from
    :type pool: :class:`virStoragePool`
    :return: :class:`int` of available free space in bytes
    :raises StorageError: when libvirt fails to report pool information
    '''
    try:
        # libvirt pool info is (state, capacity, allocation, available)
        return pool.info()[3]
    except libvirt.libvirtError as e:
        raise StorageError("Can't get pool informations (%s)" % e)
def get_pool_space_used(self, pool):
    '''
    Returns current storage pool usage in bytes

    NOTE(review): the method header was lost in this copy of the file;
    the name is the one the hypervisor class calls - confirm.

    :param pool: the pool to get information from
    :type pool: :class:`virStoragePool`
    :return: :class:`int` of used space in bytes
    :raises StorageError: when libvirt fails to report pool information
    '''
    try:
        # libvirt pool info is (state, capacity, allocation, available)
        return pool.info()[2]
    except libvirt.libvirtError as e:
        raise StorageError("Can't get pool informations (%s)" % e)
def get_volume_path(self, pool, vol_name):
    '''
    Returns the path reported by libvirt for the volume

    :param pool: the storage pool containing this volume
    :type pool: libvirt.`virStoragePool`
    :param vol_name: name of the pool volume
    :type vol_name: :class:`str`
    :return: :class:`str` path to the volume
    :raises StorageError: when the volume is not found or has no path
    '''
    try:
        volume = pool.storageVolLookupByName(vol_name)
    except libvirt.libvirtError as err:
        raise StorageError("Can't find volume in pool (%s)" % err)
    try:
        path = volume.path()
    except libvirt.libvirtError as err:
        raise StorageError("Volume has no path information (%s)" % err)
    return path
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
def get_volume_allocation(self, pool, vol_name):
    '''
    Returns the pool space used by this volume in bytes (third field of
    the volume info)

    :param pool: the pool containing this volume
    :type pool: :class:`virStoragePool`
    :param vol_name: name of the volume to query
    :type vol_name: :class:`str`
    :return: :class:`int` of allocation in bytes
    :raises StorageError: when the volume is not found or reports no
        information
    '''
    try:
        volume = pool.storageVolLookupByName(vol_name)
    except libvirt.libvirtError as err:
        raise StorageError("Volume not found (%s)" % err)
    try:
        allocation = volume.info()[2]
    except libvirt.libvirtError as err:
        raise StorageError("Volume has no allocation information (%s)" % err)
    return allocation
def get_volume_capacity(self, pool, vol_name):
    '''
    Returns the capacity (usable space) of this volume in bytes (second
    field of the volume info)

    :param pool: the pool containing this volume
    :type pool: :class:`virStoragePool`
    :param vol_name: name of the volume to query
    :type vol_name: :class:`str`
    :return: :class:`int` of capacity in bytes
    :raises StorageError: when the volume is not found or reports no
        information
    '''
    try:
        volume = pool.storageVolLookupByName(vol_name)
    except libvirt.libvirtError as err:
        raise StorageError("Volume not found (%s)" % err)
    try:
        capacity = volume.info()[1]
    except libvirt.libvirtError as err:
        raise StorageError("Volume has no capacity information (%s)" % err)
    return capacity
#### Helper functions
def map_process(process):
    '''
    Returns the (pid, cmdline) pair of the given process object
    '''
    pid = process.pid
    command_line = process.cmdline
    return (pid, command_line)
def find_process_id(uuid):
    '''
    Finds the processes whose command line contains the given UUID of a
    virDomain object

    :return: a list with the pid of every matching process (empty when
        none matches)
    '''
    pids = []
    for process in psutil.get_process_list():
        if uuid in process.cmdline:
            pids.append(process.pid)
    return pids
def pool_obj(con_handle, pool_name):
    '''
    Looks up the libvirt pool object registered under pool_name

    :param con_handle: libvirt connection handle
    :type con_handle: :class:`libvirt.virConnect`
    :param pool_name: the name of the storage pool
    :type pool_name: :class:`str`
    '''
    pool = con_handle.storagePoolLookupByName(pool_name)
    return pool