Newer
Older
#TODO: vm informations gathering !!!
import libvirt
import sys
# we use psutils to get host informations we can't get with
# libvirt
import psutil
from interface import *
from exceptions import *
from time import sleep
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
###
# Defined constants
###
##
# Libvirt
##
#
KVM_LIBVIRT_SESSION = 'qemu:///system'
XEN_LIBVIRT_SESSION = 'xen:///'
#########
## States
#########
# Virtual Machines
VM_STATUS = (
'No state',
'Running',
'Blocked on resource',
'Paused',
'Shutting down ...',
'Shutdown',
'Crashed'
)
VM_START_STATES = {
'running': 1,
'paused' : 0
}
# Storage Pools
POOL_STATE = (
'Not Running',
'Initializing pool, not available',
'Running normally',
'Running degraded',
'Running, but not accessible'
)
#########################
## Pool informations tags
#########################
POOL_NAME = 'pool'
POOL_STATUS = 'pstatus'
POOL_TOTAL_SIZE = 'psize'
POOL_FREE_SPACE = 'pfree_space'
POOL_USED_SPACE = 'pused_space'
POOL_NUM_VOLUMES = 'num_vols'
POOL_BACKEND = 'backend'
DEFAULT_VM_START = VM_START_STATES['running']
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# Pretty size outputs
##
MEGABYTE_DIV = 1024 * 1024
GIGABYTE_DIV = 1024 * 1024 * 1024
KILOBYTE_DIV = 1024
class LibvirtVm(VM):
'''
Libvirt api access to Virtual Machine actions
All libvirt.virDomain calls must stay in this class
:param domain: a libvirt domain instance
:type domain: :class:`libvirt.virDomain`
:param hypervisor: a hypervisor reference object
:type hypervisor: :class:`LibvirtHypervisor`
'''
_print_header = True
def __init__(self, domain, hypervisor):
'''
'''
super(LibvirtVm, self).__init__()
if isinstance(domain, libvirt.virDomain):
self._domain = domain
self.vm_info = {}
self._find_pid()
else:
raise TypeError('Need virDomain object given %s' % type(domain))
self.set_hypervisor(hypervisor)
def __cmp__(self, other):
'''
We overload the compare method so that two virtual machines are
identical if they are bound to the same domain object
'''
return cmp(self._domain, other._domain)
def _find_pid(self):
'''
Finds the PID of the current vm
'''
result = find_process_id(self._domain.UUIDString())
if result:
self._pid = int(result.pop())
else:
self._pid = None
def get_pid(self):
return self._pid
def set_hypervisor(self, hypervisor):
if isinstance(hypervisor, LibvirtHypervisor):
self._hypervisor = hypervisor
else:
raise TypeError('argument type error')
def get_name(self):
return self._domain.name()
def get_hv_name(self):
return self._hypervisor.get_name()
def get_used_mem(self):
return self._domain.info()[2] / KILOBYTE_DIV
def get_total_mem(self):
return self._domain.info()[1] / KILOBYTE_DIV
def get_free_mem(self):
return (self.get_total_mem() - self.get_used_mem()) / KILOBYTE_DIV
def get_vcpu(self):
return self._domain.info()[3]
def get_cpu_percent(self):
self._find_pid()
if self._pid:
p = psutil.Process(self._pid)
sleep(0.7)
res = int(p.get_cpu_percent())
res = 100 if res > 100 else res
return res
else:
return 0
def get_status(self):
return VM_STATUS[self._domain.info()[0]]
def shutdown(self):
try:
self._domain.shutdown()
except libvirt.libvirtError:
raise VMError('%s is not running !' % self.get_name())
def force_poweroff(self):
try:
self._domain.destroy()
except libvirt.libvirtError:
raise VMError('%s is not running !' % self.get_name())
def start(self):
try:
self._domain.create()
except libvirt.libvirtError:
raise VMError('%s is already running !' % self.get_name())
def suspend(self):
try:
self._domain.suspend()
except libvirt.libvirtError:
raise VMError('%s is not running !' % self.get_name())
try:
self._domain.resume()
except libvirt.libvirtError:
raise VMError('%s is not running !' % self.get_name())
def get_disk_path(self):
'''
Returns the path to the disk atached to the vm
'''
# We parse xml description of the vm to get this info
xml_string = self._domain.XMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE)
dom = xml.dom.minidom.parseString(xml_string)
disk = dom.getElementsByTagName('disk').pop()
#FIXME Currently only handles file type backend, must add other backends
path = disk.getElementsByTagName('source').pop()
path = path.getAttribute('file')
return path
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def get_uuid(self):
'''
Returns the uuid string of the vm
'''
return self._domain.UUIDString()
def __str__(self):
'''
LibvirtVm object string representation
'''
header = '%-10s | %-10s | %-10s | %-15s | %-10s\n\n' %\
('Name', 'UsedMem', 'VCpu', 'CpuUsage', 'Status')
self._fetch_memory_stats()
self.get_cpu_stats()
stats = '%-10s | %-10s | %-10s | %-15s | %-10s' %\
(self.get_name(), self.used_memory, self.vcpu, self.cpu_use,
self.get_status())
return header + stats
class LibvirtHypervisor(Hypervisor):
'''
Libvirt api access to Hypervisor actions.
All libvirt calls to the hypervisor stay in this class
:param hv_type: This is the hypervisor type, this parameter is
important as it specifies the hypervisor type used when
connecting to libvirt.
:type hv_type: :class:`str`
.. note::
hv_type can be any of kvm, xen, ... , other hypervisors, however only
one connection at a time is allowed
'''
def __init__(self, hv_type):
#FIXME: don't use hard coded hypervisor type in URI
try:
if hv_type == 'kvm':
self._con_handle = libvirt.open(KVM_LIBVIRT_SESSION)
else:
raise NotImplemented('or unknown hypervisor type')
except libvirt.libvirtError as error:
raise HypervisorError(error, 'libvirt cannot connect to hypervisor')
#build storage objs
self._storage = LibvirtHVStorage(self)
#build vm objects
self._build_vm_list()
self.hv_info = {}
self.hv_type = hv_type
self.st_handle = LibvirtHVStorage(self)
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
def get_name(self):
'''
Returns hypervisor's name
'''
return self._con_handle.getHostname()
def get_hv_type(self):
return self.hv_type
def get_hv_version(self):
return self._con_handle.getVersion()
def _build_vm_list(self):
'''
Builds the lists of all defined vms (running and offline) in the
current hypervisor
'''
self._runing_vm_list = []
self._offline_vm_list = []
self._dom_ids = self._con_handle.listDomainsID()
self._defined_doms = self._con_handle.listDefinedDomains()
for doms in self._dom_ids:
vm = self._con_handle.lookupByID(doms)
self._runing_vm_list.append(LibvirtVm(vm, self))
for defined_dom in self._defined_doms:
vm = self._con_handle.lookupByName(defined_dom)
self._offline_vm_list.append(LibvirtVm(vm, self))
self._vm_list = []
self._vm_list = self._runing_vm_list
self._vm_list.extend(self._offline_vm_list)
def get_vms(self):
'''
Returns a list of :class:`LibvirtVm` objects defining all
current defined vms in the hypervisor
'''
return self._vm_list
def get_arch_type(self):
'''
Get archetcture type of hypervisor
'''
return self._con_handle.getInfo()[0]
def get_nb_cpu(self):
'''
Returns number of active cpus in hypervisor
'''
return self._con_handle.getInfo()[2]
def get_cpu_frequency(self):
'''
Returns the frequency in MHZ of cpus in the hypervisor
'''
return self._con_handle.getInfo()[3]
def get_nb_threads(self):
'''
Number of threads per core
'''
return self._con_handle.getInfo()[7]
def get_free_mem(self):
return psutil.avail_phymem() / MEGABYTE_DIV
def get_used_mem(self):
return psutil.used_phymem() / MEGABYTE_DIV
def get_total_mem(self):
return ((psutil.avail_phymem() + psutil.used_phymem()) /
MEGABYTE_DIV)
def get_cpu_percent(self):
return '%2.0f' % psutil.cpu_percent()
capacity = 0
for pool in self._storage.get_pools().iteritems():
capacity = capacity + self._storage.get_pool_space_total(pool[1])
return capacity
free = 0
for pool in self._storage.get_pools().iteritems():
free = free + self._storage.get_pool_space_available(pool[1])
return free
used = 0
for pool in self._storage.get_pools().iteritems():
used = used + self._storage.get_pool_space_used(pool[1])
return used
def get_status(self):
raise NotImplementedError()
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
def start_vm(self, name, start_options=DEFAULT_VM_START):
'''
Starts the vm identified by name
:param name: The name of the virtual machine
:type nane: :class:`str`
:param start_options: Options flags while starting vm
:type start_options: TODO reference to constants
'''
for vm in self._vm_list:
if vm.get_name() == name:
vm.start()
return
raise VMError('Virtual Machine %s not found: '% name)
def stop_vm(self, name, force=False):
'''
Poweroff the specifed vm with the specified options
:param name: the name of the vm
:type name: :class:`str`
'''
for vm in self._vm_list:
if vm.get_name() == name:
vm.force_poweroff() if force else vm.shutdown()
return
raise VMError('Virtual Machine %s not found: ' % name)
def suspend_vm(self, name):
'''
Suspends the specifed vm
:param name: the name of the vm
:type name: :class:`str`
'''
for vm in self._vm_list:
if vm.get_name() == name:
vm.suspend()
return
raise VMError('Virtual machine %s not found: ' % name)
def resume_vm(self, name):
'''
Resumes the specifed vm
:param name: the name of the vm
:type name: :class:`str`
'''
for vm in self._vm_list:
if vm.get_name() == name:
vm.resume()
return
raise VMError('Virtual machine %s not found: ' % name)
def local_execute(self, command):
'''
Excecutes the command given in command on the local host
Returns a tuple with (stdout, stderr) from the process that has
been executed
..warning:: This is a dangerous function as it gives the command given
in paramter to a shell prompt, anything can then be executed locally
..warning:: If the command given is a long processing one, this may be
a deadlock to the node be carefull !
:param command: the command to execute with it's arguments
:type command: :class:`str`
'''
#FIXME: stop using shell=true and parse arguments with shlex.split()
p = subprocess.Popen(command, shell=True,
bufsize=-1,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
result = p.communicate()
return result
def export_vm(self, name, target, port):
'''
Migrates the given vm (name) to (target) hypervisor which is already
waiting for the migration
:param name: the name of the vm
:type name: :class:`str`
:param target: the destination hypervisor
:type target: :class:`str`
:param port: the port on the destination to use for the transfer
:type port: :class:`str`
'''
# We will be using subprocess to pipe the volume using dd in a netcat
# connection to the destination
# We send a hash of the volume before so that the target can checksum
# and validate integrity.
# This algorithm is just a proof of concept of the cold migration
# process it is likely that it does not anticipates some problems
buff = 1024 * 1024 * 10 #10 Mo
path = [vm.get_disk_path() for vm in self._vm_list
if vm.get_name() == name].pop()
dd_cmd = ['dd', 'if=%s' % path]
dd_process = subprocess.Popen(dd_cmd, bufsize=-1,
stdout=subprocess.PIPE)
nc_process = subprocess.Popen(['nc', target, port],
bufsize=-1,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
#We send dd output to nc
dd_out = dd_process.stdout
nc_in = nc_process.stdin
m = hashlib.md5()
print 'going to pipe loop'
try:
read = dd_out.read(buff)
except Exception as err:
raise IOError('error while reading dd process output %s' % err)
try:
while len(read) != 0:
print len(read)
m.update(read)
nc_in.write(read)
nc_in.flush()
read = dd_out.read(buff)
except Exception as err:
raise IOError('Error occured while writing to nc process %s' % err)
nc_in.close()
print m.hexdigest()
def get_network_conf(self):
raise NotImplementedError
def get_storage_conf(self):
raise NotImplementedError
def get_storage_stats(self):
raise NotImplementedError
#TODO: finish storage class, make tests,
class LibvirtHVStorage(HVStorage):
'''
Base storage class using libvirt api for storage managment
Storage pools here are libvirt.virStoragePool instances
'''
def __init__(self, hypervisor):
if isinstance(hypervisor, LibvirtHypervisor):
self.hv_handle = hypervisor
self._fetch_st_pools()
else:
raise TypeError('Expected %s given %s' % (LibvirtHypervisor,
hypervisor))
def _fetch_st_pools(self):
'''
Sets an attribute self._pool_objs containging a list of libvirt
pool objects bound to the hypervisor
'''
pools = []
pools.extend(self.hv_handle._con_handle.listDefinedStoragePools())
pools.extend(self.hv_handle._con_handle.listStoragePools())
self._pools = dict([(p, pool_obj(self.hv_handle._con_handle, p)) \
for p in pools])
def get_pools(self):
Returns a dict of storage pools bound to the host
def get_volume_names(self, pool=None):
Returns volume names stored in this pool or all pools
try:
if pool is None:
for pool in self._pools.iteritems():
volumes.extend(pool[1].listVolumes())
else:
volumes = pool.listVolumes()
except libvirt.libvirtError as e:
raise StorageError("Failed to get volume list (%s)" % e)
def add_volume(self, pool, name, space):
'''
Adds a volume to the specified pool
:param pool: the pool in which to create the volume
:type pool: :class:`str`
:param name: name of the new volume
:type name: :class:`str`
:param space: size of the new volume in gigabytes
:type space: :class:`int`
'''
xml_desc = """
<volume>
<name>%(vol_name)s</name>
<capacity>%(vol_size)u</capacity>
</volume>
""" % {
"vol_name" : name,
"vol_size" : space * GIGABYTE_DIV,
}
try:
self._pools[pool].createXML(xml_desc, 0);
except libvirt.libvirtError as e:
raise StorageError("Failed to create the volume (%s)" % e)
def del_volume(self, pool, name, wipe=False):
'''
Deletes a volume in the specified pool
:param pool: the pool in which delete the volume
:type pool: :class:`str`
:param name: the name of the volume
:type name: :class:`str`
'''
try:
vol = pool.storageVolLookupByName(name)
except libvirt.libvirtError as e:
raise StorageError("Volume not found (%s)" % e)
if wipe:
try:
vol.wipe(0)
except libvirt.libvirtError as e:
raise StorageError("Failed to wipe volume, data integrity"
" is unknown (%s)" % e)
try:
vol.delete(0)
except libvirt.libvirtError as e:
raise StorageError("Failed to delete the volume, but it may be"
" wiped (%s)" % e)
def find_space(self, new_space):
'''
Tries to find a suitable chunk of space for volume allocation.
:param new_space: a space size in gigabytes
:type new_space: :class:`int`
:return: a :class:`tuple` of best location or :class:`False` if no
pool is suitable
Thibault VINCENT
committed
# FIXME, crappy overhead delta
if new_space * GIGABYTE_DIV <= pool[1].info()[3] - MEGABYTE_DIV:
except libvirt.libvirtError as e:
raise StorageError("Can't get pool informations (%s)" % e)
def get_pool_name(self, pool):
'''
Returns the name of this pool
:param pool: the storage pool name
:type pool: libvirt.`virStoragePool`
:return: :class:`str` name of the pool
'''
try:
return pool.name()
except libvirt.libvirtError as e:
raise StorageError("Can't get pool name (%s)" % e)
def get_pool_state(self, pool):
'''
Returns the running state of the pool
:param pool: the storage pool name
:type pool: libvirt.`virStoragePool`
'''
try:
return POOL_STATE[pool.info()[0]]
except libvirt.libvirtError as e:
raise StorageError("Can't get pool state (%s)" % e)
Returns the storage capacity of this pool in bytes
:param pool: the pool to get information from
:type pool: :class:`virStoragePool`
:return: :class:`int` of capacity in bytes
try:
return pool.info()[1]
except libvirt.libvirtError as e:
raise StorageError("Can't get pool informations (%s)" % e)
Returns available space of this storage pool in bytes
:param pool: the pool to get information from
:type pool: :class:`virStoragePool`
:return: :class:`int` of available free space in bytes
try:
return pool.info()[2]
except libvirt.libvirtError as e:
raise StorageError("Can't get pool informations (%s)" % e)
Returns current storage pool usage in bytes
:param pool: the pool to get information from
:type pool: :class:`virStoragePool`
:return: :class:`int` of used space in bytes
try:
return pool.info()[3]
except libvirt.libvirtError as e:
raise StorageError("Can't get pool informations (%s)" % e)
def get_volume_path(self, pool, vol_name):
'''
Returns the file path to the volume
:param pool: the storage pool containing this volume
:type pool: libvirt.`virStoragePool`
:param volume_name: name of the pool volume
:type volume_name: :class:`str`
:return: :class:`str` path to the volume file
'''
try:
vol = pool.storageVolLookupByName(vol_name)
except libvirt.libvirtError as e:
raise StorageError("Can't find volume in pool (%s)" % e)
try:
return vol.path()
except libvirt.libvirtError as e:
raise StorageError("Volume has no path information (%s)" % e)
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
def get_volume_allocation(self, pool, vol_name):
'''
Returns the pool space used by this volume in bytes
:param pool: the pool containing this volume from
:type pool: :class:`virStoragePool`
:param vol_name: name of the volume to query
:type vol_name: :class:`str`
:return: :class:`int` of allocation in bytes
'''
try:
vol = pool.storageVolLookupByName(vol_name)
except libvirt.libvirtError as e:
raise StorageError("Volume not found (%s)" % e)
try:
return vol.info()[2]
except libvirt.libvirtError as e:
raise StorageError("Volume has no allocation information (%s)" % e)
def get_volume_capacity(self, pool, vol_name):
'''
Returns the capacity (usable space) of this volume in bytes
:param pool: the pool containing this volume from
:type pool: :class:`virStoragePool`
:param vol_name: name of the volume to query
:type vol_name: :class:`str`
:return: :class:`int` of capacity in bytes
'''
try:
vol = pool.storageVolLookupByName(vol_name)
except libvirt.libvirtError as e:
raise StorageError("Volume not found (%s)" % e)
try:
return vol.info()[1]
except libvirt.libvirtError as e:
raise StorageError("Volume has no capacity information (%s)" % e)
#### Helper functions
def map_process(process):
'''
map each process object with it's pid and command line options
'''
return (process.pid, process.cmdline)
def find_process_id(uuid):
'''
Find the process id of a kvm process gevin an UUID of a virDomain object
Returns a list of one process id
'''
return [p.pid for p in psutil.get_process_list() if
p.cmdline.__contains__(uuid)]
def pool_obj(con_handle, pool_name):
'''
Creates a libvirt pool object given the name of the pool
:param con_handle: libvirt connection handle
:type con_handle: :class:`libvirt.virConnect`
'''
return con_handle.storagePoolLookupByName(pool_name)