Newer
Older
# TODO: gather VM information
import libvirt
import sys
# psutil provides the host information that libvirt cannot give us
import psutil
from interface import *
from exceptions import *
from time import sleep
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
###
# Defined constants
###

##
# Libvirt connection URIs
##
KVM_LIBVIRT_SESSION = 'qemu:///system'
XEN_LIBVIRT_SESSION = 'xen:///'

#########
## States
#########

# Virtual Machines: human readable label of each libvirt domain state, the
# tuple is indexed by the state number found in virDomain.info()[0]
VM_STATUS = (
    'No state',
    'Running',
    'Blocked on resource',
    'Paused',
    'Shutting down ...',
    'Shutdown',
    'Crashed',
    # libvirt state 7 (VIR_DOMAIN_PMSUSPENDED), without it indexing the
    # tuple with that state raises IndexError
    'Suspended by guest power management',
)

# NOTE(review): these values are not handed to libvirt anywhere in this file;
# libvirt's own VIR_DOMAIN_START_PAUSED flag is 1, confirm the mapping before
# passing them to a libvirt call
VM_START_STATES = {
    'running': 1,
    'paused' : 0
}

# Storage Pools: human readable label of each libvirt storage pool state, the
# tuple is indexed by the state number found in virStoragePool.info()[0]
POOL_STATE = (
    'Not Running',
    'Initializing pool, not available',
    'Running normally',
    'Running degraded',
    'Running, but not accessible'
)

#########################
## Pool information tags
#########################
POOL_NAME = 'pool'
POOL_STATUS = 'pstatus'
POOL_TOTAL_SIZE = 'psize'
POOL_FREE_SPACE = 'pfree_space'
POOL_USED_SPACE = 'pused_space'
POOL_NUM_VOLUMES = 'num_vols'
POOL_BACKEND = 'backend'

# start option used by LibvirtHypervisor.start_vm when none is given
DEFAULT_VM_START = VM_START_STATES['running']
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
##
# Pretty size outputs: powers of 1024 used to scale sizes from one unit to
# the next
##
KILOBYTE_DIV = 1 << 10
MEGABYTE_DIV = KILOBYTE_DIV ** 2
GIGABYTE_DIV = KILOBYTE_DIV ** 3
class LibvirtVm(VM):
    '''
    Libvirt api access to Virtual Machine actions

    All libvirt.virDomain calls must stay in this class

    :param domain: a libvirt domain instance
    :type domain: :class:`libvirt.virDomain`
    :param hypervisor: a hypervisor reference object
    :type hypervisor: :class:`LibvirtHypervisor`
    '''

    _print_header = True

    def __init__(self, domain, hypervisor):
        '''
        Binds the vm to its libvirt domain and to its hypervisor

        :raises TypeError: if domain is not a :class:`libvirt.virDomain` or
            hypervisor is not a :class:`LibvirtHypervisor`
        '''
        super(LibvirtVm, self).__init__()
        if not isinstance(domain, libvirt.virDomain):
            raise TypeError('Need virDomain object given %s' % type(domain))
        self._domain = domain
        self.vm_info = {}
        self._find_pid()
        self.set_hypervisor(hypervisor)

    def __cmp__(self, other):
        '''
        We overload the compare method so that two virtual machines are
        identical if they are bound to the same domain object
        '''
        return cmp(self._domain, other._domain)

    def _find_pid(self):
        '''
        Finds the PID of the current vm and stores it in self._pid, None is
        stored when no process carries the uuid of the domain
        '''
        result = find_process_id(self._domain.UUIDString())
        self._pid = int(result.pop()) if result else None

    def get_pid(self):
        '''
        Returns the pid found by the last call to _find_pid, or None
        '''
        return self._pid

    def set_hypervisor(self, hypervisor):
        '''
        Binds the vm to the hypervisor running it

        :param hypervisor: the hypervisor object
        :type hypervisor: :class:`LibvirtHypervisor`
        :raises TypeError: if hypervisor is not a :class:`LibvirtHypervisor`
        '''
        if not isinstance(hypervisor, LibvirtHypervisor):
            raise TypeError('argument type error')
        self._hypervisor = hypervisor

    def get_name(self):
        '''
        Returns the name of the libvirt domain
        '''
        return self._domain.name()

    def get_hv_name(self):
        '''
        Returns the name of the hypervisor this vm is bound to
        '''
        return self._hypervisor.get_name()

    def get_used_mem(self):
        '''
        Returns the memory used by the domain, the value reported by libvirt
        divided by KILOBYTE_DIV
        '''
        return self._domain.info()[2] // KILOBYTE_DIV

    def get_total_mem(self):
        '''
        Returns the maximum memory allowed to the domain, the value reported
        by libvirt divided by KILOBYTE_DIV
        '''
        return self._domain.info()[1] // KILOBYTE_DIV

    def get_free_mem(self):
        '''
        Returns the difference between total and used memory, in the same
        unit as get_total_mem and get_used_mem
        '''
        # both operands are already scaled by KILOBYTE_DIV, scaling the
        # difference once more would return a value in another unit
        return self.get_total_mem() - self.get_used_mem()

    def get_vcpu(self):
        '''
        Returns the number of virtual cpus of the domain
        '''
        return self._domain.info()[3]

    def get_cpu_percent(self):
        '''
        Returns the cpu usage of the vm process as an integer capped to 100,
        or 0 when the process of the vm is not found

        .. warning:: this call sleeps for 0.7 second before measuring
        '''
        self._find_pid()
        if not self._pid:
            return 0
        p = psutil.Process(self._pid)
        sleep(0.7)
        return min(int(p.get_cpu_percent()), 100)

    def get_disks(self):
        '''
        Returns the disks of the vm as a list of dicts

        Each dict holds the 'path' of the disk when its type is 'file' or
        'block', plus 'pool', 'vol' and 'size' when that path is the path of
        a volume known by the storage of the hypervisor
        '''
        result = []
        # the disk source is read from a different attribute for each type
        source_attr = {'file': 'file', 'block': 'dev'}
        sto = self._hypervisor._storage
        # we parse xml description of the vm to get disk list
        xml_string = self._domain.XMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE)
        root = xml.dom.minidom.parseString(xml_string)
        domain = root.getElementsByTagName('domain').pop()
        devices = domain.getElementsByTagName('devices').pop()
        for disk in devices.getElementsByTagName('disk'):
            if disk.getAttribute('device') != 'disk':
                continue
            d = {}
            dtype = disk.getAttribute('type')
            sources = disk.getElementsByTagName('source')
            if dtype in source_attr and sources:
                d['path'] = sources[-1].getAttribute(source_attr[dtype])
                # attributes only resolvable in the storage pool
                for pool_name, pool in sto.get_pools().items():
                    for vol_name in sto.get_volume_names(pool) or []:
                        if sto.get_volume_path(pool, vol_name) == d['path']:
                            d['pool'] = pool_name
                            d['vol'] = vol_name
                            d['size'] = sto.get_volume_capacity(pool,
                                                                vol_name)
            result.append(d)
        return result

    def get_status(self):
        '''
        Returns the VM_STATUS label matching the state of the domain
        '''
        state = self._domain.info()[0]
        try:
            return VM_STATUS[state]
        except IndexError:
            # state number this module has no label for
            return 'Unknown state (%s)' % state

    def shutdown(self):
        '''
        Asks the domain to shut down

        :raises VMError: if libvirt refuses the request
        '''
        try:
            self._domain.shutdown()
        except libvirt.libvirtError:
            raise VMError('%s is not running !' % self.get_name())

    def force_poweroff(self):
        '''
        Destroys the domain without waiting for the guest to shut down

        :raises VMError: if libvirt refuses the request
        '''
        try:
            self._domain.destroy()
        except libvirt.libvirtError:
            raise VMError('%s is not running !' % self.get_name())

    def start(self):
        '''
        Starts the domain

        :raises VMError: if libvirt refuses the request
        '''
        try:
            self._domain.create()
        except libvirt.libvirtError:
            raise VMError('%s is already running !' % self.get_name())

    def suspend(self):
        '''
        Suspends the domain

        :raises VMError: if libvirt refuses the request
        '''
        try:
            self._domain.suspend()
        except libvirt.libvirtError:
            raise VMError('%s is not running !' % self.get_name())

    def resume(self):
        '''
        Resumes the domain

        :raises VMError: if libvirt refuses the request
        '''
        try:
            self._domain.resume()
        except libvirt.libvirtError:
            raise VMError('%s is not running !' % self.get_name())

    def get_uuid(self):
        '''
        Returns the uuid string of the vm
        '''
        return self._domain.UUIDString()

    def __str__(self):
        '''
        LibvirtVm object string representation

        .. warning:: the cpu usage is measured by get_cpu_percent, which
            sleeps before returning
        '''
        header = '%-10s | %-10s | %-10s | %-15s | %-10s\n\n' % \
            ('Name', 'UsedMem', 'VCpu', 'CpuUsage', 'Status')
        # only the getters defined in this class are used to fill the line
        stats = '%-10s | %-10s | %-10s | %-15s | %-10s' % \
            (self.get_name(), self.get_used_mem(), self.get_vcpu(),
             self.get_cpu_percent(), self.get_status())
        return header + stats
class LibvirtHypervisor(Hypervisor):
    '''
    Libvirt api access to Hypervisor actions.

    All libvirt calls to the hypervisor stay in this class

    :param hv_type: This is the hypervisor type, this parameter is
        important as it specifies the hypervisor type used when
        connecting to libvirt.
    :type hv_type: :class:`str`

    .. note::
        only 'kvm' is accepted by the constructor for now, and only one
        connection at a time is allowed
    '''

    def __init__(self, hv_type):
        '''
        Opens the libvirt connection, then builds the storage and the vm
        objects of the hypervisor

        :raises NotImplementedError: if hv_type is not 'kvm'
        :raises HypervisorError: if libvirt cannot open the connection
        '''
        #FIXME: don't use hard coded hypervisor type in URI
        if hv_type != 'kvm':
            raise NotImplementedError('or unknown hypervisor type')
        try:
            self._con_handle = libvirt.open(KVM_LIBVIRT_SESSION)
        except libvirt.libvirtError as error:
            raise HypervisorError(error,
                                  'libvirt cannot connect to hypervisor')
        #build storage objs
        self._storage = LibvirtHVStorage(self)
        #build vm objects
        self._build_vm_list()
        self.hv_info = {}
        self.hv_type = hv_type
        # same storage object under its second name, building another one
        # would only repeat the pool lookups
        self.st_handle = self._storage

    def get_name(self):
        '''
        Returns hypervisor's name
        '''
        return self._con_handle.getHostname()

    def get_hv_type(self):
        '''
        Returns the hypervisor type given to the constructor
        '''
        return self.hv_type

    def get_hv_version(self):
        '''
        Returns the version reported by the libvirt connection
        '''
        return self._con_handle.getVersion()

    def _build_vm_list(self):
        '''
        Builds the lists of all defined vms (running and offline) in the
        current hypervisor
        '''
        self._dom_ids = self._con_handle.listDomainsID()
        self._defined_doms = self._con_handle.listDefinedDomains()
        self._runing_vm_list = [
            LibvirtVm(self._con_handle.lookupByID(dom_id), self)
            for dom_id in self._dom_ids]
        self._offline_vm_list = [
            LibvirtVm(self._con_handle.lookupByName(dom_name), self)
            for dom_name in self._defined_doms]
        # a new list, so that the running list is not extended with the
        # offline vms
        self._vm_list = self._runing_vm_list + self._offline_vm_list

    def get_vms(self):
        '''
        Returns a list of :class:`LibvirtVm` objects defining all
        current defined vms in the hypervisor
        '''
        return self._vm_list

    def _get_vm(self, name):
        '''
        Returns the first vm of the hypervisor called name, or None
        '''
        for vm in self._vm_list:
            if vm.get_name() == name:
                return vm
        return None

    def get_arch_type(self):
        '''
        Get architecture type of hypervisor
        '''
        return self._con_handle.getInfo()[0]

    def get_nb_cpu(self):
        '''
        Returns number of active cpus in hypervisor
        '''
        return self._con_handle.getInfo()[2]

    def get_cpu_frequency(self):
        '''
        Returns the frequency in MHZ of cpus in the hypervisor
        '''
        return self._con_handle.getInfo()[3]

    def get_nb_threads(self):
        '''
        Number of threads per core
        '''
        return self._con_handle.getInfo()[7]

    def get_mem(self):
        '''
        Returns the sum of available and used physical memory of the host
        '''
        return (psutil.avail_phymem() + psutil.used_phymem())

    def get_mem_free(self):
        '''
        Returns the available physical memory of the host
        '''
        return psutil.avail_phymem()

    def get_mem_used(self):
        '''
        Returns the used physical memory of the host
        '''
        return psutil.used_phymem()

    def get_cpu_percent(self):
        '''
        Returns the cpu usage of the host formatted as a :class:`str`
        '''
        return '%2.0f' % psutil.cpu_percent()

    def get_disks(self):
        '''
        Returns a dict mapping each sd*/hd* block device found in
        /sys/block to its size, devices of size 0 and devices whose size
        cannot be read are left out
        '''
        result = {}
        re_pattern = re.compile(r'([sh]d[a-z]+)')
        try:
            found = [bd for bd in os.listdir('/sys/block/')
                     if re_pattern.match(bd)]
        except OSError:
            # best effort: no listing, no disk
            return result
        for disk in found:
            fullname = os.path.join('/sys/block', disk, 'size')
            try:
                with open(fullname) as size_file:
                    size = int(size_file.read())
            except (IOError, OSError, ValueError):
                continue
            if size > 0:
                # the sysfs size file counts 512 bytes sectors
                result[disk] = size * 512
        return result

    def get_storage_pools(self):
        '''
        Returns the names of the storage pools given by the storage object
        '''
        return self._storage.get_pools().keys()

    def get_storage_capacity(self, pool=None):
        '''
        Returns the total space of a storage pool, or of all pools

        :param pool: name of the pool, None to sum all the pools
        :type pool: :class:`str`
        :raises KeyError: if no pool is called pool
        '''
        pools = self._storage.get_pools()
        if pool is None:
            # for all pool available
            return sum(self._storage.get_pool_space_total(p)
                       for p in pools.values())
        # get the capacity of a specific pool
        return self._storage.get_pool_space_total(pools[pool])

    def get_storage_used(self, pool=None):
        '''
        Returns the used space of a storage pool, or of all pools

        :param pool: name of the pool, None to sum all the pools
        :type pool: :class:`str`
        :raises KeyError: if no pool is called pool
        '''
        pools = self._storage.get_pools()
        if pool is None:
            # for all pool available
            return sum(self._storage.get_pool_space_used(p)
                       for p in pools.values())
        # get the used space of a specific pool
        return self._storage.get_pool_space_used(pools[pool])

    def get_storage_free(self, pool=None):
        '''
        Returns the free space of a storage pool, or of all pools

        :param pool: name of the pool, None to sum all the pools
        :type pool: :class:`str`
        :raises KeyError: if no pool is called pool
        '''
        pools = self._storage.get_pools()
        if pool is None:
            # for all pool available
            return sum(self._storage.get_pool_space_free(p)
                       for p in pools.values())
        # get the free space of a specific pool
        # NOTE(review): this branch calls get_pool_space_available while the
        # other one calls get_pool_space_free, confirm which name the
        # storage class really provides
        return self._storage.get_pool_space_available(pools[pool])

    def get_storage_type(self, pool_name):
        '''
        Returns the 'type' attribute of the xml description of the pool, or
        None if it cannot be read
        '''
        result = None
        try:
            pool = self._storage.get_pools()[pool_name]
            xroot = xml.dom.minidom.parseString(pool.XMLDesc(0))
            xpool = xroot.getElementsByTagName('pool').pop()
            result = xpool.getAttribute('type')
        except Exception:
            # best effort: an unknown or unreadable pool has no type
            pass
        return result

    def get_storage_path(self, pool_name):
        '''
        Returns the target path found in the xml description of the pool,
        or None if it cannot be read
        '''
        result = None
        try:
            pool = self._storage.get_pools()[pool_name]
            xroot = xml.dom.minidom.parseString(pool.XMLDesc(0))
            xpool = xroot.getElementsByTagName('pool').pop()
            xtarget = xpool.getElementsByTagName('target').pop()
            xpath = xtarget.getElementsByTagName('path').pop()
            result = xpath.firstChild.nodeValue
        except Exception:
            # best effort: an unknown or unreadable pool has no path
            pass
        return result

    def get_storage_volumes(self, pool_name):
        '''
        Returns the names of the volumes of the pool joined with spaces, or
        None if they cannot be read
        '''
        result = None
        try:
            pool = self._storage.get_pools()[pool_name]
            result = " ".join(self._storage.get_volume_names(pool))
        except Exception:
            # best effort: an unknown or unreadable pool has no volume list
            pass
        return result

    def get_status(self):
        raise NotImplementedError()

    def start_vm(self, name, start_options=DEFAULT_VM_START):
        '''
        Starts the vm identified by name

        :param name: The name of the virtual machine
        :type name: :class:`str`
        :param start_options: Options flags while starting vm, not used yet
        :type start_options: TODO reference to constants
        :raises VMError: if no vm is called name
        '''
        vm = self._get_vm(name)
        if vm is None:
            raise VMError('Virtual Machine %s not found: '% name)
        vm.start()

    def stop_vm(self, name, force=False):
        '''
        Poweroff the specified vm with the specified options

        :param name: the name of the vm
        :type name: :class:`str`
        :param force: destroy the vm instead of asking it to shut down
        :type force: :class:`bool`
        :raises VMError: if no vm is called name
        '''
        vm = self._get_vm(name)
        if vm is None:
            raise VMError('Virtual Machine %s not found: ' % name)
        if force:
            vm.force_poweroff()
        else:
            vm.shutdown()

    def suspend_vm(self, name):
        '''
        Suspends the specified vm

        :param name: the name of the vm
        :type name: :class:`str`
        :raises VMError: if no vm is called name
        '''
        vm = self._get_vm(name)
        if vm is None:
            raise VMError('Virtual machine %s not found: ' % name)
        vm.suspend()

    def resume_vm(self, name):
        '''
        Resumes the specified vm

        :param name: the name of the vm
        :type name: :class:`str`
        :raises VMError: if no vm is called name
        '''
        vm = self._get_vm(name)
        if vm is None:
            raise VMError('Virtual machine %s not found: ' % name)
        vm.resume()

    def local_execute(self, command):
        '''
        Executes the command given in command on the local host
        Returns a tuple with (stdout, stderr) from the process that has
        been executed

        ..warning:: This is a dangerous function as it gives the command given
            in parameter to a shell prompt, anything can then be executed
            locally
        ..warning:: If the command given is a long processing one, this may be
            a deadlock to the node, be careful !

        :param command: the command to execute with its arguments
        :type command: :class:`str`
        '''
        #FIXME: stop using shell=true and parse arguments with shlex.split()
        p = subprocess.Popen(command, shell=True,
                             bufsize=-1,
                             stdin=subprocess.PIPE,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        return p.communicate()

    def export_vm(self, name, target, port):
        '''
        Migrates the given vm (name) to (target) hypervisor which is already
        waiting for the migration

        :param name: the name of the vm
        :type name: :class:`str`
        :param target: the destination hypervisor
        :type target: :class:`str`
        :param port: the port on the destination to use for the transfer
        :type port: :class:`str`
        :raises VMError: if no vm is called name
        :raises IOError: if the volume cannot be read or sent
        '''
        # The volume is read with dd and piped by chunks into a netcat
        # connection to the destination. The md5 of everything piped is
        # computed on the fly and printed once the transfer is over.
        # This algorithm is just a proof of concept of the cold migration
        # process, it is likely that it does not anticipate some problems
        buff = 1024 * 1024 * 10 #10 Mo
        matches = [vm for vm in self._vm_list if vm.get_name() == name]
        if not matches:
            raise VMError('Virtual Machine %s not found: ' % name)
        # NOTE(review): get_disk_path is not among the LibvirtVm methods
        # visible in this file, confirm where it is defined
        path = matches.pop().get_disk_path()
        dd_cmd = ['dd', 'if=%s' % path]
        dd_process = subprocess.Popen(dd_cmd, bufsize=-1,
                                      stdout=subprocess.PIPE)
        nc_process = subprocess.Popen(['nc', target, port],
                                      bufsize=-1,
                                      stdin=subprocess.PIPE,
                                      stdout=subprocess.PIPE,
                                      stderr=subprocess.PIPE)
        #We send dd output to nc
        dd_out = dd_process.stdout
        nc_in = nc_process.stdin
        m = hashlib.md5()
        print('going to pipe loop')
        try:
            try:
                read = dd_out.read(buff)
            except Exception as err:
                raise IOError('error while reading dd process output %s'
                              % err)
            try:
                while read:
                    print(len(read))
                    m.update(read)
                    nc_in.write(read)
                    nc_in.flush()
                    read = dd_out.read(buff)
            except Exception as err:
                raise IOError('Error occured while writing to nc process %s'
                              % err)
        finally:
            # whatever happened the pipes are released and dd is reaped;
            # nc is not waited for as it may stay alive until the remote
            # side closes the connection
            dd_out.close()
            try:
                nc_in.close()
            except IOError:
                # nc already went away, nothing is left to flush
                pass
            dd_process.wait()
        print(m.hexdigest())

    def get_network_conf(self):
        raise NotImplementedError

    def get_storage_stats(self):
        raise NotImplementedError
# TODO: finish the storage class and write tests for it
class LibvirtHVStorage(HVStorage):
    '''
    Base storage class using libvirt api for storage management

    Storage pools here are libvirt.virStoragePool instances
    '''

    def __init__(self, hypervisor):
        '''
        :param hypervisor: the hypervisor owning the storage pools
        :type hypervisor: :class:`LibvirtHypervisor`
        :raises TypeError: if hypervisor is not a :class:`LibvirtHypervisor`
        '''
        if not isinstance(hypervisor, LibvirtHypervisor):
            raise TypeError('Expected %s given %s' % (LibvirtHypervisor,
                                                      hypervisor))
        self.hv_handle = hypervisor
        self._fetch_st_pools()

    def _fetch_st_pools(self):
        '''
        Sets an attribute self._pools, a dict mapping the name of each
        storage pool listed by the hypervisor connection to its libvirt
        pool object
        '''
        con_handle = self.hv_handle._con_handle
        pools = []
        pools.extend(con_handle.listDefinedStoragePools())
        pools.extend(con_handle.listStoragePools())
        self._pools = dict((p, pool_obj(con_handle, p)) for p in pools)

    def get_pools(self):
        '''
        Returns a dict of storage pools bound to the host, only the pools
        reported active by libvirt are kept
        '''
        pools = {}
        for pool_name, pool in self._pools.items():
            if pool.isActive():
                pools[pool_name] = pool
        return pools

    def get_volume_names(self, pool=None):
        '''
        Returns volume names stored in this pool or all pools

        :param pool: the pool to list, None to list every known pool
        :type pool: libvirt.`virStoragePool`
        :return: :class:`list` of volume names
        :raises StorageError: if libvirt cannot list the volumes
        '''
        volumes = []
        try:
            if pool is None:
                for known_pool in self._pools.values():
                    volumes.extend(known_pool.listVolumes())
            else:
                volumes = pool.listVolumes()
        except libvirt.libvirtError as e:
            raise StorageError("Failed to get volume list (%s)" % e)
        return volumes

    def add_volume(self, pool, name, space):
        '''
        Adds a volume to the specified pool

        :param pool: the pool in which to create the volume
        :type pool: :class:`str`
        :param name: name of the new volume
        :type name: :class:`str`
        :param space: size of the new volume in gigabytes
        :type space: :class:`int`
        :raises StorageError: if libvirt cannot create the volume
        '''
        xml_desc = """
            <volume>
                <name>%(vol_name)s</name>
                <capacity>%(vol_size)u</capacity>
            </volume>
        """ % {
            "vol_name" : name,
            "vol_size" : space * GIGABYTE_DIV,
        }
        try:
            self._pools[pool].createXML(xml_desc, 0)
        except libvirt.libvirtError as e:
            raise StorageError("Failed to create the volume (%s)" % e)

    def del_volume(self, pool, name, wipe=False):
        '''
        Deletes a volume in the specified pool

        :param pool: the pool in which delete the volume, either its name
            or the pool object itself
        :type pool: :class:`str` or libvirt.`virStoragePool`
        :param name: the name of the volume
        :type name: :class:`str`
        :param wipe: wipe the content of the volume before deleting it
        :type wipe: :class:`bool`
        :raises StorageError: if the volume is not found, or cannot be wiped
            or deleted
        '''
        # a known pool name is turned into its pool object, anything else
        # is taken as the pool object itself
        pool = self._pools.get(pool, pool)
        try:
            vol = pool.storageVolLookupByName(name)
        except libvirt.libvirtError as e:
            raise StorageError("Volume not found (%s)" % e)
        if wipe:
            try:
                vol.wipe(0)
            except libvirt.libvirtError as e:
                raise StorageError("Failed to wipe volume, data integrity"
                                   " is unknown (%s)" % e)
        try:
            vol.delete(0)
        except libvirt.libvirtError as e:
            raise StorageError("Failed to delete the volume, but it may be"
                               " wiped (%s)" % e)

    def find_space(self, new_space):
        '''
        Tries to find a suitable chunk of space for volume allocation.

        :param new_space: a space size in gigabytes
        :type new_space: :class:`int`
        :return: a :class:`tuple` of best location or :class:`False` if no
            pool is suitable
        :raises StorageError: if libvirt cannot give the pool informations
        '''
        # NOTE(review): only the size test and the error handler of this
        # method were readable; the loop, the (name, pool) tuple returned and
        # the False fallback are rebuilt from the docstring, confirm them
        try:
            for pool_name, pool in self._pools.items():
                # FIXME, crappy overhead delta
                if new_space * GIGABYTE_DIV <= pool.info()[3] - MEGABYTE_DIV:
                    return (pool_name, pool)
        except libvirt.libvirtError as e:
            raise StorageError("Can't get pool informations (%s)" % e)
        return False

    def get_pool_name(self, pool):
        '''
        Returns the name of this pool

        :param pool: the storage pool
        :type pool: libvirt.`virStoragePool`
        :return: :class:`str` name of the pool
        :raises StorageError: if libvirt cannot give the name
        '''
        try:
            return pool.name()
        except libvirt.libvirtError as e:
            raise StorageError("Can't get pool name (%s)" % e)

    def get_pool_state(self, pool):
        '''
        Returns the running state of the pool, as a POOL_STATE label

        :param pool: the storage pool
        :type pool: libvirt.`virStoragePool`
        :raises StorageError: if libvirt cannot give the state
        '''
        try:
            return POOL_STATE[pool.info()[0]]
        except libvirt.libvirtError as e:
            raise StorageError("Can't get pool state (%s)" % e)

    # NOTE(review): the def lines of the three methods below were not
    # readable. Their names come from the calls made by LibvirtHypervisor,
    # which uses both get_pool_space_free and get_pool_space_available;
    # confirm the names.
    # libvirt fills virStoragePool.info() with
    # [state, capacity, allocation, available]

    def get_pool_space_total(self, pool):
        '''
        Returns the storage capacity of this pool in bytes

        :param pool: the pool to get information from
        :type pool: :class:`virStoragePool`
        :return: :class:`int` of capacity in bytes
        :raises StorageError: if libvirt cannot give the pool informations
        '''
        try:
            return pool.info()[1]
        except libvirt.libvirtError as e:
            raise StorageError("Can't get pool informations (%s)" % e)

    def get_pool_space_free(self, pool):
        '''
        Returns available space of this storage pool in bytes

        :param pool: the pool to get information from
        :type pool: :class:`virStoragePool`
        :return: :class:`int` of available free space in bytes
        :raises StorageError: if libvirt cannot give the pool informations
        '''
        try:
            # available space is the last field, the same one find_space
            # reads
            return pool.info()[3]
        except libvirt.libvirtError as e:
            raise StorageError("Can't get pool informations (%s)" % e)

    # second name under which LibvirtHypervisor asks for the free space
    get_pool_space_available = get_pool_space_free

    def get_pool_space_used(self, pool):
        '''
        Returns current storage pool usage in bytes

        :param pool: the pool to get information from
        :type pool: :class:`virStoragePool`
        :return: :class:`int` of used space in bytes
        :raises StorageError: if libvirt cannot give the pool informations
        '''
        try:
            # used space is the allocation field
            return pool.info()[2]
        except libvirt.libvirtError as e:
            raise StorageError("Can't get pool informations (%s)" % e)

    def get_volume_path(self, pool, vol_name):
        '''
        Returns the file path to the volume

        :param pool: the storage pool containing this volume
        :type pool: libvirt.`virStoragePool`
        :param vol_name: name of the pool volume
        :type vol_name: :class:`str`
        :return: :class:`str` path to the volume file
        :raises StorageError: if the volume or its path cannot be found
        '''
        try:
            vol = pool.storageVolLookupByName(vol_name)
        except libvirt.libvirtError as e:
            raise StorageError("Can't find volume in pool (%s)" % e)
        try:
            return vol.path()
        except libvirt.libvirtError as e:
            raise StorageError("Volume has no path information (%s)" % e)

    def get_volume_allocation(self, pool, vol_name):
        '''
        Returns the pool space used by this volume in bytes

        :param pool: the pool containing this volume
        :type pool: :class:`virStoragePool`
        :param vol_name: name of the volume to query
        :type vol_name: :class:`str`
        :return: :class:`int` of allocation in bytes
        :raises StorageError: if the volume or its allocation cannot be found
        '''
        try:
            vol = pool.storageVolLookupByName(vol_name)
        except libvirt.libvirtError as e:
            raise StorageError("Volume not found (%s)" % e)
        try:
            return vol.info()[2]
        except libvirt.libvirtError as e:
            raise StorageError("Volume has no allocation information (%s)"
                               % e)

    def get_volume_capacity(self, pool, vol_name):
        '''
        Returns the capacity (usable space) of this volume in bytes

        :param pool: the pool containing this volume
        :type pool: :class:`virStoragePool`
        :param vol_name: name of the volume to query
        :type vol_name: :class:`str`
        :return: :class:`int` of capacity in bytes
        :raises StorageError: if the volume or its capacity cannot be found
        '''
        try:
            vol = pool.storageVolLookupByName(vol_name)
        except libvirt.libvirtError as e:
            raise StorageError("Volume not found (%s)" % e)
        try:
            return vol.info()[1]
        except libvirt.libvirtError as e:
            raise StorageError("Volume has no capacity information (%s)" % e)
#### Helper functions
def map_process(process):
    '''
    Returns the (pid, command line) couple describing a process object
    '''
    pid = process.pid
    command_line = process.cmdline
    return (pid, command_line)
def find_process_id(uuid):
    '''
    Looks for the processes started with the UUID of a virDomain object in
    their command line

    Returns the list of the pids of the matching processes, the list is
    empty when no process matches
    '''
    pids = []
    for process in psutil.get_process_list():
        if uuid in process.cmdline:
            pids.append(process.pid)
    return pids
def pool_obj(con_handle, pool_name):
    '''
    Looks up the libvirt pool object called pool_name

    :param con_handle: libvirt connection handle
    :type con_handle: :class:`libvirt.virConnect`
    :param pool_name: name of the storage pool
    :type pool_name: :class:`str`
    '''
    lookup = con_handle.storagePoolLookupByName
    return lookup(pool_name)