Newer
Older
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
from StringIO import StringIO
from xml.etree import cElementTree as et
from cloudcontrol.common.client.tags import Tag, tag_inspector, ParentWrapper
from cloudcontrol.common.client.plugins import (
rpc_handler,
rpc_handler_decorator_factory,
get_rpc_handlers,
)
from cloudcontrol.node.host import Handler as HostHandler
from cloudcontrol.node.hypervisor import tags
from cloudcontrol.node.hypervisor.kvm import KVM, LiveMigration
UndefinedDomain, DRBDError, PoolStorageError
ImportVolume, ExportVolume, TCPTunnel, DRBD
from cloudcontrol.node.utils import execute
# module-level logger used by every handler below
logger = logging.getLogger(__name__)
# marker string identifying the libvirt related RPC handlers
libvirt_handler_marker = '_libvirt_rpc_handler'
# decorator built from that marker; the decorated methods are collected in
# ``Handler.__init__`` with ``get_rpc_handlers(self, libvirt_handler_marker)``
libvirt_handler = rpc_handler_decorator_factory(libvirt_handler_marker)
# FIXME find a way to refactor Handler and Hypervisor class
class Handler(HostHandler):
def __init__(self, *args, **kwargs):
    """Set up the handler; the libvirt connection is made by the timer.

    :param hypervisor_name: required keyword argument, removed from
        ``kwargs`` before the remaining arguments are forwarded to
        ``HostHandler.__init__``; later given to :class:`KVM` as ``name``
    :raises KeyError: if ``hypervisor_name`` is not provided
    """
    self.hypervisor_name = kwargs.pop('hypervisor_name')
    HostHandler.__init__(self, *args, **kwargs)
    #: keep index of asynchronous calls
    self.async_calls = dict()
    # timer driving virt_connect_cb, which stops it once connected
    # NOTE(review): (.0, 5.) presumably means "fire now, then every 5
    # seconds" -- confirm against the evloop timer API
    self.timer = self.main.evloop.timer(.0, 5., self.virt_connect_cb)
    self.hypervisor = None
    # backing field of the ``virt_connected`` property; set directly (not
    # through the setter, which refreshes tags) so that the property can be
    # read before the first connection attempt
    self._virt_connected = False
    # list of libvirt related RPC handlers
    self.virt_handlers = get_rpc_handlers(self, libvirt_handler_marker)
    # register tags
    self.tag_db.add_tags(tag_inspector(tags, self))
@property
def virt_connected(self):
    """Last connection state stored through the setter."""
    return self._virt_connected

@virt_connected.setter
def virt_connected(self, value):
    """Store the new connection state and refresh the dependent tags."""
    self._virt_connected = value
    # update tags
    main_tags = self.tag_db['__main__']
    dependent = ('vir_status', 'sto', 'nvm', 'vmpaused', 'vmstarted',
                 'vmstopped', 'hvver', 'libvirtver', 'hv')
    for tag_name in dependent:
        main_tags[tag_name].update_value()
def start(self):
    """Start the libvirt connection timer, then the base host handler."""
    connect_timer = self.timer
    connect_timer.start()
    HostHandler.start(self)
def stop(self):
    """Stop the connection timer, the hypervisor (if any) and the base handler."""
    self.timer.stop()
    hypervisor = self.hypervisor
    if hypervisor is not None:
        hypervisor.stop()
    HostHandler.stop(self)
def virt_connect_cb(self, *args):
    """Timer callback that tries to connect to libvirt.

    On ``libvirt.libvirtError`` the error is logged and the method returns,
    leaving the timer running. On success the connection is flagged as up,
    storage and tags are refreshed, the libvirt RPC handlers are registered
    and the timer is stopped.

    :param args: ignored (arguments supplied by the timer)
    """
    # initialize hypervisor instance
    try:
        self.hypervisor = KVM(
            name=self.hypervisor_name,
            handler=self,
        )
    except libvirt.libvirtError:
        logger.exception('Error while connecting to libvirt')
        return
    self.virt_connected = True
    self.hypervisor.storage.update()
    # attach every domain tag database under this handler's tag database
    for dom in self.hypervisor.domains.itervalues():
        dom.tag_db.set_parent(ParentWrapper(dom.name, 'vm', self.tag_db))
    # we must refresh those tags only when domains tags are registered to
    # have the calculated values
    tags_to_refresh = ('cpualloc', 'cpurunning', 'cpuremaining',
                       'cpuallocratio', 'memalloc', 'memrunning',
                       'memremaining', 'memallocratio')
    for tag in tags_to_refresh:
        self.tag_db['__main__'][tag].update_value()
    # register libvirt handlers
    self.rpc_handler.update(self.virt_handlers)
    for k, v in self.virt_handlers.iteritems():
        self.main.reset_handler(k, v)
    # if everything went fine, unregister the timer
    self.timer.stop()
def virt_connect_restart(self):
    """Restart libvirt connection.

    This method might be called when libvirt connection is lost. It does
    nothing when the connection is already flagged as down; otherwise it
    drops the libvirt related tags, sub objects and RPC handlers, stops the
    hypervisor and restarts the connection timer.
    """
    if not self.virt_connected:
        return
    logger.error('Connection to libvirt lost, trying to restart')
    # update connection state
    self.virt_connected = False
    # refresh those tags
    tags_to_refresh = ('cpualloc', 'cpurunning', 'cpuremaining',
                       'cpuallocratio', 'memalloc', 'memrunning',
                       'memremaining', 'memallocratio')
    for tag in tags_to_refresh:
        self.tag_db['__main__'][tag].update_value()
    # unregister tags that will be re registered later
    for storage in self.hypervisor.storage.storages:
        if storage.name.startswith('_'):
            continue  # Ignore internal storages
        # NOTE(review): the tag names are built from str(storage), not
        # storage.name -- confirm both give the same text
        self.tag_db.remove_tags((
            'sto%s_state' % storage,
            'sto%s_size' % storage,
            'sto%s_free' % storage,
            'sto%s_used' % storage,
            'sto%s_vol' % storage,
        ))
    # unregister sub objects (for the same reason)
    for sub_id in self.tag_db.keys():
        if sub_id == '__main__':
            continue
        self.tag_db.remove_sub_object(sub_id)
    # stop and delete hypervisor instance
    self.hypervisor.stop()
    self.hypervisor = None
    # remove handlers related to libvirt
    for handler in self.virt_handlers:
        del self.rpc_handler[handler]
    # launch connection timer
    self.timer.start()
def iter_vms(self, vm_names):
    """Yield the VM object of every known name in ``vm_names``.

    Unknown names are skipped; nothing is yielded when ``vm_names`` is None.
    """
    if vm_names is not None:
        lookup = self.hypervisor.domains.get
        for vm_name in vm_names:
            domain = lookup(vm_name)
            if domain is not None:
                yield domain
def vm_define(self, data, format='xml'):
    """Define a VM from an XML description or a vmspec mapping.

    :param data: passed as is to ``hypervisor.vm_define`` for ``'xml'``; for
        ``'vmspec'`` a mapping that is modified in place (tags are folded
        into ``description``) and sent as JSON to the define script
    :param format: ``'xml'`` or ``'vmspec'``
    :returns: the hypervisor result for ``'xml'``, the stripped script
        output for ``'vmspec'``
    :raises RuntimeError: when the define script exits with a non zero code;
        the message is the last line of its output
    :raises NotImplementedError: for any other format
    """
    logger.debug('VM define')
    if format == 'xml':
        return self.hypervisor.vm_define(data)
    if format != 'vmspec':
        raise NotImplementedError('Format not supported')
    # Encode tags as description:
    if 'tags' in data:
        data.setdefault('description', '')
        for tag, value in data['tags'].iteritems():
            data['description'] += '\n@%s=%s' % (tag, value)
    # Delete the tags key which is not recognized by hkvm-define
    data.pop('tags', None)
    command = [self.main.config.define_script]
    rcode, output = execute(self.main, command, stdin=json.dumps(data))
    if rcode != 0:
        raise RuntimeError(output.strip().split('\n')[-1].strip())
    return output.strip()
def vm_undefine(self, name):
    """Undefine the VM called ``name``; unknown names are ignored.

    NOTE(review): the ``def`` line of this method is missing from the file;
    the name and signature are reconstructed from the body and from the
    sibling handlers -- confirm against version control.

    :param name: VM name
    """
    vm = self.hypervisor.domains.get(name)
    if vm is not None:
        vm.undefine()
def vm_export(self, name, format='xml'):
    """Return the libvirt XML description of a VM.

    :param name: VM name
    :param format: only ``'xml'`` is supported
    :returns: ``lv_dom.XMLDesc(0)`` of the VM, or None for an unknown name
    :raises NotImplementedError: for any other format
    """
    if format != 'xml':
        raise NotImplementedError('Format not supported')
    domain = self.hypervisor.domains.get(name)
    return None if domain is None else domain.lv_dom.XMLDesc(0)
@libvirt_handler
def vm_rescue(self, name):
    """Run the rescue script with ``-r`` for a defined VM.

    :param name: VM name
    :raises UndefinedDomain: when the VM is not known to the hypervisor
    :raises RuntimeError: when the script exits with a non zero code; the
        message is the last line of its output
    """
    logger.debug('VM rescue %s', name)
    if name not in self.hypervisor.domains:
        error = 'Cannot rescue VM %s because it is not defined' % name
        logger.error(error)
        raise UndefinedDomain(error)
    command = [self.main.config.rescue_script, '-r', name]
    rcode, output = execute(self.main, command)
    if rcode != 0:
        raise RuntimeError(output.strip().split('\n')[-1].strip())
@libvirt_handler
def vm_unrescue(self, name):
    """Run the rescue script with ``-u`` for a defined VM.

    :param name: VM name
    :raises UndefinedDomain: when the VM is not known to the hypervisor
    :raises RuntimeError: when the script exits with a non zero code; the
        message is the last line of its output
    """
    logger.debug('VM unrescue %s', name)
    if name not in self.hypervisor.domains:
        error = 'Cannot unrescue VM %s because it is not defined' % name
        logger.error(error)
        raise UndefinedDomain(error)
    command = [self.main.config.rescue_script, '-u', name]
    rcode, output = execute(self.main, command)
    if rcode != 0:
        raise RuntimeError(output.strip().split('\n')[-1].strip())
@libvirt_handler
def vm_install(self, name):
    """Run the install script with ``-i`` for a defined VM.

    :param name: VM name
    :raises UndefinedDomain: when the VM is not known to the hypervisor
    :raises RuntimeError: when the script exits with a non zero code; the
        message is the last line of its output
    """
    logger.debug('VM install %s', name)
    if name not in self.hypervisor.domains:
        error = 'Cannot install VM %s because it is not defined' % name
        logger.error(error)
        raise UndefinedDomain(error)
    command = [self.main.config.install_script, '-i', name]
    rcode, output = execute(self.main, command)
    if rcode != 0:
        raise RuntimeError(output.strip().split('\n')[-1].strip())
@libvirt_handler
def vm_uninstall(self, name):
    """Run the install script with ``-u`` for a defined VM.

    :param name: VM name
    :raises UndefinedDomain: when the VM is not known to the hypervisor
    :raises RuntimeError: when the script exits with a non zero code; the
        message is the last line of its output
    """
    logger.debug('VM uninstall %s', name)
    if name not in self.hypervisor.domains:
        error = 'Cannot uninstall VM %s because it is not defined' % name
        logger.error(error)
        raise UndefinedDomain(error)
    command = [self.main.config.install_script, '-u', name]
    rcode, output = execute(self.main, command)
    if rcode != 0:
        raise RuntimeError(output.strip().split('\n')[-1].strip())
@libvirt_handler
def vm_set_vlans(self, name, vlan_update_format, mac_address=None):
    """Run the vlan script for a defined VM.

    :param name: VM name
    :param vlan_update_format: passed to the script after ``--``
    :param mac_address: when given, passed through ``--iface-macaddr``
    :raises UndefinedDomain: when the VM is not known to the hypervisor
    :raises RuntimeError: when the script exits with a non zero code; the
        message is the last line of its output
    """
    logger.debug('VM set vlan %s', name)
    if name not in self.hypervisor.domains:
        error = 'Cannot set vlans on VM %s because it is not defined' % name
        logger.error(error)
        raise UndefinedDomain(error)
    command = [self.main.config.vlan_script]
    if mac_address is not None:
        command += ['--iface-macaddr', mac_address]
    command += [name, '--', vlan_update_format]
    rcode, output = execute(self.main, command)
    if rcode != 0:
        raise RuntimeError(output.strip().split('\n')[-1].strip())
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
@libvirt_handler
def vm_attach_disk(self, name, pool, volume, driver='virtio', bps=0, iops=0):
    """Run the attach script to add a volume to a defined VM.

    The storage index is updated first, whether or not the VM exists.

    :param name: VM name
    :param pool: storage pool name
    :param volume: volume name
    :param driver: value of the script ``--driver`` option
    :param bps: value of the script ``--bps`` option
    :param iops: value of the script ``--iops`` option
    :raises UndefinedDomain: when the VM is not known to the hypervisor
    :raises RuntimeError: when the script exits with a non zero code; the
        message is the last line of its output
    """
    logger.debug('VM attach disk %s/%s -> %s ', pool, volume, name)
    self.hypervisor.storage.update()
    if name not in self.hypervisor.domains:
        error = 'Cannot attach disk on VM %s because it is not defined' % name
        logger.error(error)
        raise UndefinedDomain(error)
    command = [
        self.main.config.attach_script,
        '--driver', driver,
        '--bps', str(bps),
        '--iops', str(iops),
        name, pool, volume,
    ]
    rcode, output = execute(self.main, command)
    if rcode != 0:
        raise RuntimeError(output.strip().split('\n')[-1].strip())
@libvirt_handler
def vm_detach_disk(self, name, pool, volume):
    """Run the detach script to remove a volume from a defined VM.

    :param name: VM name
    :param pool: storage pool name
    :param volume: volume name
    :raises UndefinedDomain: when the VM is not known to the hypervisor
    :raises RuntimeError: when the script exits with a non zero code; the
        message is the last line of its output
    """
    logger.debug('VM detach disk %s/%s <- %s ', pool, volume, name)
    if name not in self.hypervisor.domains:
        error = 'Cannot detach disk from VM %s because it is not defined' % name
        logger.error(error)
        raise UndefinedDomain(error)
    command = [self.main.config.detach_script, name, pool, volume]
    rcode, output = execute(self.main, command)
    if rcode != 0:
        raise RuntimeError(output.strip().split('\n')[-1].strip())
@libvirt_handler
def vm_boot_order(self, name, order):
    """Run the boot order script for a defined VM.

    :param name: VM name
    :param order: iterable of two-item sequences; each one is passed to the
        script as a ``first:second`` argument
    :raises UndefinedDomain: when the VM is not known to the hypervisor
    :raises RuntimeError: when the script exits with a non zero code; the
        message is the last line of its output
    """
    logger.debug('VM boot order %s -> %s', name, order)
    if name not in self.hypervisor.domains:
        error = 'Cannot change boot order of VM %s because it is not defined' % name
        logger.error(error)
        raise UndefinedDomain(error)
    command = [self.main.config.boot_order_script, name]
    command.extend('%s:%s' % tuple(entry) for entry in order)
    rcode, output = execute(self.main, command)
    if rcode != 0:
        raise RuntimeError(output.strip().split('\n')[-1].strip())
def vm_stop(self, name):
    """Call ``stop()`` on the named VM.

    :param name: VM name
    :raises UndefinedDomain: when a KeyError is raised (VM not defined)
    :raises libvirt.libvirtError: logged, then re-raised
    """
    logger.debug('VM stop %s', name)
    try:
        domain = self.hypervisor.domains[name]
        domain.stop()
    except KeyError:
        error = 'Cannot stop VM %s because it is not defined' % name
        logger.error(error)
        raise UndefinedDomain(error)
    except libvirt.libvirtError:
        logger.exception('Error while stopping VM %s', name)
        raise
def vm_destroy(self, name):
    """Call ``destroy()`` on the named VM.

    A libvirt error mentioning 'domain is not running' is ignored when the
    VM state is still 'running'; any other libvirt error is logged and
    re-raised.

    :param name: VM name
    :raises UndefinedDomain: when a KeyError is raised (VM not defined)
    :raises libvirt.libvirtError: for errors that are not ignored
    """
    logger.debug('VM destroy %s', name)
    try:
        domain = self.hypervisor.domains[name]
        domain.destroy()
    except KeyError:
        error = 'Cannot destroy VM %s because it is not defined' % name
        logger.error(error)
        raise UndefinedDomain(error)
    except libvirt.libvirtError as exc:
        # Libvirt raises exception 'domain is not running' even if domain
        # is running, might be a bug in libvirt
        real_failure = (
            'domain is not running' not in str(exc)
            or self.hypervisor.domains[name].state != 'running'
        )
        if real_failure:
            logger.exception('Error while destroying VM %s', name)
            raise
def vm_start(self, name, pause=False):
    """Call ``start(pause)`` on the named VM.

    :param str name: VM name to start
    :param bool pause: start VM in pause
    :raises UndefinedDomain: when a KeyError is raised (VM not defined)
    :raises libvirt.libvirtError: logged, then re-raised
    """
    logger.debug('VM start %s', name)
    try:
        domain = self.hypervisor.domains[name]
        domain.start(pause)
    except KeyError:
        error = 'Cannot start VM %s because it is not defined' % name
        logger.error(error)
        raise UndefinedDomain(error)
    except libvirt.libvirtError:
        logger.exception('Error while starting VM %s', name)
        raise
def vm_suspend(self, name):
    """Call ``suspend()`` on the named VM.

    :param name: VM name
    :raises UndefinedDomain: when a KeyError is raised (VM not defined)
    :raises libvirt.libvirtError: logged, then re-raised
    """
    logger.debug('VM suspend %s', name)
    try:
        domain = self.hypervisor.domains[name]
        domain.suspend()
    except KeyError:
        error = 'Cannot suspend VM %s because it is not defined' % name
        logger.error(error)
        raise UndefinedDomain(error)
    except libvirt.libvirtError:
        logger.exception('Error while suspending VM %s', name)
        raise
def vm_resume(self, name):
    """Call ``resume()`` on the named VM.

    :param name: VM name
    :raises UndefinedDomain: when a KeyError is raised (VM not defined)
    :raises libvirt.libvirtError: logged, then re-raised
    """
    logger.debug('VM resume %s', name)
    try:
        domain = self.hypervisor.domains[name]
        domain.resume()
    except KeyError:
        error = 'Cannot resume VM %s because it is not defined' % name
        logger.error(error)
        raise UndefinedDomain(error)
    except libvirt.libvirtError:
        logger.exception('Error while resuming VM %s', name)
        raise
@libvirt_handler
def vm_reset(self, name):
    """Call ``reset()`` on the named VM.

    :param name: VM name
    :raises UndefinedDomain: when a KeyError is raised (VM not defined)
    :raises libvirt.libvirtError: logged, then re-raised
    """
    logger.debug('VM reset %s', name)
    try:
        domain = self.hypervisor.domains[name]
        domain.reset()
    except KeyError:
        error = 'Cannot reset VM %s because it is not defined' % name
        logger.error(error)
        raise UndefinedDomain(error)
    except libvirt.libvirtError:
        logger.exception('Error while resetting VM %s', name)
        raise
@libvirt_handler
def vm_cycle(self, name):
    """Destroy the named VM, sleep one second, then start it again.

    The VM is looked up by name again before the start.

    :param name: VM name
    :raises UndefinedDomain: when a KeyError is raised (VM not defined)
    :raises libvirt.libvirtError: logged, then re-raised
    """
    logger.debug('VM cycle %s', name)
    try:
        domains = self.hypervisor.domains
        domains[name].destroy()
        time.sleep(1)
        self.hypervisor.domains[name].start()
    except KeyError:
        error = 'Cannot cycle VM %s because it is not defined' % name
        logger.error(error)
        raise UndefinedDomain(error)
    except libvirt.libvirtError:
        logger.exception('Error while cycle VM %s', name)
        raise
@libvirt_handler
def vm_change_title(self, name, new_title):
    """Assign ``new_title`` to the ``title`` attribute of the named VM.

    :param name: VM name
    :param new_title: value to assign
    :raises UndefinedDomain: when a KeyError is raised (VM not defined)
    :raises libvirt.libvirtError: logged, then re-raised
    """
    logger.debug('VM edit title %s', name)
    try:
        domain = self.hypervisor.domains[name]
        domain.title = new_title
    except KeyError:
        error = 'Cannot change title open VM %s because it is not defined' % name
        logger.error(error)
        raise UndefinedDomain(error)
    except libvirt.libvirtError:
        logger.exception('Error while changing VM title %s', name)
        raise
@libvirt_handler
def vm_migrate(self, name, dest_uri, live=False):
    """Migrate the named VM by calling ``migrate(dest_uri, live=live)``.

    :param name: VM name
    :param dest_uri: destination passed to the VM ``migrate`` method
    :param bool live: forwarded as the ``live`` keyword
    :raises UndefinedDomain: when the VM is not defined
    """
    try:
        dom = self.hypervisor.domains[name]
    except KeyError:
        # build the message with %: exception constructors do not
        # interpolate extra arguments the way logging calls do
        msg = 'Cannot migrate VM %s because it is not defined' % name
        logger.error(msg)
        raise UndefinedDomain(msg)
    dom.migrate(dest_uri, live=live)
def vm_migrate_tunneled(self, name, tun_res, migtun_res, unsafe=False,
                        timeout=60.):
    """Live migrate VM through TCP tunnel.

    :param name: VM name to migrate
    :param tun_res: result of tunnel_setup handler
    :param migtun_res: result of tunnel setup handler
    :param bool unsafe: unsafe migration
    :param float timeout: timeout in seconds for libvirt migration
        (prevents libvirt from trying to acquire domain lock forever)
    :raises KeyError: when ``tun_res`` or ``migtun_res`` has no ``port``
        key, or when the VM is not defined
    """
    logger.debug('VM live migrate %s', name)
    try:
        # this is the port used by our libvirt in the cc-node (client
        # libvirt) to connect to the remote libvirtd
        remote_virt_port = tun_res['port']
    except KeyError:
        logger.error('Invalid formatted argument tun_res for live'
                     ' migration')
        raise
    try:
        # this is the port used by local libvirtd to connect to the remote
        # libvirtd (see http://libvirt.org/migration.html)
        remote_virt_port2 = migtun_res['port']
    except KeyError:
        logger.error('Invalid formatted argument migtun_res for live'
                     ' migration')
        raise
    try:
        vm = self.hypervisor.domains[name]
    except KeyError:
        logger.exception('Cannot find domain %s on hypervisor for live'
                         ' migration', name)
        raise
    # created outside the try block because the error path below reads
    # ``migration.return_status``
    migration = LiveMigration(self.main, vm, remote_virt_port,
                              remote_virt_port2, timeout, unsafe)
    try:
        migration.wait()
    except Exception:
        logger.exception('Error during live migration for vm %s', name)
        logger.debug('Exit status %d', migration.return_status)
        # propagate the failure instead of reporting a success below
        raise
    logger.info('Sucessfuly migrated vm %s', name)
def vm_open_console(self, conn, name):
    """Open the console of a VM and plug it into a tunnel on ``conn``.

    :param conn: sjRPC connection instance
    :param name: VM name
    :raises KeyError: when the VM is not defined
    """
    domain = self.hypervisor.domains[name]
    # create connection to the VM console
    try:
        console_end = domain.open_console()
    except socket.error:
        # cannot create socketpair
        logger.error('Cannot create connection to VM console')
        raise
    except Exception:
        logger.exception('Error while trying to open console for domain %s',
                         name)
        raise

    def on_shutdown(tun):
        """Method of Tunnel protocol close callback."""
        domain.close_console()

    # connect as tunnel endpoint
    # NOTE(review): ``proto`` is not used in the visible code; a trailing
    # statement (probably a return value) may be missing -- confirm
    proto = conn.create_tunnel(endpoint=console_end, on_shutdown=on_shutdown)
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
def vm_disable_virtio_cache(self, name):
    """Set virtio cache to none on VM disks.

    The domain XML is read from libvirt, ``cache='none'`` is set on the
    ``driver`` element of every disk whose target bus is ``virtio``, and the
    result is defined again through the hypervisor connection.

    :param name: VM name
    :raises KeyError: when the VM is not defined
    :raises libvirt.libvirtError: logged, then re-raised
    """
    domain = self.hypervisor.domains[name]
    # get VM XML
    try:
        description = domain.lv_dom.XMLDesc(0)
    except libvirt.libvirtError:
        logger.exception('Error while getting domain XML from libvirt, %s',
                         domain.name)
        raise
    tree = et.parse(StringIO(description))
    for disk in tree.findall('devices/disk'):
        # check that disk is virtio
        target = disk.find('target')
        is_virtio = target is not None and target.get('bus') == 'virtio'
        if not is_virtio:
            continue
        # modify cache attr
        driver = disk.find('driver')
        assert driver is not None
        driver.set('cache', 'none')
        logger.debug('Set cache attribute for disk %s of VM %s',
                     target.get('dev'), name)
    # write back the XML tree
    buf = StringIO()
    tree.write(buf)  # check encoding is fine
    try:
        self.hypervisor.vir_con.defineXML(buf.getvalue())
    except libvirt.libvirtError:
        logger.exception('Cannot update XML file for domain %s', name)
        raise
def vm_set_autostart(self, name, autostart=True):
    """Set autostart on VM.

    :param name: VM name
    :param bool autostart: autostart value to set; its truth value is sent
        to libvirt as 1 or 0
    :raises KeyError: when the VM is not defined
    """
    domain = self.hypervisor.domains[name]
    flag = 1 if autostart else 0
    domain.lv_dom.setAutostart(flag)
    # update autostart value now instead of 10 seconds lag
    autostart_tag = domain.tag_db['__main__']['autostart']
    autostart_tag.update_value()
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
@libvirt_handler
def tag_add(self, name, tag, value):
    """Add a static tag on specified VM.

    :param name: VM name
    :param tag: tag name
    :param value: tag value
    :raises KeyError: when the VM is not defined
    """
    self.hypervisor.domains[name].set_tag(tag, value)
@libvirt_handler
def tag_delete(self, name, tag):
    """Delete a static tag on specified VM.

    :param name: VM name
    :param tag: tag name
    :raises KeyError: when the VM is not defined
    """
    self.hypervisor.domains[name].delete_tag(tag)
@libvirt_handler
def tag_show(self, name):
    """Show static tags of the specified VM.

    :param name: VM name
    :returns: the ``tags`` attribute of the VM
    :raises KeyError: when the VM is not defined
    """
    domain = self.hypervisor.domains[name]
    static_tags = domain.tags
    return static_tags
def vol_create(self, pool, name, size):
    """Create a volume through the hypervisor storage.

    Any exception is logged and re-raised.

    :param pool: pool name
    :param name: volume name
    :param size: volume size, forwarded unchanged
    """
    logger.debug('Volume create %s, pool %s, size %s', name, pool, size)
    try:
        storage = self.hypervisor.storage
        storage.create_volume(pool, name, size)
    except Exception:
        logger.exception('Error while creating volume')
        raise
def vol_delete(self, pool, name):
    """Delete a volume through the hypervisor storage.

    Any exception is logged and re-raised.

    :param pool: pool name
    :param name: volume name
    """
    logger.debug('Volume delete %s, pool %s', name, pool)
    try:
        storage = self.hypervisor.storage
        storage.delete_volume(pool, name)
    except Exception:
        logger.exception('Error while deleting volume')
        raise
def vol_import(self, pool, name):
    """Create and start an import job for an existing volume.

    :param pool: pool name where the volume is
    :param name: name of the volume
    :returns: dict with the job ``id`` and ``port``
    :raises PoolStorageError: when the pool or the volume does not exist
    """
    logger.debug('Volume import pool = %s, volume = %s', pool, name)
    try:
        self.hypervisor.storage.update()
        pool = self.hypervisor.storage.get_storage(pool)
        if pool is None:
            raise PoolStorageError('Pool storage does not exist')
        volume = pool.volumes.get(name)
        if volume is None:
            raise PoolStorageError('Volume does not exist')
        # create the job
        job = self.main.job_manager.create(ImportVolume, volume)
        job.start()
    except Exception:
        logger.exception('Error while starting import job')
        raise
    return dict(id=job.id, port=job.port)
def vol_import_wait(self, job_id):
    """Block until completion of the given job id.

    :param job_id: id of an import job
    :returns: dict with the job ``id``, an empty ``log`` and the job
        ``checksum``
    """
    job = self.main.job_manager.get(job_id)
    logger.debug('Waiting for import job to terminate')
    # the cancel handler relies on wait() being called here
    job.wait()
    logger.debug('Import job terminated')
    return dict(id=job.id, log='', checksum=job.checksum)
def vol_import_cancel(self, job_id):
    """Cancel import job.

    :param job_id: id of an import job
    """
    logger.debug('Cancel import job')
    # fetch the job before cancelling so that it can be joined afterwards
    job = self.main.job_manager.get(job_id)
    self.main.job_manager.cancel(job_id)
    # wait for job to end
    job.join()  # we don't call wait as it is already called in
                # vol_import_wait handler
def vol_export(self, pool, name, raddr, rport):
    """Export a volume to a remote destination and wait for completion.

    :param pool: pool name where the volume is
    :param name: name of the volume
    :param raddr: IP address of the destination to send the volume to
    :param rport: TCP port of the destination
    :returns: dict with the job ``id``, an empty ``log`` and the job
        ``checksum``
    :raises PoolStorageError: when the pool or the volume does not exist
    """
    pool = self.hypervisor.storage.get_storage(pool)
    if pool is None:
        raise PoolStorageError('Pool storage does not exist')
    volume = pool.volumes.get(name)
    if volume is None:
        raise PoolStorageError('Volume does not exist')
    try:
        job = self.main.job_manager.create(ExportVolume, volume, raddr, rport)
        # NOTE(review): start()/wait() mirror the import handlers; without
        # them the checksum is returned before the job has run -- confirm
        job.start()
        job.wait()
    except Exception:
        logger.exception('Error while exporting volume')
        raise
    logger.debug('Export volume successfull')
    return dict(id=job.id, log='', checksum=job.checksum)
def tun_setup(self, local=True):
    """Set up local tunnel and listen on a random port.

    :param local: indicate if we should listen on localhost or all
        interfaces
    :returns: dict with the job id (``jid``), a placeholder ``key`` and the
        listening ``port``
    """
    logger.debug('Tunnel setup: local = %s', local)
    # create job
    job = self.main.job_manager.create(TCPTunnel)
    job.setup_listen('127.0.0.1' if local else '0.0.0.0')
    return dict(
        jid=job.id,
        key='FIXME',
        port=job.port,
    )
@threadless
def tun_connect(self, res, remote_res, remote_ip):
    """Connect tunnel to the other end.

    :param res: previous result of `tun_setup` handler
    :param remote_res: other end result of `tun_setup` handler
    :param remote_ip: where to connect
    """
    tunnel_id = res['jid']
    logger.debug('Tunnel connect %s %s', tunnel_id, remote_ip)
    tunnel = self.main.job_manager.get(res['jid'])
    remote_endpoint = (remote_ip, remote_res['port'])
    tunnel.setup_connect(remote_endpoint)
def tun_connect_hv(self, res, migration=False):
    """Connect tunnel to local libvirt Unix socket.

    :param res: previous result of `tun_setup` handler
    :param migration: not used by the visible code
    """
    tunnel_id = res['jid']
    logger.debug('Tunnel connect hypervisor %s', tunnel_id)
    tunnel = self.main.job_manager.get(res['jid'])
    tunnel.setup_connect('/var/run/libvirt/libvirt-sock')
def tun_destroy(self, res):
    """Close given tunnel.

    NOTE(review): the visible code only logs the job id; the statements
    that actually close the tunnel seem to be missing -- confirm.

    :param res: previous result as given by `tun_setup` handler
    """
    tunnel_id = res['jid']
    logger.debug('Tunnel destroy %s', tunnel_id)
def drbd_setup(self, pool, name):
    """Create DRBD volumes.

    :param pool: storage pool
    :param name: storage volume name
    :returns: dict with the job id (``jid``) and the DRBD ``port``
    :raises DRBDError: when the pool does not exist, is not of type
        ``'logical'``, or when the volume does not exist
    """
    pool = self.hypervisor.storage.get_storage(pool)
    if pool is None:
        raise DRBDError('Cannot setup DRBD: pool storage does not exist')
    if pool.type != 'logical':
        raise DRBDError('Cannot setup DRBD: pool storage is not LVM')
    volume = pool.volumes.get(name)
    if volume is None:
        raise DRBDError('Cannot setup DRBD: volume does not exist')
    try:
        storage = self.hypervisor.storage
        job = self.main.job_manager.create(DRBD, storage, pool, volume)
    except Exception:
        logger.exception('Error while creating DRBD job')
        raise
    job.setup()
    logger.debug('DRBD setup successfull')
    return dict(jid=job.id, port=job.drbd_port)
def drbd_connect(self, res, remote_res, remote_ip):
    """Set up DRBD in connect mode (wait for connection and try to connect
    to the remote peer).

    :param res: previous result of `drbd_setup` handler
    :param remote_res: result of remote `drbd_setup` handler
    :param remote_ip: IP of remote peer
    """
    drbd_job = self.main.job_manager.get(res['jid'])
    remote_port = remote_res['port']
    drbd_job.connect(remote_ip, remote_port)
    drbd_job.wait_connection()
def drbd_role(self, res, primary):
    """Set up DRBD role.

    :param res: previous result of `drbd_setup` handler
    :param bool primary: if True, set up in primary mode else secondary
    """
    drbd_job = self.main.job_manager.get(res['jid'])
    switch = drbd_job.switch_primary if primary else drbd_job.switch_secondary
    switch()
def drbd_takeover(self, res, state):
    """Set up DRBD device as the VM disk. FIXME

    :param res: previous result of `drbd_setup` handler
    :param state: FIXME (not used by the visible code)
    """
    job_id = res['jid']
    self.main.job_manager.get(job_id).takeover()
def drbd_sync_status(self, res):
    """Return synchronization status of a current DRBD job.

    :param res: previous result of `drbd_setup` handler
    :returns: dict with ``done`` (True when the job status ``disk`` entry
        is ``'UpToDate'``) and ``completion`` (the status ``percent`` entry)
    """
    drbd_job = self.main.job_manager.get(res['jid'])
    status = drbd_job.status()
    result = {
        'done': status['disk'] == 'UpToDate',
        'completion': status['percent'],
    }
    logger.debug('DRBD status %s', result)
    return result
def drbd_shutdown(self, res):
    """Destroy DRBD related block devices.

    :param res: previous result of `drbd_setup` handler
    """
    logger.debug('DRBD shutdown')
    manager = self.main.job_manager
    drbd_job = manager.get(res['jid'])
    drbd_job.cleanup()
    # remove job from job_manager list
    self.main.job_manager.notify(drbd_job)