# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
"""KVM hypervisor support."""
import logging
import os
import re
import signal
import sys
import threading
import traceback
import weakref

import libvirt

from cloudcontrol.common.client.utils import main_thread
from cloudcontrol.common.client.tags import ParentWrapper
from cloudcontrol.node.hypervisor.lib import (
    DOMAIN_STATES, EVENTS,
    EventLoop as VirEventLoop,
    StorageIndex,
)
from cloudcontrol.node.hypervisor.domains import VirtualMachine
from cloudcontrol.node.utils import close_fds, set_signal_map, num_to_sig
from cloudcontrol.node.exc import VMMigrationError
logger = logging.getLogger(__name__)

# VM names of the form "job-<N>" (no leading zeros) are reserved for cc-node
# job objects; such domains must not be registered.  Raw string avoids the
# invalid "\d" escape in a plain literal.
BAD_VM_NAME = re.compile(r'^job-(0|([1-9]\d*))$')
# FIXME create abstract base class for any hypervisor
class KVM(object):
"""Container for all hypervisor related state."""
def __init__(self, name, handler):
"""
:param str name: name of hypervisor instance
:param Handler handler: hypervisor handler
"""
self.handler = weakref.proxy(handler)
#: hv attributes
self.name = name
self.type = u'kvm'
# register libvirt error handler
libvirt.registerErrorHandler(self.vir_error_cb, None)
# libvirt event loop abstraction
self.vir_event_loop = VirEventLoop(self.handler.main.evloop)
self.vir_con = libvirt.open('qemu:///system') # currently only support KVM
# findout storage
self.storage = StorageIndex(handler, self.vir_con)
logger.debug('Storages: %s', self.storage.paths)
#: domains: vms, containers...
self.domains = dict()
# find defined domains
for dom_name in self.vir_con.listDefinedDomains():
dom = self.vir_con.lookupByName(dom_name)
self.domains[dom.name()] = VirtualMachine(dom, self)
# find started domains
for dom_id in self.vir_con.listDomainsID():
dom = self.vir_con.lookupByID(dom_id)
self.domains[dom.name()] = VirtualMachine(dom, self)
logger.debug('Domains: %s', self.domains)
def stop(self):
    """Shut down the libvirt event loop and release the connection."""
    self.vir_event_loop.stop()
    # Deregistering the event callback can itself fail when the libvirt
    # connection is already broken; that is harmless at shutdown time.
    try:
        self.vir_con.domainEventDeregister(self.vir_cb)
    except libvirt.libvirtError:
        pass
    remaining = self.vir_con.close()
    logger.debug('Libvirt still handling %s ref connections', remaining)
def vir_error_cb(self, ctxt, err):
    """Global libvirt error callback.

    See http://libvirt.org/errors.html for more information.

    :param ctxt: opaque context data registered with the handler (unused,
        the relevant context is already available through ``self``)
    :param err: libvirt error description
    """
    logger.error('Libvirt error %s', err)
def vir_cb(self, conn, dom, event, detail, opaque):
    """Callback for libvirt domain lifecycle events.

    :param conn: libvirt connection that emitted the event
    :param dom: libvirt domain instance the event relates to
    :param event: libvirt event id (index into :data:`EVENTS`)
    :param detail: libvirt event detail code
    :param opaque: user data registered with the callback (unused)
    """
    # hoist the repeated libvirt call: the domain name is used many times
    dom_name = dom.name()
    logger.debug('Received event %s on domain %s, detail %s', event,
                 dom_name, detail)
    event = EVENTS[event]
    if event == 'Added':
        # prevents name conflicts with others cc-node objects
        if BAD_VM_NAME.match(dom_name):
            # the name is now passed as a lazy logging argument (the
            # placeholder previously had no value to fill it)
            logger.error('Cannot register VM %s as its name would '
                         'conflict with other cc-node objects', dom_name)
            return
        # update Storage pools in case VM has volumes that were created
        self.storage.update()
        # if vm is redefined while running we need to refresh its devices
        # when stopped
        redefine_on_stop = False
        if dom_name in self.domains:
            # sometimes libvirt sends us the same event multiple times;
            # this can be the result of a change in the domain
            # configuration, so we first remove the old domain
            vm = self.domains.pop(dom_name)
            if vm.state not in ('stopped', 'crashed'):
                # if the vm was updated while it was "on", then the
                # modifications will not be reflected since we construct
                # the object/tags from the running XML
                redefine_on_stop = True
            logger.debug('Domain %s recreated', dom_name)
        self.vm_register(dom, redefine_on_stop)
    elif event == 'Removed':
        self.vm_unregister(dom_name)
        logger.info('Removed domain %s', dom_name)
    elif event in ('Started', 'Suspended', 'Resumed', 'Stopped', 'Saved',
                   'Restored'):
        vm = self.domains.get(dom_name)
        # sometimes libvirt sends a start event before a created event so
        # be careful
        if vm is not None:
            try:
                state = DOMAIN_STATES[dom.info()[0]]
            except libvirt.libvirtError as exc:
                # checks that domain was not previously removed
                # seems to happen only in libvirt 0.8.8
                if 'Domain not found' in str(exc):
                    self.vm_unregister(dom_name)
                else:
                    raise
            else:
                logger.info('Domain change state from %s to %s', vm.state,
                            state)
                if event == 'Stopped' and vm.redefine_on_stop:
                    # if the vm was changed while it was running, then we
                    # need to recreate it now as stated above
                    self.vm_unregister(vm.name)
                    self.vm_register(dom)
                else:
                    vm.state = state
    self.update_domain_count()
def vm_register(self, dom, redefine_on_stop=False):
    """Index a new libvirt domain and plug its tag database in.

    :param dom: libvirt domain instance
    :param redefine_on_stop: if we need to reread the domain XML on stop
    """
    machine = VirtualMachine(dom, self)
    machine.redefine_on_stop = redefine_on_stop
    logger.info('Created domain %s', machine.name)
    self.domains[machine.name] = machine
    machine.tag_db.set_parent(
        ParentWrapper(machine.name, 'vm', self.handler.tag_db))
    self.update_domain_count()
def vm_unregister(self, name):
"""Unregister a VM from the cc-server and remove it from the index."""
try:
vm = self.domains.pop(name)
except KeyError:
# domain already removed, see hypervisor/domains/vm_tags.py
# sometimes libvirt send us the remove event too late
# we still update storage and tag attributes
pass
else:
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# update Storage pools in case VM had volumes that were deleted
self.storage.update()
self.update_domain_count()
def update_domain_count(self):
"""Update domain state count tags."""
# update domain state counts
for tag in ('nvm', 'vmpaused', 'vmstarted', 'vmstopped', 'cpualloc',
'cpurunning', 'memalloc', 'memrunning'):
self.handler.tag_db['__main__'][tag].update_value()
def vm_define(self, xml_desc):
"""Create a VM on the Hypervisor
:param str xml_desc: XML description in libvirt format
:return: VM name created
"""
try:
return self.vir_con.defineXML(xml_desc).name()
except libvirt.libvirtError:
logger.exception('Error while creating domain')
# reraise exception for the cc-server
raise
def _count_domain(self, filter=lambda d: True):
count = 0
for dom in self.domains.itervalues():
if filter(dom):
count += 1
return count
@property
def vm_started(self):
"""Number of VMs started."""
return self._count_domain(lambda d: d.state == 'running')
@property
def vm_stopped(self):
"""Number of VMs stopped."""
return self._count_domain(lambda d: d.state == 'stopped')
@property
def vm_paused(self):
"""Number of VMs paused."""
return self._count_domain(lambda d: d.state == 'paused')
@property
def vm_total(self):
"""Total number of VMs on the hypervisor."""
return self._count_domain()
class LiveMigration(object):
    """Drive a libvirt live migration from a forked child process.

    The libvirt python binding does not always release the GIL while an
    operation is in progress, which would block every other thread of the
    node.  The migration is therefore executed by a forked child using its
    own libvirt connections, while the parent watches the child with a
    libev child watcher plus a timeout watcher (see :meth:`do_fork`).

    Child exit codes decoded by :meth:`child_cb`:
    0 success, 1 libvirt error during migration, 2 unexpected error,
    4 cannot open the local libvirt connection, 5 cannot open the remote
    libvirt connection, 42 the child work function returned unexpectedly.
    """
    def __init__(self, main_loop, vm, node2virt_port, virt2virt_port, timeout,
                 unsafe=False):
        """Performs live migration in a forked process.

        :param main_loop: instance of MainLoop
        :param vm: instance of VM to migrate
        :param node2virt_port: port for ccnode -> distant libvirt
        :param virt2virt_port: port for local libvirt -> distant libvirt
        :param float timeout: timeout for libvirt migration (prevents libvirt
            from trying to acquire domain lock forever)
        :param bool unsafe: for Libvirt >= 0.9.11, see
            http://libvirt.org/html/libvirt-libvirt.html#virDomainMigrateFlags
        """
        self.main = main_loop
        self.vm = vm
        self.node2virt_port = node2virt_port
        self.virt2virt_port = virt2virt_port
        self.timeout = timeout
        self.unsafe = unsafe
        #: child pid (reset to None once the child has been reaped)
        self.pid = None
        #: human readable failure description, set by child_cb
        self.error_msg = None
        #: raw exit status of the child, set by child_cb
        self.return_status = None
        # event for caller thread to wait migration termination
        self.event = threading.Event()
        self.do_fork()

    def create_watchers(self):
        """Install the libev timeout and child watchers for the fork."""
        self.timeout_watcher = self.main.evloop.timer(self.timeout, 0.,
                                                      self.timeout_cb)
        self.child_watcher = self.main.evloop.child(self.pid, False,
                                                    self.child_cb)
        self.timeout_watcher.start()
        self.child_watcher.start()

    def child_cb(self, watcher, revents):
        """Child watcher callback: decode exit status, wake up the caller."""
        self.pid = None
        self.return_status = watcher.rstatus
        watcher.stop()
        if self.timeout_watcher.active:
            self.timeout_watcher.stop()
        self.child_watcher = None
        logger.debug('Status: %s', self.return_status)
        # test if killed, then set msg
        if os.WIFSIGNALED(self.return_status):
            signo = os.WTERMSIG(self.return_status)
            if signo == signal.SIGKILL:
                # SIGKILL can only come from timeout_cb
                self.error_msg = 'Migration timeout for vm %s' % self.vm.name
            else:
                self.error_msg = 'Migration failed for vm %s, (%s)' % (
                    self.vm.name, num_to_sig(signo))
        else:
            # test status (see the class docstring for the exit-code map)
            status = os.WEXITSTATUS(self.return_status)
            if status == 1:
                self.error_msg = (
                    'Migration failed for vm %s, due to libvirt error'
                    % self.vm.name)
            elif status == 4:
                self.error_msg = 'Cannot open new connection to libvirt'
            elif status == 5:
                self.error_msg = 'Cannot open connection to remote libvirt'
            elif status != 0:
                self.error_msg = 'Migration failed for vm %s (%d)' % (
                    self.vm.name, status)
        self.event.set()

    def timeout_cb(self, watcher, revents):
        """Timeout watcher callback: kill the migration child process."""
        logger.debug('Killing child migration process')
        os.kill(self.pid, signal.SIGKILL)

    @main_thread
    def do_fork(self):
        # we fork and open a new connection to libvirt because sometimes the
        # libvirt python binding, while doing an operation, doesn't seem to
        # release CPython's GIL, therefore all node operations are blocked
        # the only solution we have found right now is to use a dedicated
        # libvirt connection for the migration and fork, the migration
        # operation in itself is handled by the child while other threads can
        # be scheduled
        try:
            pid = os.fork()
        except OSError:
            logger.error('Cannot fork before running live migration')
            raise
        if pid == 0:
            # child
            try:
                self.child_work()
            except:
                # whatever the matter is we MUST NOT return to libev or sjRPC
                # (print_exc takes limit/file arguments, not a message)
                sys.stderr.write('Uncaught error in migration child\n')
                traceback.print_exc(file=sys.stderr)
            finally:
                # child_work exits the process itself on every path; getting
                # here means it returned or raised unexpectedly
                os._exit(42)
        self.pid = pid
        self.create_watchers()

    def child_work(self):
        """Executed in the child: perform the migration, then ``os._exit``.

        Never returns: every code path terminates the process with one of
        the exit codes documented on the class.
        """
        sys.stderr.write('Hello from child !\n')
        sys.stderr.write('Debug is %s\n' % self.main.config.debug)
        try:
            close_fds(debug=self.main.config.debug)
            set_signal_map({
                signal.SIGTERM: lambda *args: os._exit(1),
                signal.SIGUSR1: signal.SIG_IGN,
                signal.SIGINT: signal.SIG_IGN,
                # FIXME need more signal ?
            })
        except:
            # best effort: keep going even if post-fork cleanup failed
            sys.stderr.write('Error while performing post fork work\n')
            traceback.print_exc(file=sys.stderr)
        # create a new libvirt connection dedicated to migration
        sys.stderr.write('Open new connection to libvirt\n')
        try:
            new_con = libvirt.open('qemu:///system')
            domain = new_con.lookupByUUIDString(self.vm.uuid)
        except libvirt.libvirtError:
            sys.stderr.write('Cannot connect to libvirt\n')
            os._exit(4)
        except:
            # error (file= keyword: the first positional arg is a limit)
            traceback.print_exc(file=sys.stderr)
            os._exit(2)
        sys.stderr.write('Open destination libvirt connection\n')
        try:
            dest_virt_con = libvirt.open(
                'qemu+tcp://127.0.0.1:%d/system' % self.node2virt_port)
        except libvirt.libvirtError:
            # format with % -- file.write() only accepts a single string
            sys.stderr.write('Cannot connect to remote libvirt for live'
                             ' migrating vm %s\n' % self.vm.name)
            os._exit(5)
        except:
            # error
            traceback.print_exc(file=sys.stderr)
            os._exit(2)
        try:
            if self.unsafe:
                # VIR_MIGRATE_UNSAFE is not defined for libvirt < 0.9.11
                append_flags = getattr(libvirt, 'VIR_MIGRATE_UNSAFE', 0)
            else:
                append_flags = 0
            sys.stderr.write('Do migrate\n')
            domain.migrate(
                dest_virt_con,
                libvirt.VIR_MIGRATE_LIVE | libvirt.VIR_MIGRATE_PEER2PEER |
                libvirt.VIR_MIGRATE_TUNNELLED |
                libvirt.VIR_MIGRATE_PERSIST_DEST |
                libvirt.VIR_MIGRATE_UNDEFINE_SOURCE |
                append_flags,
                None,
                'qemu+tcp://127.0.0.1:%d/system' % self.virt2virt_port,
                0,
            )
        except libvirt.libvirtError:
            sys.stderr.write('libvirt error during migration\n')
            traceback.print_exc(file=sys.stderr)
            os._exit(1)
        except:
            # whatever the matter is we MUST NOT return to libev or sjRPC
            sys.stderr.write('error during migration\n')
            traceback.print_exc(file=sys.stderr)
            os._exit(2)
        else:
            os._exit(0)
        finally:
            # NOTE(review): every branch above calls os._exit(), which skips
            # this finally; kept in case a branch ever stops exiting directly
            new_con.close()
            dest_virt_con.close()

    def wait(self):
        """Block the calling thread until the migration finishes.

        :raises VMMigrationError: if the child did not exit with status 0
        """
        self.event.wait()
        if self.return_status != 0:
            raise VMMigrationError(self.error_msg)