# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.

import logging
import socket
import json
from StringIO import StringIO
from xml.etree import cElementTree as et

import libvirt
from sjrpc.utils import threadless, pass_connection

from cloudcontrol.common.client.tags import Tag, tag_inspector, ParentWrapper
from cloudcontrol.common.client.plugins import (
    rpc_handler,
    rpc_handler_decorator_factory,
    get_rpc_handlers,
)
from cloudcontrol.node.host import Handler as HostHandler
from cloudcontrol.node.hypervisor import tags
from cloudcontrol.node.hypervisor.kvm import KVM, LiveMigration
from cloudcontrol.node.exc import (
    UndefinedDomain,
    DRBDError,
    PoolStorageError,
)
from cloudcontrol.node.hypervisor.jobs import (
    ImportVolume,
    ExportVolume,
    TCPTunnel,
    DRBD,
)
from cloudcontrol.node.utils import execute


logger = logging.getLogger(__name__)

libvirt_handler_marker = '_libvirt_rpc_handler'
libvirt_handler = rpc_handler_decorator_factory(libvirt_handler_marker)


# FIXME find a way to refactor Handler and Hypervisor class
class Handler(HostHandler):
    def __init__(self, *args, **kwargs):
        """
        :param loop: MainLoop instance
        :param hypervisor_name: hypervisor name
        """
        self.hypervisor_name = kwargs.pop('hypervisor_name')
        self._virt_connected = False
        HostHandler.__init__(self, *args, **kwargs)

        #: keep index of asynchronous calls
        self.async_calls = dict()

        self.timer = self.main.evloop.timer(.0, 5., self.virt_connect_cb)
        self.hypervisor = None
        # list of libvirt related RPC handlers
        self.virt_handlers = get_rpc_handlers(self, libvirt_handler_marker)

        # register tags
        self.tag_db.add_tags(tag_inspector(tags, self))

    @property
    def virt_connected(self):
        return self._virt_connected

    @virt_connected.setter
    def virt_connected(self, value):
        self._virt_connected = value
        # update tags
        for tag in ('vir_status', 'sto', 'nvm', 'vmpaused', 'vmstarted',
                    'vmstopped', 'hvver', 'libvirtver', 'hv'):
            self.tag_db['__main__'][tag].update_value()

    def start(self):
        self.timer.start()
        HostHandler.start(self)

    def stop(self):
        self.timer.stop()
        if self.hypervisor is not None:
            self.hypervisor.stop()
        HostHandler.stop(self)

    def virt_connect_cb(self, *args):
        # initialize hypervisor instance
        try:
            self.hypervisor = KVM(
                name=self.hypervisor_name,
                handler=self,
            )
        except libvirt.libvirtError:
            logger.exception('Error while connecting to libvirt')
            return

        self.virt_connected = True

        # register hypervisor storage tags
        for name, storage in self.hypervisor.storage.storages.iteritems():
            self.tag_db.add_tags((
                Tag('sto%s_state' % name, lambda sto: sto.state, 5, 5, storage),
                Tag('sto%s_size' % name, lambda sto: sto.capacity, 5, 5, storage),
                Tag('sto%s_free' % name, lambda sto: sto.available, 5, 5, storage),
                Tag('sto%s_used' % name,
                    lambda sto: sto.capacity - sto.available, 5, 5, storage),
                Tag('sto%s_type' % name, lambda sto: sto.type, 5, 5, storage),
                Tag('sto%s_vol' % name,
                    lambda sto: ' '.join(sto.volumes)
                    if sto.volumes and sto.type != 'rbd' else None,
                    5, 5,
                    storage),
                Tag('sto%s_ratio' % name,
                    lambda sto: '%0.2f'
                    % (1 - float(sto.available) / sto.capacity),
                    5, 5, storage),
            ))

        # register domains
        for dom in self.hypervisor.domains.itervalues():
            dom.tag_db.set_parent(ParentWrapper(dom.name, 'vm', self.tag_db))

        # we must refresh these tags only once the domain tags are registered,
        # so that the calculated values are correct
        tags_to_refresh = ('cpualloc', 'cpurunning', 'cpuremaining',
                           'cpuallocratio', 'memalloc', 'memrunning',
                           'memremaining', 'memallocratio')
        for tag in tags_to_refresh:
            self.tag_db['__main__'][tag].update_value()

        # register libvirt handlers
        self.rpc_handler.update(self.virt_handlers)
        for k, v in self.virt_handlers.iteritems():
            self.main.reset_handler(k, v)

        # if everything went fine, unregister the timer
        self.timer.stop()

    def virt_connect_restart(self):
        """Restart libvirt connection.

        This method might be called when the libvirt connection is lost.
        """
        if not self.virt_connected:
            return

        logger.error('Connection to libvirt lost, trying to restart')
        # update connection state
        self.virt_connected = False
        # refresh those tags
        tags_to_refresh = ('cpualloc', 'cpurunning', 'cpuremaining',
                           'cpuallocratio', 'memalloc', 'memrunning',
                           'memremaining', 'memallocratio')
        for tag in tags_to_refresh:
            self.tag_db['__main__'][tag].update_value()

        # unregister tags that will be re-registered later
        for storage in self.hypervisor.storage.storages:
            self.tag_db.remove_tags((
                'sto%s_state' % storage,
                'sto%s_size' % storage,
                'sto%s_free' % storage,
                'sto%s_used' % storage,
                'sto%s_type' % storage,
                'sto%s_vol' % storage,
                'sto%s_ratio' % storage,
            ))
        # unregister sub objects (for the same reason)
        for sub_id in self.tag_db.keys():
            if sub_id == '__main__':
                continue
            self.tag_db.remove_sub_object(sub_id)

        # stop and delete hypervisor instance
        self.hypervisor.stop()
        self.hypervisor = None

        # remove handlers related to libvirt
        for handler in self.virt_handlers:
            del self.rpc_handler[handler]
            self.main.remove_handler(handler)

        # launch connection timer
        self.timer.start()

    def iter_vms(self, vm_names):
        """Utility function to iterate over VM objects using their names."""
        if vm_names is None:
            return

        get_domain = self.hypervisor.domains.get
        for name in vm_names:
            dom = get_domain(name)
            if dom is not None:
                yield dom

    @libvirt_handler
    def vm_define(self, data, format='xml'):
        logger.debug('VM define')
        if format == 'xml':
            return self.hypervisor.vm_define(data)
        elif format == 'vmspec':
            # Encode tags as description:
            if 'tags' in data:
                if 'description' not in data:
                    data['description'] = ''
                for tag, value in data['tags'].iteritems():
                    data['description'] += '\n@%s=%s' % (tag, value)
            # Delete the tags key which is not recognized by hkvm-define
            try:
                del data['tags']
            except KeyError:
                pass
            rcode, output = execute(self.main,
                                    [self.main.config.define_script],
                                    stdin=json.dumps(data))
            if rcode == 0:
                return output.strip()
            else:
                raise RuntimeError(output.strip().split('\n')[-1].strip())
        else:
            raise NotImplementedError('Format not supported')

    @libvirt_handler
    def vm_undefine(self, name):
        logger.debug('VM undefine %s', name)
        vm = self.hypervisor.domains.get(name)
        if vm is not None:
            vm.undefine()

    @libvirt_handler
    def vm_export(self, name, format='xml'):
        logger.debug('VM export %s', name)
        if format != 'xml':
            raise NotImplementedError('Format not supported')

        vm = self.hypervisor.domains.get(name)
        if vm is None:
            return

        return vm.lv_dom.XMLDesc(0)

    @libvirt_handler
    def vm_rescue(self, name):
        logger.debug('VM rescue %s', name)
        if name in self.hypervisor.domains:
            rcode, output = execute(self.main,
                                    [self.main.config.rescue_script,
                                     '-r', name])
            if rcode != 0:
                raise RuntimeError(output.strip().split('\n')[-1].strip())
        else:
            msg = 'Cannot rescue VM %s because it is not defined' % name
            logger.error(msg)
            raise UndefinedDomain(msg)

    @libvirt_handler
    def vm_unrescue(self, name):
        logger.debug('VM unrescue %s', name)
        if name in self.hypervisor.domains:
            rcode, output = execute(self.main,
                                    [self.main.config.rescue_script,
                                     '-u', name])
            if rcode != 0:
                raise RuntimeError(output.strip().split('\n')[-1].strip())
        else:
            msg = 'Cannot unrescue VM %s because it is not defined' % name
            logger.error(msg)
            raise UndefinedDomain(msg)

    @libvirt_handler
    def vm_stop(self, name):
        logger.debug('VM stop %s', name)
        try:
            self.hypervisor.domains[name].stop()
        except libvirt.libvirtError:
            logger.exception('Error while stopping VM %s', name)
            raise
        except KeyError:
            msg = 'Cannot stop VM %s because it is not defined' % name
            logger.error(msg)
            raise UndefinedDomain(msg)

    @libvirt_handler
    def vm_destroy(self, name):
        logger.debug('VM destroy %s', name)
        try:
            self.hypervisor.domains[name].destroy()
        except libvirt.libvirtError as exc:
            # libvirt raises the exception 'domain is not running' even if the
            # domain is running, which might be a bug in libvirt
            if ('domain is not running' not in str(exc) or
                    self.hypervisor.domains[name].state != 'running'):
                logger.exception('Error while destroying VM %s', name)
                raise
        except KeyError:
            msg = 'Cannot destroy VM %s because it is not defined' % name
            logger.error(msg)
            raise UndefinedDomain(msg)

    @libvirt_handler
    def vm_start(self, name, pause=False):
        """
        :param str name: VM name to start
        :param bool pause: start the VM paused
        """
        logger.debug('VM start %s', name)
        try:
            self.hypervisor.domains[name].start(pause)
        except libvirt.libvirtError:
            logger.exception('Error while starting VM %s', name)
            raise
        except KeyError:
            msg = 'Cannot start VM %s because it is not defined' % name
            logger.error(msg)
            raise UndefinedDomain(msg)

    @libvirt_handler
    def vm_suspend(self, name):
        logger.debug('VM suspend %s', name)
        try:
            self.hypervisor.domains[name].suspend()
        except libvirt.libvirtError:
            logger.exception('Error while suspending VM %s', name)
            raise
        except KeyError:
            msg = 'Cannot suspend VM %s because it is not defined' % name
            logger.error(msg)
            raise UndefinedDomain(msg)

    @libvirt_handler
    def vm_resume(self, name):
        logger.debug('VM resume %s', name)
        try:
            self.hypervisor.domains[name].resume()
        except libvirt.libvirtError:
            logger.exception('Error while resuming VM %s', name)
            raise
        except KeyError:
            msg = 'Cannot resume VM %s because it is not defined' % name
            logger.error(msg)
            raise UndefinedDomain(msg)

    @libvirt_handler
    def vm_change_title(self, name, new_title):
        logger.debug('VM edit title %s', name)
        try:
            self.hypervisor.domains[name].title = new_title
        except libvirt.libvirtError:
            logger.exception('Error while changing VM title %s', name)
            raise
        except KeyError:
            msg = 'Cannot change title of VM %s because it is not defined' % name
            logger.error(msg)
            raise UndefinedDomain(msg)

    @libvirt_handler
    def vm_migrate(self, name, dest_uri, live=False):
        try:
            dom = self.hypervisor.domains[name]
        except KeyError:
            raise UndefinedDomain(
                'Cannot migrate VM %s because it is not defined' % name)

        dom.migrate(dest_uri, live=live)

    @libvirt_handler
    def vm_migrate_tunneled(self, name, tun_res, migtun_res, unsafe=False,
                            timeout=60.):
        """Live migrate VM through TCP tunnel.

        :param name: VM name to migrate
        :param tun_res: result of the `tun_setup` handler (tunnel used by the
            client libvirt connection to reach the remote libvirtd)
        :param migtun_res: result of the `tun_setup` handler (tunnel used by
            the local libvirtd to reach the remote libvirtd)
        :param bool unsafe: allow unsafe migration
        :param float timeout: migration timeout in seconds (prevents libvirt
            from trying to acquire the domain lock forever)
        """
        logger.debug('VM live migrate %s', name)
        try:
            # this is the port used by our libvirt in the cc-node (client
            # libvirt) to connect to the remote libvirtd
            remote_virt_port = tun_res['port']
        except KeyError:
            logger.error('Badly formatted argument tun_res for live'
                         ' migration')
            raise
        try:
            # this is the port used by the local libvirtd to connect to the
            # remote libvirtd (see http://libvirt.org/migration.html)
            remote_virt_port2 = migtun_res['port']
        except KeyError:
            logger.error('Badly formatted argument migtun_res for live'
                         ' migration')
            raise
        try:
            vm = self.hypervisor.domains[name]
        except KeyError:
            logger.exception('Cannot find domain %s on hypervisor for live'
                             ' migration', name)
            raise

        migration = LiveMigration(self.main, vm, remote_virt_port,
                                  remote_virt_port2, timeout, unsafe)
        try:
            migration.wait()
        except Exception:
            logger.exception('Error during live migration for VM %s', name)
            logger.debug('Exit status %d', migration.return_status)
            raise

        logger.info('Successfully migrated VM %s', name)

    @threadless
    @pass_connection
    @libvirt_handler
    def vm_open_console(self, conn, name):
        """
        :param conn: sjRPC connection instance
        :param name: VM name
        """
        vm = self.hypervisor.domains[name]
        # create connection to the VM console
        try:
            endpoint = vm.open_console()
        except socket.error:
            # cannot create socketpair
            logger.error('Cannot create connection to VM console')
            raise
        except Exception:
            logger.exception('Error while trying to open console for domain'
                             ' %s', name)
            raise

        def on_shutdown(tun):
            """Close callback of the tunnel protocol."""
            vm.close_console()

        # connect as tunnel endpoint
        proto = conn.create_tunnel(endpoint=endpoint, on_shutdown=on_shutdown)
        return proto.label

    @libvirt_handler
    def vm_disable_virtio_cache(self, name):
        """Set virtio cache to none on VM disks.

        :param name: VM name
        """
        vm = self.hypervisor.domains[name]
        # get VM XML
        try:
            xml = vm.lv_dom.XMLDesc(0)
        except libvirt.libvirtError:
            logger.exception('Error while getting domain XML from libvirt,'
                             ' %s', vm.name)
            raise

        xml_tree = et.ElementTree()
        xml_tree.parse(StringIO(xml))
        for disk in xml_tree.findall('devices/disk'):
            # check that disk is virtio
            target = disk.find('target')
            if target is None or target.get('bus') != 'virtio':
                continue
            # modify cache attr
            driver = disk.find('driver')
            assert driver is not None
            driver.set('cache', 'none')
            logger.debug('Set cache attribute for disk %s of VM %s',
                         target.get('dev'), name)

        # write back the XML tree and redefine the domain with it
        out = StringIO()
        xml_tree.write(out)
        try:
            self.hypervisor.vir_con.defineXML(out.getvalue())
        except libvirt.libvirtError:
            logger.exception('Cannot update XML file for domain %s', name)
            raise

    @libvirt_handler
    def vm_set_autostart(self, name, autostart=True):
        """Set autostart on VM.

        :param name: VM name
        :param bool autostart: autostart value to set
        """
        vm = self.hypervisor.domains[name]
        vm.lv_dom.setAutostart(int(bool(autostart)))
        # update the autostart tag now instead of waiting up to 10 seconds
        vm.tag_db['__main__']['autostart'].update_value()

    @libvirt_handler
    def tag_add(self, name, tag, value):
        """Add a static tag on the specified VM.

        :param name: VM name
        :param tag: tag name
        :param value: tag value
        """
        vm = self.hypervisor.domains[name]
        vm.set_tag(tag, value)

    @libvirt_handler
    def tag_delete(self, name, tag):
        """Delete a static tag on the specified VM.

        :param name: VM name
        :param tag: tag name
        """
        vm = self.hypervisor.domains[name]
        vm.delete_tag(tag)

    @libvirt_handler
    def tag_show(self, name):
        """Show static tags of the specified VM.

        :param name: VM name
        """
        vm = self.hypervisor.domains[name]
        return vm.tags

    @libvirt_handler
    def vol_create(self, pool, name, size):
        logger.debug('Volume create %s, pool %s, size %s', name, pool, size)
        try:
            self.hypervisor.storage.create_volume(pool, name, size)
        except Exception:
            logger.exception('Error while creating volume')
            raise

    @libvirt_handler
    def vol_delete(self, pool, name):
        logger.debug('Volume delete %s, pool %s', name, pool)
        try:
            self.hypervisor.storage.delete_volume(pool, name)
        except Exception:
            logger.exception('Error while deleting volume')
            raise

    @libvirt_handler
    def vol_import(self, pool, name):
        """
        :param pool: pool name where the volume is
        :param name: name of the volume
        """
        logger.debug('Volume import pool = %s, volume = %s', pool, name)
        try:
            pool = self.hypervisor.storage.get_storage(pool)
            if pool is None:
                raise PoolStorageError('Pool storage does not exist')
            volume = pool.volumes.get(name)
            if volume is None:
                raise PoolStorageError('Volume does not exist')
            # create the job
            job = self.main.job_manager.create(ImportVolume, volume)
            job.start()
        except Exception:
            logger.exception('Error while starting import job')
            raise

        return dict(id=job.id, port=job.port)

    @libvirt_handler
    def vol_import_wait(self, job_id):
        """Block until completion of the given job id."""
        job = self.main.job_manager.get(job_id)
        logger.debug('Waiting for import job to terminate')
        job.wait()
        logger.debug('Import job terminated')
        return dict(id=job.id, log='', checksum=job.checksum)

    @libvirt_handler
    def vol_import_cancel(self, job_id):
        """Cancel import job."""
        logger.debug('Cancel import job')
        job = self.main.job_manager.get(job_id)
        self.main.job_manager.cancel(job_id)
        # wait for the job to end; we don't call wait() here as it is already
        # called in the vol_import_wait handler
        job.join()

    @libvirt_handler
    def vol_export(self, pool, name, raddr, rport):
        """
        :param pool: pool name where the volume is
        :param name: name of the volume
        :param raddr: IP address of the destination to send the volume to
        :param rport: TCP port of the destination
        """
        pool = self.hypervisor.storage.get_storage(pool)
        if pool is None:
            raise PoolStorageError('Pool storage does not exist')
        volume = pool.volumes.get(name)
        if volume is None:
            raise PoolStorageError('Volume does not exist')

        try:
            job = self.main.job_manager.create(ExportVolume, volume, raddr,
                                               rport)
            job.start()
            job.wait()
        except Exception:
            logger.exception('Error while exporting volume')
            raise

        logger.debug('Export volume successful')
        return dict(id=job.id, log='', checksum=job.checksum)

    @threadless
    @rpc_handler
    def tun_setup(self, local=True):
        """Set up a local tunnel and listen on a random port.

        :param local: whether to listen on localhost only or on all interfaces
        """
        logger.debug('Tunnel setup: local = %s', local)
        # create job
        job = self.main.job_manager.create(TCPTunnel)
        job.setup_listen('127.0.0.1' if local else '0.0.0.0')
        return dict(
            jid=job.id,
            key='FIXME',
            port=job.port,
        )

    @threadless
    @rpc_handler
    def tun_connect(self, res, remote_res, remote_ip):
        """Connect tunnel to the other end.

        :param res: previous result of the `tun_setup` handler
        :param remote_res: other end's result of the `tun_setup` handler
        :param remote_ip: IP address to connect to
        """
        logger.debug('Tunnel connect %s %s', res['jid'], remote_ip)
        job = self.main.job_manager.get(res['jid'])
        job.setup_connect((remote_ip, remote_res['port']))
        job.start()

    @threadless
    @rpc_handler
    def tun_connect_hv(self, res, migration=False):
        """Connect tunnel to the local libvirt Unix socket.

        :param res: previous result of the `tun_setup` handler
        """
        logger.debug('Tunnel connect hypervisor %s', res['jid'])
        job = self.main.job_manager.get(res['jid'])
        job.setup_connect('/var/run/libvirt/libvirt-sock')
        job.start()

    @threadless
    @rpc_handler
    def tun_destroy(self, res):
        """Close the given tunnel.

        :param res: previous result as given by the `tun_setup` handler
        """
        logger.debug('Tunnel destroy %s', res['jid'])
        job = self.main.job_manager.get(res['jid'])
        self.main.job_manager.cancel(job.id)
        job.wait()

    @libvirt_handler
    def drbd_setup(self, pool, name):
        """Create DRBD volumes.

        :param pool: storage pool
        :param name: storage volume name
        """
        pool = self.hypervisor.storage.get_storage(pool)
        if pool is None:
            raise DRBDError('Cannot setup DRBD: pool storage does not exist')
        elif pool.type != 'logical':
            raise DRBDError('Cannot setup DRBD: pool storage is not LVM')

        volume = pool.volumes.get(name)
        if volume is None:
            raise DRBDError('Cannot setup DRBD: volume does not exist')

        try:
            job = self.main.job_manager.create(DRBD, self.hypervisor.storage,
                                               pool, volume)
        except Exception:
            logger.exception('Error while creating DRBD job')
            raise

        job.setup()
        logger.debug('DRBD setup successful')
        return dict(
            jid=job.id,
            port=job.drbd_port,
        )

    @libvirt_handler
    def drbd_connect(self, res, remote_res, remote_ip):
        """Set up DRBD in connect mode.

        (Wait for an incoming connection and try to connect to the remote
        peer.)

        :param res: previous result of the `drbd_setup` handler
        :param remote_res: result of the remote `drbd_setup` handler
        :param remote_ip: IP of the remote peer
        """
        job = self.main.job_manager.get(res['jid'])
        job.connect(remote_ip, remote_res['port'])
        job.wait_connection()

    @libvirt_handler
    def drbd_role(self, res, primary):
        """Set the DRBD role.

        :param res: previous result of the `drbd_setup` handler
        :param bool primary: if True, switch to primary mode, else secondary
        """
        job = self.main.job_manager.get(res['jid'])
        if primary:
            job.switch_primary()
        else:
            job.switch_secondary()

    @libvirt_handler
    def drbd_takeover(self, res, state):
        """Set up the DRBD device as the VM disk. FIXME

        :param res: previous result of the `drbd_setup` handler
        :param state: FIXME
        """
        job = self.main.job_manager.get(res['jid'])
        job.takeover()

    @libvirt_handler
    def drbd_sync_status(self, res):
        """Return the synchronization status of the current DRBD job.

        :param res: previous result of the `drbd_setup` handler
        """
        status = self.main.job_manager.get(res['jid']).status()
        result = dict(
            done=status['disk'] == 'UpToDate',
            completion=status['percent'],
        )
        logger.debug('DRBD status %s', result)
        return result

    @libvirt_handler
    def drbd_shutdown(self, res):
        """Destroy DRBD related block devices.

        :param res: previous result of the `drbd_setup` handler
        """
        logger.debug('DRBD shutdown')
        job = self.main.job_manager.get(res['jid'])
        job.cleanup()
        # remove job from the job_manager list
        self.main.job_manager.notify(job)
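
# ----------------------------------------------------------------------------
# Illustrative note (an assumption about the calling side, not part of this
# module): the drbd_* handlers above are meant to be driven pairwise, on the
# source and destination nodes, by the controller that orchestrates a storage
# migration. Using only the handlers defined in this file, a plausible
# sequence would look roughly like (the `source`/`destination` proxies are
# hypothetical; the real orchestration lives in the cc-server):
#
#   src = source.drbd_setup(pool, vol)            # returns dict(jid=..., port=...)
#   dst = destination.drbd_setup(pool, vol)
#   source.drbd_connect(src, dst, dest_ip)        # each side connects to its peer
#   destination.drbd_connect(dst, src, source_ip)
#   source.drbd_role(src, primary=True)           # source holds the primary role
#   destination.drbd_role(dst, primary=False)
#   while not destination.drbd_sync_status(dst)['done']:
#       pass                                      # poll until the disk is UpToDate
#   destination.drbd_takeover(dst, True)          # point the VM disk at the DRBD device
#   source.drbd_shutdown(src)                     # tear DRBD down on both sides
#   destination.drbd_shutdown(dst)
# ----------------------------------------------------------------------------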