From 64e152fef9f103b17c5e78083545c71378e6502f Mon Sep 17 00:00:00 2001 From: Thibault VINCENT Date: Mon, 10 Jan 2011 18:41:23 +0100 Subject: [PATCH] Huge rewrite, new tags, better node support on non-hv hosts Signed-off-by: Thibault VINCENT --- bin/cc-node | 11 +- ccnode/__init__.py | 60 +- ccnode/ccnode.py | 10 +- ccnode/ccnodehandlers.py | 386 ------------ ccnode/common.py | 217 +++++++ ccnode/exceptions.py | 55 +- ccnode/handlers.py | 498 +++++++++++++++ ccnode/interface.py | 313 ---------- ccnode/kvm.py | 15 +- ccnode/libvirtwrapper.py | 1233 ++++++++++++-------------------------- debian/control | 2 +- etc/cc-node.conf | 12 +- 12 files changed, 1153 insertions(+), 1659 deletions(-) delete mode 100644 ccnode/ccnodehandlers.py create mode 100644 ccnode/common.py create mode 100644 ccnode/handlers.py delete mode 100644 ccnode/interface.py diff --git a/bin/cc-node b/bin/cc-node index cb29870..e74923d 100755 --- a/bin/cc-node +++ b/bin/cc-node @@ -31,6 +31,7 @@ DEFAULT_CONFIGURATION = { 'verbosity': '0', 'detect_hypervisor': 'yes', #'ssl_cert': '', + 'command_execution' : 'yes', } MAX_AUTH_TIMEOUT = 10 @@ -62,7 +63,8 @@ def run_node(options): try: node = CCNode(options['address'], int(options['port']), - options['detect_hypervisor'] == 'yes') + options['detect_hypervisor'] == 'yes', + options['command_execution'] == 'yes') except Exception as err: logging.critical('Starting fail: %s' % err) return @@ -104,6 +106,7 @@ def run_node(options): logging.critical('Node failed: %s' % err) return + if __name__ == '__main__': usage = 'usage: %prog [OPTION...]' @@ -117,7 +120,7 @@ if __name__ == '__main__': cliopts, args = op.parse_args() - # Reading the config file: + # read the config file config = ConfigParser.SafeConfigParser() config.read(cliopts.config) try: @@ -127,7 +130,7 @@ if __name__ == '__main__': "in '%s'\n" % cliopts.config) sys.exit(1) - # Applying default config file options: + # applying default config file options for opt, default in DEFAULT_CONFIGURATION.iteritems(): if opt not in options or not options[opt]: if default is None: @@ -140,4 +143,4 @@ if __name__ == '__main__': while True: run_node(options) logging.warning('Critical error, restarting node') - sleep(2) + sleep(2) \ No newline at end of file diff --git a/ccnode/__init__.py b/ccnode/__init__.py index c44b82e..3dbbd74 100644 --- a/ccnode/__init__.py +++ b/ccnode/__init__.py @@ -2,65 +2,9 @@ #coding:utf-8 """ -This API abstracts all calls to any hypervisor type. -This API is split in four parts: - * Global Interface - * Libvirt Wrapper - * KVM, XEN, ... hypervisor interface - * Exceptions - -Every entity in the hypervisor library has a common ancestor -:class:`interface.Host`. - -The inheritance diagram shown below reflects the relationship between -each class. - - -.. graphviz:: - - digraph { - subgraph cluster_interface { - "Host" [shape=box]; - "Hypervisor" [shape=box]; - "VM" [shape=box]; - "Host" -> "Hypervisor"; - "Host" -> "VM"; - label = "interface"; - style = filled; - color = "#e6fbff"; - } - subgraph cluster_libvirt { - "LibvirtHypervisor" [shape=box]; - "LibvirtVm" [shape=box]; - "Hypervisor" -> "LibvirtHypervisor"; - "VM" -> "LibvirtVm"; - style=filled; - color="#ffe6ea"; - label = "libvirt"; - } - subgraph cluster_hypervisor_calls { - "KvmHypervisor" [shape=box]; - "XenHypervisor" [shape=box]; - "LibvirtHypervisor" -> "KvmHypervisor"; - "LibvirtHypervisor" -> "XenHypervisor"; - label = "Hypervisor Calls"; - labelloc = "b"; - style = filled; - color = "#fffcc9"; - } - subgraph cluster_vm_calls { - "KvmVm" [shape=box]; - "XenVm" [shape=box]; - "LibvirtVm" -> "KvmVm"; - "LibvirtVm" -> "XenVm"; - label = "Virtual Machine Calls"; - labelloc = "b"; - style = filled; - color = "#fffcc9"; - } - } +TODO : rewrite based on older doc """ -__version__ = 7 \ No newline at end of file +__version__ = '8~dev+revamp' diff --git a/ccnode/ccnode.py b/ccnode/ccnode.py index 98eace9..c873b03 100644 --- a/ccnode/ccnode.py +++ b/ccnode/ccnode.py @@ -1,3 +1,4 @@ + import socket import threading import ssl @@ -5,16 +6,17 @@ import logging from sjrpc.client import SimpleRpcClient from sjrpc.utils import ConnectionProxy from sjrpc.core import RpcError -from ccnodehandlers import * +import handlers class CCNode(object): ''' - This class handles node initialization and connection and authentication + Handle node initialization, connection to server, and authentication ''' - def __init__(self, server, port, hypervisor, cert=None): + def __init__(self, server, port, hypervisor, exec_cmd, cert=None): self.manager = SimpleRpcClient.from_addr(server, port, enable_ssl=True, - default_handler=NodeHandler(self, hypervisor)) + default_handler=handlers.NodeHandler(self, hypervisor, + exec_cmd)) self.server = ConnectionProxy(self.manager) def run(self): diff --git a/ccnode/ccnodehandlers.py b/ccnode/ccnodehandlers.py deleted file mode 100644 index 03c8303..0000000 --- a/ccnode/ccnodehandlers.py +++ /dev/null @@ -1,386 +0,0 @@ -# Gerer le cas ou le mdp n'est pa valable -#TODO: catch livbirt disconnection errors and handle it - - -from __init__ import __version__ -from interface import Host -from kvm import * -from sjrpc.utils import RpcHandler -from sjrpc.utils import pure -from sjrpc.core import RpcError -from exceptions import VMError -import logging - -############ -## Constants -############ - -# Logging errors - -TAG_METHOD_ERROR = 'No method found in hypervisor to handle tag %s' -TAG_NOT_FOUND_ERROR = 'Tag %s requested but unknown in node handler' - -# Tags to method mappings - -# Hypervisor - -HV_TAG_WRAP_MAP = { - 'h' : 'get_name', - 'hv' : 'get_name', - 'hvtype' : 'get_hv_type', - 'libvirtver': 'get_hv_version', - 'arch' : 'get_arch_type', - 'uname' : 'get_uname', - 'uptime' : 'get_uptime', - 'load' : 'get_loadavg', - 'cpu' : 'get_cpu', - 'cpufreq' : 'get_cpu_frequency', - 'cpuuse' : 'get_cpu_percent', - 'status' : 'get_status', - 'mem' : 'get_mem', - 'memused' : 'get_mem_used', - 'memfree' : 'get_mem_free', - 'nvm' : 'get_vm_count', -} - -HV_TAG_HELPER_LIST = [ - 'disk', - 'sto', -] - -# Vm - -VM_TAG_WRAP_MAP = { - 'h' : 'get_name', - 'vm' : 'get_name', - #'hv' : 'get_hv_name', - 'arch' : 'get_arch', - 'cpu' : 'get_cpu', - 'status' : 'get_status', - 'mem' : 'get_mem', - 'memused' : 'get_memused', - 'memfree' : 'get_memfree', -} - -VM_TAG_HELPER_LIST = [ - 'disk', - 'nic', -] - - -class NodeHandler(RpcHandler): - ''' - This is the main node handler that exports the hypervisor - capabilities to any connected server. In other terms this is - the node interface to the server. - ''' - - def __init__(self, connection, hypervisor): - super(RpcHandler, self).__init__() - self._connection = connection - #do not detect hypervisor if connection is disable in configuration - if not hypervisor: - logging.debug('Hypervisor detection disabled, running as regular' - ' node') - self.hv_handle = Host() - else: - #we set our hypervisor handle - #FIXME: must be aware of hypervisor type - logging.debug('Initializing a connection on the hypervisor') - self.hv_handle = KvmHypervisor() - logging.debug('Initialized connection to a KVM hypervisor') - - def _call_hv_method(self, method): - ''' - Calls the given method in the :class:`Hypervisor` instance - Returns the result of the method called - - :param method: the method to be called - :type method: :class:`str` - ''' - logging.debug('called _call_hv_method(%s)' % method) - return getattr(self.hv_handle, HV_TAG_WRAP_MAP[method])() - - def _call_hv_helper(self, method): - logging.debug('called _call_hv_helper(%s)' % method) - return getattr(self, "_helper_hv_" + method)() - - def _call_vm_method(self, vm_name, tag): - ''' - Calls the given method in vm instance - Returns the result of the method called - - :param vm: The vm object in which call the method - :vm type: :class:`VM` - :param method: the method to be called - :type method: :class:`str` - ''' - logging.debug('called _call_vm_method(%s,%s)' % (vm_name, tag)) - vm_obj = self.hv_handle.get_vm(vm_name) - return getattr(vm_obj, VM_TAG_WRAP_MAP[tag])() - - def _call_vm_helper(self, vm_name, helper): - logging.debug('called _call_vm_helper(%s,%s)' % (vm_name, helper)) - vm_obj = self.hv_handle.get_vm(vm_name) - return getattr(self, "_helper_vm_" + helper)(vm_obj) - - def _helper_hv_sto(self): - logging.debug('called _helper_hv_sto()') - result = {} - hv = self.hv_handle - # fetch pool list - pools = hv.get_storage_pools() - result['sto'] = " ".join(pools) - # get pool info - for pool in pools: - # storage type - result['sto%s_type' % pool] = hv.get_storage_type(pool) - # path to the pool base - result['sto%s_path' % pool] = hv.get_storage_path(pool) - # pool size - result['sto%s_size' % pool] = str(hv.get_storage_capacity(pool)) - # pool used space - result['sto%s_used' % pool] = str(hv.get_storage_used(pool)) - # pool free space - result['sto%s_free' % pool] = str(hv.get_storage_free(pool)) - # pool volume list - result['sto%s_vol' % pool] = hv.get_storage_volumes(pool) - return result - - def _helper_hv_disk(self): - logging.debug('called _helper_hv_disk()') - result = {} - disks = self.hv_handle.get_disks() - result['disk'] = " ".join(disks.keys()) - for disk, size in disks.iteritems(): - result['disk%s_size' % disk] = str(size) - return result - - def _helper_vm_disk(self, vm): - logging.debug('called _helper_vm_disk(%s)' % vm.get_name()) - result = {} - # fetch disk list - disks = vm.get_disks() - logging.debug('_helper_vm_disk: disk list "%s"' % disks) - if len(disks): - result['disk'] = " ".join(str(id) for id in range(0, len(disks))) - # add disk info - for n, disk in enumerate(disks): - # disk path to real storage - result['disk%i_path' % n] = disk['path'] - # disk size - result['disk%i_size' % n] = str(disk['size']) - # disk pool - result['disk%i_pool' % n] = disk['pool'] - # disk volume - result['disk%i_vol' % n] = disk['vol'] - logging.debug('_helper_vm_disk returns "%s"' % result) - return result - - def _helper_vm_nic(self, vm): - logging.debug('called _helper_vm_nic(%s)' % vm.get_name()) - result = {} - # fetch NIC list - nics = vm.get_nics() - logging.debug('_helper_vm_nic: NIC list "%s"' % nics) - if len(nics): - result['nic'] = " ".join(str(id) for id in range(0, len(nics))) - # add NIC info - for n, nic in enumerate(nics): - try: - result['nic%i_type' % n] = nic['type'] - except: - pass - try: - result['nic%i_hwaddr' % n] = nic['hwaddr'] - except: - pass - try: - result['nic%i_source' % n] = nic['source'] - except: - pass - try: - result['nic%i_target' % n] = nic['target'] - except: - pass - try: - result['nic%i_tx' % n] = nic['tx'] - except: - pass - try: - result['nic%i_rx' % n] = nic['rx'] - except: - pass - logging.debug('_helper_vm_nic returns "%s"' % result) - return result - - @pure - def get_tags(self, tags=None): - ''' - Return tags bound to hypervisor specified in tags parameter otherwise - returns all - - :param tags: list of tags or None - :type tags: :class:`list` - ''' - result = {} - if tags is not None: - tags = set(tags) - tags |= set(('h', 'hv')) - hv_tags = HV_TAG_WRAP_MAP.keys() if tags is None else tags - # static tags - result['version'] = str(__version__) - # direct wrapping - for tag in hv_tags: - try: - result[tag] = str(self._call_hv_method(tag)) - except AttributeError: - logging.warning(TAG_METHOD_ERROR % tag) - except KeyError: - logging.warning(TAG_NOT_FOUND_ERROR % tag) - except NotImplementedError: - logging.debug('get_tags: no method in hypervisor to handle "%s"' % tag) - # tag aggregations from wrapping helpers - for helper in HV_TAG_HELPER_LIST: - htags = self._call_hv_helper(helper) - for tag, val in htags.iteritems(): - if tags is None or tag in tags: - result.update({tag:val}) - return result - - @pure - def list_vm(self, vm_names=None, tags=None): - ''' - Return a list of vm tags - If vm_names is None or [] returns all vms - If tags is None or [] returns only vm name tag - - :param vm_names: vm names - :type vm_names: :class:`list` of strings - :param tags: tags to be returned - :type tags: :class:`list` of strings - ''' - result = [] - if tags is not None: - tags = set(tags) - tags |= set(('h', 'vm')) - vm_tags = VM_TAG_WRAP_MAP.keys() if tags is None else tags - # iter on all VMs or only requested names - for vm in self.hv_handle.get_vm_names() if vm_names is None else vm_names: - vm_info = {} - # direct wrapping - for tag in vm_tags: - try: - vm_info[tag] = str(self._call_vm_method(vm, tag)) - except AttributeError: - logging.warning(TAG_METHOD_ERROR % tag) - except KeyError: - logging.warning(TAG_NOT_FOUND_ERROR % tag) - except NotImplementedError: - logging.debug('list_vm: no method in vm to handle "%s"' % tag) - # wrapping helpers - for helper in VM_TAG_HELPER_LIST: - htags = self._call_vm_helper(vm, helper) - for tag, val in htags.iteritems(): - if tags is None or tag in tags: - vm_info.update({tag:val}) - # save tags - result.append(vm_info) - - return result - - @pure - def stop_vm(self, vm_names=None, force=False): - ''' - Stop the specified list of vm names - This method do a soft shutdown on the Virtual Machine - If vm_names is None all vms in hypervisor will be stopped - - :param vm_names: the list of vm names to stop - :vm_names type: :class:`list` of strings - :param force: soft shutown or force poweroff - :type force: either True or False - ''' - if vm_names is None: - #fetch all vm names in hypervisor - vm_names = self.hv_handle.get_vm_names() - logging.debug('stop_vm: stopping vms %s' % vm_names) - for vm in vm_names: - try: - self.hv_handle.stop_vm(vm, force) - except VMError as err: - logging.warning('Error while stopping %s: %s' % (vm, err)) - else: - logging.info('stop_vm: vm %s stopping' % vm) - - @pure - def start_vm(self, vm_names=None): - ''' - Starts the specified list of vms - If vm_names is None all vms in the hypervisor will be started - - :param vm_names: the list of vms to start - :type vm_names: :class:`list` of strings - ''' - if vm_names is None: - vm_names = self.hv_handle.get_vm_names() - logging.debug('start_vm: starting vms %s' % vm_names) - for vm in vm_names: - try: - self.hv_handle.start_vm(vm) - except VMError as err: - logging.warning('Error while starting %s: %s' % (vm ,err)) - else: - logging.info('start_vm: vm %s starting' % vm) - - @pure - def suspend_vm(self, vm_names=None): - ''' - Suspends the specifed list of vms - If vm_names is None all vms in hypervisor will be suspended - - :param vm_names: the list of vms to suspend - :type vm_names: :class:`list` of strings - ''' - if vm_names is None: - vm_names = self.hv_handle.get_vm_names() - logging.debug('suspend_vm: suspending vms %s' % vm_names) - for vm in vm_names: - try: - self.hv_handle.suspend_vm(vm) - except VMError as err: - logging.info('Error while suspending %s: %s' % (vm, err)) - else: - logging.info('suspend_vm: vm %s suspended' % vm) - - @pure - def resume_vm(self, vm_names=None): - ''' - Resumes the specified list of vms - If vm_names is None all vms in hypervisor will be resumed - - :param vm_names: the list of vms to resume - :type vm_names: :class:`str` - ''' - if vm_names is None: - vm_names = self.hv_handle.get_vm_names() - logging.debug('resume_vm: resuming vms "%s"' % vm_names) - for vm in vm_names: - try: - self.hv_handle.resume_vm(vm) - except VMError as err: - logging.info('Error while resuming %s: %s' % (vm, err)) - else: - logging.info('resume_vm: vm %s resumed' % vm) - - @pure - def execute_command(self, command): - ''' - Executes the given command on the local hypervisor - - :param command: the command to execute as it would be typed on a shell - prompt - :type command: :class:`str` - ''' - result = self.hv_handle.local_execute(command) - return result diff --git a/ccnode/common.py b/ccnode/common.py new file mode 100644 index 0000000..8eaf41b --- /dev/null +++ b/ccnode/common.py @@ -0,0 +1,217 @@ + +import os +import re +import psutil +from subprocess import Popen, PIPE +from multiprocessing import cpu_count +from platform import platform, machine +from socket import gethostbyaddr, gethostname + + +class Host(object): + ''' + ''' + + +class LocalHost(Host): + ''' + ''' + + ARCH = { + 'i386' : 'x86', + 'i486' : 'x86', + 'i586' : 'x86', + 'i686' : 'x86', + 'x86_64' : 'x64', + } + + def get_hw_serial(self): + serial = None + try: + data = open('/sys/class/dmi/id/product_serial').read().strip() + if data: + serial = data + except: + pass + return serial + + def get_hw_vendor(self): + vendor = None + try: + data = open('/sys/class/dmi/id/sys_vendor').read().strip() + if data: + vendor = data + except: + pass + return vendor + + def get_hw_product(self): + product = None + try: + data = open('/sys/class/dmi/id/product_name').read().strip() + if data: + product = data + except: + pass + return product + + def get_hw_bios(self): + bios = '' + try: + bios_ver = open('/sys/class/dmi/id/bios_version').read().strip() + bios_date = open('/sys/class/dmi/id/bios_date').read().strip() + if bios_ver: + bios += bios_ver + if bios_date: + bios += ' (%s)' % bios_date + if not bios: + bios = None + except: + pass + return bios + + def get_name(self): + result = None + try: + hostname = gethostname() + fqdn = gethostbyaddr(hostname)[0] + result = fqdn if fqdn else hostname + except: + pass + return result + + def get_uname(self): + uname = None + try: + data = ' '.join(os.uname()) + if data: + uname = data + except: + pass + return uname + + def get_platform(self): + result = None + try: + p = platform() + if p: + result = p + except: + pass + return result + + def get_uptime(self): + uptime = None + try: + data = open("/proc/uptime").read().split() + if data: + uptime = int(float(data[0])) + except: + pass + return uptime + + def get_loadavg(self): + return ' '.join('%.2f' % load for load in os.getloadavg()) + + def get_arch(self): + arch = None + try: + a = machine() + if a in self.ARCH: + arch = ARCH[a] + except: + pass + return arch + + def get_cpu(self): + return cpu_count() + + def get_cpu_usage(self): + return '%.1f' % psutil.cpu_percent() + + def get_mem(self): + return psutil.avail_phymem() + psutil.used_phymem() + + def get_mem_free(self): + return psutil.avail_phymem() + + def get_mem_used(self): + return psutil.used_phymem() + + def get_disks(self): + disks = {} + try: + re_pattern = re.compile(r'([sh]d[a-z]+)') + found = [bd for bd in os.listdir('/sys/block/') + if re_pattern.match(bd)] + for disk in found: + fullname = os.path.join('/sys/block', disk, 'size') + size = int(open(fullname).read()) + if size > 0: + disks[disk] = size * 512 + except: + pass + return disks + + def power_shutdown(self): + return self.execute('/sbin/shutdown -h -P 0') + + def power_off(self): + return self.execute('/sbin/shutdown -h -P -n 0') + + def power_reboot(self): + return self.execute('/sbin/shutdown -r -f 0') + + def power_force_reboot(self): + return self.execute('/sbin/shutdown -r -n 0') + + def execute(self, command): + #FIXME: stop using shell=true and parse arguments with shlex.split() + return Popen(command, shell=True, + bufsize=-1, + stdin=PIPE, + stdout=PIPE, + stderr=PIPE).communicate() + + +class Hypervisor(LocalHost): + ''' + ''' + + def storage(self): + raise NotImplemented + + def vm_list(self): + raise NotImplemented + + def vm_list_running(self): + raise NotImplemented + + def vm_list_stopped(self): + raise NotImplemented + + def vm_list_paused(self): + raise NotImplemented + + def vm_get(self, name): + raise NotImplemented + + +class VM(Host): + ''' + ''' + + +class Storage(object): + ''' + ''' + + +class StoragePool(object): + ''' + ''' + + +class StorageVolume(object): + ''' + ''' diff --git a/ccnode/exceptions.py b/ccnode/exceptions.py index 9f07db9..78dd8d7 100644 --- a/ccnode/exceptions.py +++ b/ccnode/exceptions.py @@ -1,27 +1,46 @@ + +class CCException(Exception): + + def __init__(self, message, exception=None): + self._exception = exception + self._message = message + + def __str__(self): + if self._exception is not None: + return '[%s] %s' % (self._exception, self._message) + else: + return '%s' % self._message + +class FeatureNotImplemented(CCException): + pass + ''' -All errors in ccnode are implemented here as exceptions + Hosts ''' -class HypervisorError(Exception): - ''' - Exception class for handling hypervisor related errors - ''' +class HostError(CCException): + pass - def __init__(self, exception, message): - self.exception = exception - self.message = message - def __str__(self): - return '%s -> %s' % (self.message, self.exception) +class HypervisorError(HostError): + pass + -class VMError(Exception): - ''' - Exception class for handling vm related errors - ''' +class VMError(HostError): pass -class StorageError(Exception): - ''' - Base exception class for handling storage objects - ''' + +''' + Storage +''' + +class StorageError(CCException): + pass + + +class StoragePoolError(StorageError): + pass + + +class StorageVolumeError(StorageError): pass diff --git a/ccnode/handlers.py b/ccnode/handlers.py new file mode 100644 index 0000000..d3962fb --- /dev/null +++ b/ccnode/handlers.py @@ -0,0 +1,498 @@ + +from __init__ import __version__ +from common import LocalHost +from sjrpc.utils import RpcHandler +from sjrpc.utils import pure +from logging import debug, info +from exceptions import FeatureNotImplemented +from fnmatch import fnmatchcase + +try: + import kvm +except ImportError: + _MOD_KVM = False +else: + _MOD_KVM = True + + +class NodeHandler(RpcHandler): + ''' + Main node handler that exports the host capabilities to the server. + ''' + + def __init__(self, connection, detect_hv, allow_exec): + super(RpcHandler, self).__init__() + self._connection = connection + self._allow_cmd_exec = allow_exec + + if not detect_hv: + debug('Hypervisor detection disabled, running as regular' + ' node') + self._host_handle = LocalHost() + else: + if _MOD_KVM: + debug('Hypervisor detection...') + debug('Initializing connection to the KVM hypervisor') + self._host_handle = kvm.KvmHypervisor() + + self.EXEC_METHODS = ['execute_command', 'shutdown'] + + self.HV_TAG_MANDATORY = ['h'] + self.HV_TAG_MAP = { + 'version' : ( lambda o: True, + lambda o,t: str(__version__)), + 'h' : self._tag_map_direct('get_name'), + 'htype' : self._tag_map_direct('get_hv_type'), + 'status' : self._tag_map_direct('get_status'), + 'hserial' : self._tag_map_direct('get_hw_serial'), + 'hvendor' : self._tag_map_direct('get_hw_vendor'), + 'hmachine' : self._tag_map_direct('get_hw_product'), + 'hbios' : self._tag_map_direct('get_hw_bios'), + 'arch' : self._tag_map_direct('get_arch'), + 'platform' : self._tag_map_direct('get_platform'), + 'uname' : self._tag_map_direct('get_uname'), + 'uptime' : self._tag_map_direct('get_uptime'), + 'hvm' : self._tag_map_direct('get_hvm_available'), + 'libvirtver': self._tag_map_direct('get_hv_version'), + 'load' : self._tag_map_direct('get_loadavg'), + 'cpu' : self._tag_map_direct('get_cpu'), + 'cpucore' : self._tag_map_direct('get_cpu_core'), + 'cputhread' : self._tag_map_direct('get_cpu_threads'), + 'cpufreq' : self._tag_map_direct('get_cpu_frequency'), + 'cpuuse' : self._tag_map_direct('get_cpu_usage'), + 'mem' : self._tag_map_direct('get_mem'), + 'memfree' : self._tag_map_direct('get_mem_free'), + 'memused' : self._tag_map_direct('get_mem_used'), + 'disk' : self._tag_map_keys('get_disks'), + 'sto' : ( lambda o: hasattr(o, 'storage'), + lambda o,t: ' '.join( + getattr(o, 'storage')().pool_list())), + 'nvm' : self._tag_map_counter('vm_list'), + 'vmstarted' : self._tag_map_counter('vm_list_running'), + 'vmstopped' : self._tag_map_counter('vm_list_stopped'), + 'vmpaused' : self._tag_map_counter('vm_list_paused'), + } + self.HV_TAG_GLOB = { + 'disk*' : self._tag_map_helper(self._helper_hv_disk), + 'sto*' : self._tag_map_helper(self._helper_hv_sto), + } + + self.VM_TAG_MANDATORY = ['hv', 'h'] + self.VM_TAG_MAP = { + 'version' : ( lambda o: True, + lambda o,t: str(__version__)), + 'hv' : ( lambda o: hasattr(o, 'hypervisor'), + lambda o,t: o.hypervisor().get_name()), + # FIXME crappy tag implementation + 'status' : ( lambda o: True, + lambda o,t: 'running' if o.is_active() + else 'paused' if o.is_paused() + else 'stopped'), + 'h' : self._tag_map_direct('get_name'), + 'arch' : self._tag_map_direct('get_arch'), + 'cpu' : self._tag_map_direct('get_cpu'), + 'mem' : self._tag_map_direct('get_mem'), + 'memused' : self._tag_map_direct('get_mem_used'), + 'memfree' : self._tag_map_direct('get_mem_free'), + } + self.VM_TAG_GLOB = { + 'disk*' : self._tag_map_helper(self._helper_vm_disk), + } + + def __getitem__(self, name): + ''' + ''' + # filter the private members access + if name.startswith('_'): + raise KeyError('Remote name `%s` is private' % repr(name)) + # filter command execution methods + elif not self._allow_cmd_exec and name in self.EXEC_METHODS: + raise KeyError('Remote name `%s` is disabled by configuration' + % repr(name)) + else: + debug('Called %s.%s' % (self.__class__.__name__, name)) + return super(NodeHandler, self).__getitem__(name) + + def _tag_map_direct(self, method): + ''' + ''' + return ( lambda o: hasattr(o, method), + lambda o,t: getattr(o, method)()) + + def _tag_map_counter(self, method): + ''' + ''' + return ( lambda o: hasattr(o, method), + lambda o,t: len(getattr(o, method)())) + + def _tag_map_keys(self, method): + ''' + ''' + return ( lambda o: hasattr(o, method), + lambda o,t: ' '.join(getattr(o, method)().keys())) + + def _tag_map_helper(self, helper): + ''' + ''' + return ( lambda o, resolve=False: helper(o, resolve=resolve), + lambda o, tag_name=None, resolve=False: + helper(o, tag_name=tag_name, resolve=resolve)) + + def _helper_hv_disk(self, hv, tag_name=None, resolve=True): + ''' + ''' + result = {} + disks = hv.get_disks() + if len(disks): + result['disk'] = ' '.join(disks.keys()) + for name, size in disks.iteritems(): + if size: + result['disk%s_size' % name] = str(size) + if not result: + result = None + return result + + def _helper_hv_sto(self, hv, tag_name=None, resolve=True): + ''' + ''' + result = {} + if hasattr(hv, 'storage'): + pools = hv.storage().pool_list() + if len(pools): + result['sto'] = ' '.join(pools) + for pool_name in pools: + pool = hv.storage().pool_get(pool_name) + capa = pool.get_space_capacity() + if capa: + result['sto%s_size' % pool_name] = str(capa) + free = pool.get_space_free() + if free: + result['sto%s_free' % pool_name] = str(free) + used = pool.get_space_used() + if used: + result['sto%s_used' % pool_name] = str(used) + vol = pool.volume_list() + if vol: + result['sto%s_vol' % pool_name] = ' '.join(vol) + if not result: + result = None + return result + + def _helper_vm_disk(self, vm, tag_name=None, resolve=True): + ''' + ''' + result = {} + volumes = vm.get_volumes() + if len(volumes): + result['disk'] = ' '.join([str(i) for i in range(0, len(volumes))]) + for vol_id, vol in enumerate(volumes): + path = vol.get_path() + if path: + result['disk%i_path' % vol_id] = str(path) + capa = vol.get_space_capacity() + if capa: + result['disk%i_size' % vol_id] = str(capa) + if not result: + result = None + return result + + @pure + def node_tags(self, tags=None): + ''' + ''' + result = {} + debug('get_tags: server requested tags `%s`' % tags) + # return all tags if server does not request a subset + if tags is None: + # add simple tags + tags = self.HV_TAG_MAP.keys() + # add globbing tags + for pattern, handler in self.HV_TAG_GLOB.iteritems(): + # helper is available on the current host + if handler[0](self._host_handle): + debug('get_tags: host implements `%s`' % pattern) + # get tags from helper + htags = handler[0](self._host_handle, resolve=False) + # append all tags + tags.extend(htags) + debug('get_tags: no tag specified, expanded list to `%s`' % tags) + # add mandatory tags if missing in the list + else: + for mtag in self.HV_TAG_MANDATORY: + if mtag not in tags: + debug('get_tags: add missing mandatory tag `%s`' % mtag) + tags.append(mtag) + # query host + debug('get_tags: query host with tag list `%s`' % tags) + for tag in tags: + # first, test tag name againts list of plain name + if tag in self.HV_TAG_MAP: + debug('get_tags: plain mapping found for tag `%s`' % tag) + if self.HV_TAG_MAP[tag][0](self._host_handle): + # fetch tag data + q = self.HV_TAG_MAP[tag][1](self._host_handle, tag) + debug('get_tags: host returned `%s`' % q) + # append tag information + if q is not None: + # when a dict is returned, it may contain >1 tags + # in this case the real tag name is given by the + # wrapper and it may differ from the mapping name + if isinstance(q, dict): + for key, val in q.iteritems(): + result[key] = {} + result[key]['value'] = str(val) + # FIXME really fast + result[key]['ttl'] = -1 + # or there's only one value + else: + result[tag] = {} + result[tag]['value'] = str(q) + # FIXME really fast + result[tag]['ttl'] = -1 + else: + debug('get_tags: I wont return `%s`=`None`' % tag) + else: + debug('get_tags: tag `%s` is NOT implemented' % tag) + # if no direct tag mapping exists, test name against globbing a list + else: + debug('get_tags: searching for `%s` in globbing tags' % tag) + # iter on globbing patterns, and get helper references + # process the first globbing tag that match then exit because + # there should not exist two globbing pattern matching + # one tag, ideally + for pattern, handler in self.HV_TAG_GLOB.iteritems(): + # helper is available on the current host + if handler[0](self._host_handle, tag): + if fnmatchcase(tag, pattern): + debug('get_tags: processing tag `%s` with ' + 'pattern `%s`' % (tag, pattern)) + # get tags from helper + htags = handler[0](self._host_handle, tag) + # FIXME intead of extracting one tag, try not + # to build the whole list. Maybe it's too + # difficult and not worth to implement + if tag in htags: + debug('get_tags: found tag in helper result' + 'with value `%s`' % htags[tag]) + result[tag] = {} + result[tag]['value'] = str(htags[tag]) + # FIXME + result[tag]['ttl'] = -1 + break + return result + + @pure + def node_shutdown(self, reboot=True, gracefull=True): + ''' + ''' + info('shutdown: server requested shutdown of local host') + debug('shutdown: reboot=%s gracefull=%s' % (reboot, gracefull)) + if reboot: + method = 'power_reboot' if gracefull else 'power_force_reboot' + else: + method = 'power_shutdown' if gracefull else 'power_off' + if hasattr(self._host_handle, method): + result = getattr(self._host_handle, method)() + debug('shutdown: in progress ... action returned `%s`' % result) + return result + else: + debug('shutdown: unable to proceed, this feature is not available') + raise FeatureNotImplemented('host handler has no method `%s`' + % method) + + @pure + def vm_tags(self, vm_names=None, tags=None, resolve=True): + ''' + ''' + # list all VMs if the server did not provide names + debug('vm_list: server requested list of vm `%s`' % vm_names) + if vm_names is None: + vm_names = self._host_handle.vm_list() + debug('vm_list: no vm specified, expanded list to `%s`' % vm_names) + # return all tags if server does not request a subset + get_all = tags is None + debug('vm_list: server requested tags `%s`' % tags) + if get_all: + # add simple tags + tags = self.VM_TAG_MAP.keys() + debug('vm_list: no tag specified, expanded list to `%s`' % tags) + # add mandatory tags if missing in the list + else: + for mtag in self.VM_TAG_MANDATORY: + if mtag not in tags: + debug('vm_list: add missing mandatory tag `%s`' % mtag) + tags.append(mtag) + # query each vm + result = {} + for vm_name in vm_names: + vm_tag = {} + try: + # copy tag list for local modifications (globbing) + mytags = tags + # open a wrapper to the VM + debug('vm_list: fetching vm data for `%s`' % vm_name) + vm = self._host_handle.vm_get(vm_name) + # expand tag list with globbing tags + if get_all: + for pattern, handler in self.VM_TAG_GLOB.iteritems(): + # helper is available on this VM + if handler[0](vm): + debug('vm_list: vm implements `%s`' % pattern) + # get tags from helper + htags = handler[0](vm, resolve=False) + # append all tags + mytags.extend(htags) + # query the VM with each tag + for tag in mytags: + # first, search tag in plain mappings + if tag in self.VM_TAG_MAP: + debug('vm_list: plain mapping found for tag `%s`' % tag) + # proceed if tag can be resolved on this VM + if self.VM_TAG_MAP[tag][0](vm): + vm_tag[tag] = {} + # fetch data only if we only built the tag list + debug('vm_list: resolving tag %s`' % tag) + # call the wrapper mapping lambda + q = self.VM_TAG_MAP[tag][1](vm, tag) + debug('vm_list: query returned `%s`' % q) + # when a dict is returned, it may contain >1 tag + # in this case the real tag name is given by the + # wrapper and it may differ from the mapping nam + if isinstance(q, dict): + for key, val in q.iteritems(): + if val is not None: + if resolve: + vm_tag[key]['value'] = str(q) + # FIXME really fast + vm_tag[key]['ttl'] = -1 + else: + debug('vm_list: I wont return ' + '`%s`=`None`' % key) + # or there's only one value + elif q is not None: + if resolve: + vm_tag[tag]['value'] = str(q) + # FIXME really fast + vm_tag[tag]['ttl'] = -1 + else: + debug('vm_list: I wont return `%s`=`None`' + % tag) + # no tag mapping exist, test name against the globbing list + else: + debug('vm_list: searching for `%s` in globbing tags' + % tag) + # iter on globbing patterns, and get helper references + # process the first globbing tag that match then exit + # because there should not exist two globbing pattern + # matching one tag, ideally + for pattern, handler in self.VM_TAG_GLOB.iteritems(): + # helper is available on the current VM + if handler[0](vm, tag): + if fnmatchcase(tag, pattern): + debug('get_tags: processing tag `%s` with ' + 'pattern `%s`' % (tag, pattern)) + # get tags from helper + htags = handler[0](vm, tag) + # FIXME intead of extracting one tag, try + # not to build the whole list. Maybe it's + # too difficult and not worth implementing + if tag in htags: + debug('get_tags: found tag in helper ' + 'result with value `%s`' % htags[tag]) + vm_tag[tag] = {} + if resolve: + vm_tag[tag]['value'] = str( + htags[tag]) + # FIXME + vm_tag[tag]['ttl'] = -1 + break + # save the tag list + # FIXME: in case of exception, we won't return a single VM tag + result[vm_name] = vm_tag + except Exception as e: + debug('(%s) : %s' % (repr(e), e)) + return result + + @pure + def vm_stop(self, vm_names=None, force=False): + ''' + ''' + info('vm_stop: server requested stop of `%s`' % vm_names) + debug('vm_stop: force stop is `%s`' % force) + if vm_names is None: + vm_names = self._host_handle.vm_list_running() + debug('vm_stop: no vm specified, expanded list to `%s`' % vm_names) + for vm_name in vm_names: + try: + debug('vm_stop: fetching vm data for `%s`' % vm_name) + vm = self._host_handle.vm_get(vm_name) + if force: + debug('vm_stop: powering off `%s`' % vm_name) + vm.power_off() + else: + info('vm_stop: shutdown requested for `%s`' % vm_name) + vm.power_shutdown() + except: + pass + + @pure + def vm_start(self, vm_names=None): + ''' + ''' + info('vm_start: server requested start of `%s`' % vm_names) + if vm_names is None: + vm_names = self._host_handle.vm_list_stopped() + debug('vm_start: no vm specified, expanded list to `%s`' % vm_names) + for vm_name in vm_names: + try: + debug('vm_start: fetching vm data for `%s`' % vm_name) + vm = self._host_handle.vm_get(vm_name) + info('vm_start: powering on `%s`' % vm_name) + vm.power_on() + except: + pass + + @pure + def vm_suspend(self, vm_names=None): + ''' + ''' + info('vm_suspend: server requested suspend of `%s`' % vm_names) + if vm_names is None: + vm_names = self._host_handle.vm_list_running() + debug('vm_suspend: no vm specified, expanded list to `%s`' + % vm_names) + for vm_name in vm_names: + try: + debug('vm_suspend: fetching vm data for `%s`' % vm_name) + vm = self._host_handle.vm_get(vm_name) + info('vm_suspend: pause execution of `%s`' % vm_name) + vm.power_suspend() + except: + pass + + @pure + def vm_resume(self, vm_names=None): + ''' + ''' + info('vm_resume: server requested resume of `%s`' % vm_names) + if vm_names is None: + vm_names = self._host_handle.vm_list_running() + debug('vm_resume: no vm specified, expanded list to `%s`' + % vm_names) + for vm_name in vm_names: + try: + debug('vm_resume: fetching vm data for `%s`' % vm_name) + vm = self._host_handle.vm_get(vm_name) + info('vm_resume: resume execution of `%s`' % vm_name) + vm.power_resume() + except: + pass + + @pure + def execute_command(self, command): + ''' + ''' + info('execute_command: starting execution of `%s`' % command) + output = self._host_handle.execute(command) + info('execute_command: finished execution of `%s`' % command) + return output diff --git a/ccnode/interface.py b/ccnode/interface.py deleted file mode 100644 index 6f8f1b9..0000000 --- a/ccnode/interface.py +++ /dev/null @@ -1,313 +0,0 @@ -''' -This is the base interface to all the Cloud Control Hypervisor libraries. -It defines abstract classes that represent any kind of entity -we could handle with this library. - -We call our most basic entity a Host, basicly a couple (Machine, Operating -System) - -For instance a Host is whether a Hypervisor or a Virtual Machine, however other -type of hosts could easly be implemented -''' - -#TODO: Implements KvmHV methods and KvmVM methods - - -class Host(object): - ''' - Base host class: - A host is an abstraction of any type of machine - ''' - def get_name(self): - ''' - Returns the host's name - ''' - pass - - def get_memory_stats(self): - ''' - Returns a tuple representing memory usage - (free, used, total) - ''' - pass - - def get_status(self): - ''' - Returns host's status - ''' - pass - - def get_cpu_stats(self): - ''' - Returns cpu stats as a tuple representing: - (nb_cpu, nb_core_per_cpu, cpu_usage) - ''' - pass - - def get_network_conf(self): - ''' - Returns network configuration - ''' - pass - - def get_storage_conf(self): - ''' - Returns storage configuration - ''' - pass - - def get_storage_stats(self): - ''' - Returns storage statistics - ''' - pass - - def get_info(self): - ''' - get info about the host - get_cpu_stats - get_memory_stats - get_network_conf - get_storage_conf - ''' - pass - - - - -class Hypervisor(Host): - ''' - Base interface class for all hypervisor types and libraries - that interacts with them - ''' - - _id = 0 - - def __init__(self): - super(Hypervisor, self).__init__() - self.__class__._id += 1 - self.hv_type = 'None' - - def list_vms(self): - ''' - Lists current defined virtual machines in the hypervisor - Returns a dict of vms - ''' - pass - - def get_id(self): - ''' - Get id of the current hypervisor - Should always be 0 - ''' - return self._id - - def stop_vm(self, vm_name): - ''' - Stops the given virtual machinoe - - :param vm_name: name of the VM - :Returns: the name of the VM - ''' - pass - -class VM(Host): - ''' - Base Virtual Machine class - ''' - _id_count = 0 - - def __init__(self): - ''' - Initiate a Virtual Machine - Each VM instace has a unique ID - ''' - self.__class__._id_count += 1 - self._id = self.__class__._id_count - - def get_id(self): - ''' - Gets the id of the virtual machine - - :Returns: :class:`int` id of the VM - ''' - return self._id - - def get_status(self): - ''' - returns VM's status: - - running - - blocked in ressource - - paused by user - - being shutdown - - is shut off - - is crached - ''' - pass - - - def shutdown(self): - ''' - Shut down given vm - ''' - pass - - def suspend(self): - ''' - Suspend vm - ''' - pass - - def resume(self): - ''' - Resume vm - ''' - pass - - -class HVStorage(object): - ''' - Base class representing storage interface of a hypervisor - - :param hypervisor: in instance of :class:`Hypervisor` - - ''' - def get_pools(self): - ''' - Returns a dict of storage pools bound to the host - ''' - pass - - def get_volume_names(self, pool=None): - ''' - Returns volume names stored in this pool or all pools - ''' - pass - - def add_pool(self, name, pool_type): - ''' - Add new storage pool - - :param name: the name of the storage pool to add - :type name: :class:`str` - :param pool_type: The type of the pool : LVM Group, Phisical Disk, - NFS Server ... - ''' - pass - - def del_pool(self, name): - ''' - Delete a storage pool - ''' - pass - - def add_volume(self, pool, name, space): - ''' - Adds a volume to the specified pool - - :param pool: the pool in which to create the volume - :type pool: :class:`virStoragePool` - :param name: name of the new volume - :type name: :class:`str` - :param space: size of the new volume in gigabytes - :type space: :class:`int` - ''' - pass - - def del_volume(self, pool, name, wipe=False): - ''' - Deletes a volume in the specified pool - - :param pool: the pool in which delete the volume - :type pool: :class:`virStoragePool` - :param name: the name of the volume - :type name: :class:`str` - ''' - pass - - def find_space(self, new_space): - ''' - Tries to find a suitable chunk of space for volume allocation. - - :param new_space: a space size in gigabytes - :type new_space: :class:`str` - :return: a :class:`tuple` of best location or :class:`False` if no - pool is suitable - ''' - - def get_pool_name(self, pool): - ''' - Returns the name of this pool - - :param pool: the storage pool name - :type pool: libvirt.`virStoragePool` - :return: :class:`str` name of the pool - ''' - - def get_pool_state(self, pool): - ''' - Returns the running state of the pool - - :param pool: the storage pool name - :type pool: libvirt.`virStoragePool` - ''' - - def get_pool_space_total(self, pool): - ''' - Returns the storage capacity of this pool in bytes - - :param pool: the pool to get information from - :type pool: :class:`virStoragePool` - :return: :class:`int` of capacity in bytes - ''' - - def get_pool_space_available(self, pool): - ''' - Returns available space of this storage pool in bytes - - :param pool: the pool to get information from - :type pool: :class:`virStoragePool` - :return: :class:`int` of available free space in bytes - ''' - - def get_pool_space_used(self, pool): - ''' - Returns current storage pool usage in bytes - - :param pool: the pool to get information from - :type pool: :class:`virStoragePool` - :return: :class:`int` of used space in bytes - ''' - - def get_volume_path(self, pool, vol_name): - ''' - Returns the file path to the volume - - :param pool: the storage pool containing this volume - :type pool: libvirt.`virStoragePool` - :param volume_name: name of the pool volume - :type volume_name: :class:`str` - :return: :class:`str` path to the volume file - ''' - - def get_volume_allocation(self, pool, vol_name): - ''' - Returns the pool space used by this volume in bytes - - :param pool: the pool containing this volume from - :type pool: :class:`virStoragePool` - :param vol_name: name of the volume to query - :type vol_name: :class:`str` - :return: :class:`int` of allocation in bytes - ''' - - def get_volume_capacity(self, pool, vol_name): - ''' - Returns the capacity (usable space) of this volume in bytes - - :param pool: the pool containing this volume from - :type pool: :class:`virStoragePool` - :param vol_name: name of the volume to query - :type vol_name: :class:`str` - :return: :class:`int` of capacity in bytes - ''' diff --git a/ccnode/kvm.py b/ccnode/kvm.py index b9e881f..fa39609 100644 --- a/ccnode/kvm.py +++ b/ccnode/kvm.py @@ -1,26 +1,15 @@ -''' -All KVM technology specefic functions are implemented inside this module -Two entites belong to this module: a Kvm hypervisor or a Kvm virtual machine -''' - - -from libvirtwrapper import * -from exceptions import * -import subprocess -import hashlib +from libvirtwrapper import LibvirtHypervisor class KvmHypervisor(LibvirtHypervisor): ''' Base class of a Kvm Hypervisor - This class have a attribute hv_type for tagging purposes ''' _instance = None def __init__(self): super(KvmHypervisor, self).__init__('kvm') - self.hv_type = 'kvm' - + def __new__(cls, *args, **kwargs): ''' .. note:: diff --git a/ccnode/libvirtwrapper.py b/ccnode/libvirtwrapper.py index ee8dd5d..1638ed1 100644 --- a/ccnode/libvirtwrapper.py +++ b/ccnode/libvirtwrapper.py @@ -1,939 +1,456 @@ -#TODO: vm informations gathering !!! import libvirt -import sys -# we use psutils to get host informations we can't get with -# libvirt -import psutil -import multiprocessing -import os import re -from interface import * -from exceptions import * -from time import sleep +import psutil import xml.dom.minidom +from time import sleep +from common import Hypervisor, VM, Storage, StoragePool, StorageVolume -### -# Defined constants -### - -## -# Libvirt -## -# KVM_LIBVIRT_SESSION = 'qemu:///system' XEN_LIBVIRT_SESSION = 'xen:///' -######### -## States -######### - -# Hypervisor - -HV_ARCH_MAP = { - 'i686' : 'x86', - 'x86_64' : 'x64', -} - -# Virtual Machines - -VM_LIBVIRT_STATUS = ( - 'No state', - 'Running', - 'Blocked on resource', - 'Paused', - 'Shutting down ...', - 'Shutdown', - 'Crashed' - ) - -VM_STATUS = { - 0 : 'stopped', - 1 : 'running', - 2 : 'running', - 3 : 'paused', - 4 : 'running', - 5 : 'shutdown', - 6 : 'shutdown', -} - -VM_START_STATES = { - 'running': 1, - 'paused' : 0 - } - -# Storage Pools - -POOL_STATE = ( - 'Not Running', - 'Initializing pool, not available', - 'Running normally', - 'Running degraded', - 'Running, but not accessible' - ) - - -######################### -## Pool informations tags -######################### - -POOL_NAME = 'pool' -POOL_STATUS = 'pstatus' -POOL_TOTAL_SIZE = 'psize' -POOL_FREE_SPACE = 'pfree_space' -POOL_USED_SPACE = 'pused_space' -POOL_NUM_VOLUMES = 'num_vols' -POOL_BACKEND = 'backend' - -DEFAULT_VM_START = VM_START_STATES['running'] - -## -# Pretty size outputs -## - MEGABYTE_DIV = 1024 * 1024 GIGABYTE_DIV = 1024 * 1024 * 1024 KILOBYTE_DIV = 1024 -class LibvirtVm(VM): - ''' - Libvirt api access to Virtual Machine actions - All libvirt.virDomain calls must stay in this class - :param domain: a libvirt domain instance - :type domain: :class:`libvirt.virDomain` - :param hypervisor: a hypervisor reference object - :type hypervisor: :class:`LibvirtHypervisor` - ''' - _print_header = True +#### hypervisor - def __init__(self, domain, hypervisor): - ''' - ''' - super(LibvirtVm, self).__init__() - if isinstance(domain, libvirt.virDomain): - self._domain = domain - self.vm_info = {} - self._find_pid() +class LibvirtHypervisor(Hypervisor): + + def __init__(self, hv_type): + try: + if hv_type == 'kvm': + self._lvcon_handle = libvirt.open(KVM_LIBVIRT_SESSION) + else: + raise NotImplemented('Unknown hypervisor type') + except libvirt.libvirtError as error: + raise HypervisorError('libvirt cannot connect to hypervisor') + + self._hv_type = hv_type + self._sto_handle = LibvirtStorage(self) + self._vm_cache_running = {} + self._vm_cache_defined = {} + self._vm_cache = {} + + def _vm_cache_rebuild(self): + self._vm_cache_running = {} + self._vm_cache_defined = {} + self._vm_cache = {} + + for dom_id in self._lvcon_handle.listDomainsID(): + vm = LibvirtVm(self, self._lvcon_handle.lookupByID(dom_id)) + self._vm_cache_running[vm.get_name()] = vm + for dom_name in self._lvcon_handle.listDefinedDomains(): + vm = LibvirtVm(self, self._lvcon_handle.lookupByName(dom_name)) + self._vm_cache_defined[vm.get_name()] = vm + + self._vm_cache = self._vm_cache_running + self._vm_cache.update(self._vm_cache_defined) + + def get_hv_type(self): + return self._hv_type + + def get_hv_version(self): + return self._lvcon_handle.getVersion() + + def get_cpu_core(self): + return int(self.get_cpu() / self.get_cpu_threads()) + + def get_cpu_threads(self): + return self._lvcon_handle.getInfo()[7] + + def get_cpu_frequency(self): + return self._lvcon_handle.getInfo()[3] + + def storage(self): + return self._sto_handle + + def vm_list(self): + if not self._vm_cache: + self._vm_cache_rebuild() + return self._vm_cache.keys() + + def vm_list_running(self): + if not self._vm_cache_running: + self._vm_cache_rebuild() + running = [] + for vm_name in self._vm_cache_running: + vm = self.vm_get(vm_name) + if vm.is_active(): + running.append(vm_name) + return running + + def vm_list_stopped(self): + if not self._vm_cache_defined: + self._vm_cache_rebuild() + return self._vm_cache_defined.keys() + + def vm_list_paused(self): + if not self._vm_cache_running: + self._vm_cache_rebuild() + paused = [] + for vm_name in self._vm_cache_running: + vm = self.vm_get(vm_name) + if vm.is_paused(): + paused.append(vm_name) + return paused + + def vm_get(self, name): + if name in self.vm_list(): + return self._vm_cache[name] else: - raise TypeError('Need virDomain object given %s' % type(domain)) - self.set_hypervisor(hypervisor) + raise Exception() - def __cmp__(self, other): - ''' - We overload the compare method so that two virtual machines are - identical if they are bound to the same domain object - ''' - return cmp(self._domain, other._domain) - def _find_pid(self): - ''' - Finds the PID of the current vm - ''' - result = find_process_id(self._domain.UUIDString()) - if result: - self._pid = int(result.pop()) - else: - self._pid = None - - def get_pid(self): - return self._pid +#### storage - def set_hypervisor(self, hypervisor): +class LibvirtStorage(Storage): + + def __init__(self, hypervisor): if isinstance(hypervisor, LibvirtHypervisor): - self._hypervisor = hypervisor + self._hv_handle = hypervisor else: - raise TypeError('argument type error') - - def get_name(self): - return self._domain.name() - - def get_hv_name(self): - return self._hypervisor.get_name() + raise TypeError('Expected `%s` given `%s`' % (LibvirtHypervisor, + hypervisor)) + + self._pool_cache_running = {} + self._pool_cache_defined = {} + self._pool_cache = {} + + def _pool_cache_rebuild(self): + self._pool_cache_running = {} + self._pool_cache_defined = {} + self._pool_cache = {} + + for name in self._hv_handle._lvcon_handle.listStoragePools(): + pool = LibvirtStoragePool(self, + self._hv_handle._lvcon_handle.storagePoolLookupByName(name)) + self._pool_cache_running[pool.get_name()] = pool + for name in self._hv_handle._lvcon_handle.listDefinedStoragePools(): + pool = LibvirtStoragePool(self, + self._hv_handle._lvcon_handle.storagePoolLookupByName(name)) + self._pool_cache_defined[pool.get_name()] = pool + + self._pool_cache = self._pool_cache_running + self._pool_cache.update(self._pool_cache_defined) - def get_mem_used(self): - return self._domain.info()[2] * KILOBYTE_DIV + def pool_list(self): + if not self._pool_cache: + self._pool_cache_rebuild() + return self._pool_cache.keys() - def get_mem(self): - return self._domain.info()[1] * KILOBYTE_DIV + def pool_get(self, name): + if name in self.pool_list(): + return self._pool_cache[name] + else: + raise Exception() - def get_mem_free(self): - return (self.get_total_mem() - self.get_used_mem()) * KILOBYTE_DIV + def capacity(self): + capacity = 0 + for pool_name in self.pool_list(): + pool = self.pool_get(pool_name) + capacity += pool.get_space_capacity() + return capacity - def get_cpu(self): - return self._domain.info()[3] + def find_volumes(self, path=None, name=None): + volumes = [] + if path is not None or name is not None: + for pool_name in self.pool_list(): + pool = self.pool_get(pool_name) + for vol_name in pool.volume_list(): + vol = pool.volume_get(vol_name) + if (path is not None and vol.get_path() == path) \ + or (name is not None and vol.get_name() == name): + volumes.append(vol) + return volumes + + +class LibvirtStoragePool(StoragePool): - def get_cpu_percent(self): - self._find_pid() - if self._pid: - p = psutil.Process(self._pid) - sleep(0.7) - res = int(p.get_cpu_percent()) - res = 100 if res > 100 else res - return res + def __init__(self, storage, libvirt_pool): + if isinstance(storage, LibvirtStorage): + self._sto_handle = storage else: - return 0 - - def get_disks(self): - result = [] - # we parse xml description of the vm to get disk list - xml_string = self._domain.XMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE) - root = xml.dom.minidom.parseString(xml_string) - domain = root.getElementsByTagName('domain').pop() - devices = domain.getElementsByTagName('devices').pop() - # iter on "disk" devices - for disk in devices.getElementsByTagName('disk'): - # skip weird disks - if disk.getAttribute('device') == 'disk': - d = {} - # get type - dtype = disk.getAttribute('type') - # get disk path - if dtype == 'file': - d['path'] = disk.getElementsByTagName('source').pop().getAttribute('file') - elif dtype == 'block': - d['path'] = disk.getElementsByTagName('source').pop().getAttribute('dev') - # attributes only resolvable in the storage pool - if dtype in ['file', 'block']: - sto = self._hypervisor._storage - for pool_name, pool in sto.get_pools().iteritems(): - for vol_name in sto.get_volume_names(pool): - if sto.get_volume_path(pool, vol_name) == d['path']: - d['pool'] = pool_name - d['vol'] = vol_name - d['size'] = sto.get_volume_capacity(pool, vol_name) - result.append(d) - return result - - def get_nics(self): - result = [] - xml_string = self._domain.XMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE) - xroot = xml.dom.minidom.parseString(xml_string) - xdomain = xroot.getElementsByTagName('domain').pop() - xdevices = xdomain.getElementsByTagName('devices').pop() - # iter on "interface" devices - for iface in xdevices.getElementsByTagName('interface'): - d = {} - # get type - try: - d['type'] = iface.getAttribute('type') - except: - pass - # get hardware address - try: - xmac = iface.getElementsByTagName('mac').pop() - d['hwaddr'] = xmac.firstChild.nodeValue - except: - pass - # get target interface - try: - xtarget = iface.getElementsByTagName('target').pop() - d['target'] = xtarget.getAttribute('dev') - except: - pass - result.append(d) - return result + raise TypeError('Expected `%s` given `%s`' % (LibvirtStorage, + storage)) + if isinstance(libvirt_pool, libvirt.virStoragePool): + self._lvpool_handle = libvirt_pool + else: + raise TypeError('Expected `%s` given `%s`' % (libvirt.virStoragePool + , libvirt_pool)) + + self._vol_cache = {} - def get_arch(self): - result = None + def _vol_cache_rebuild(self): + self._vol_cache = {} + if self._lvpool_handle.isActive(): + for name in self._lvpool_handle.listVolumes(): + vol = LibvirtStorageVolume(self, + self._lvpool_handle.storageVolLookupByName(name)) + self._vol_cache[vol.get_name()] = vol + + def get_name(self): + name = None try: - xml_string = self._domain.XMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE) - xroot = xml.dom.minidom.parseString(xml_string) - xdomain = xroot.getElementsByTagName('domain').pop() - xos = xdomain.getElementsByTagName('os').pop() - xtype = xos.getElementsByTagName('type').pop() - arch = xtype.getAttribute('arch') - if arch in HV_ARCH_MAP: - result = HV_ARCH_MAP[arch] - except: + name = self._lvpool_handle.name() + except libvirtError: pass - return result - - def get_status(self): - return VM_STATUS[self._domain.info()[0]] + return name - def shutdown(self): + def get_space_capacity(self): try: - self._domain.shutdown() - except libvirt.libvirtError: - raise VMError('%s is not running !' % self.get_name()) + return self._lvpool_handle.info()[1] + except libvirt.libvirtError as e: + raise StoragePoolError("can't get pool information (%s)" % e) - def force_poweroff(self): + def get_space_free(self): try: - self._domain.destroy() - except libvirt.libvirtError: - raise VMError('%s is not running !' % self.get_name()) + return self._lvpool_handle.info()[3] + except libvirt.libvirtError as e: + raise StoragePoolError("can't get pool information (%s)" % e) - def start(self): + def get_space_used(self): try: - self._domain.create() - except libvirt.libvirtError: - raise VMError('%s is already running !' % self.get_name()) + return self._lvpool_handle.info()[2] + except libvirt.libvirtError as e: + raise StoragePoolError("can't get pool information (%s)" % e) - def suspend(self): - try: - self._domain.suspend() - except libvirt.libvirtError: - raise VMError('%s is not running !' % self.get_name()) + def volume_list(self): + if not self._vol_cache: + self._vol_cache_rebuild() + return self._vol_cache.keys() + + def volume_get(self, name): + if name in self.volume_list(): + return self._vol_cache[name] + else: + raise Exception() + + +class LibvirtStorageVolume(StorageVolume): + + def __init__(self, pool, libvirt_vol): + if isinstance(pool, LibvirtStoragePool): + self._pool_handle = pool + else: + raise TypeError('Expected `%s` given `%s`' % (LibvirtStoragePool, + pool)) + if isinstance(libvirt_vol, libvirt.virStorageVol): + self._lvvol_handle = libvirt_vol + else: + raise TypeError('Expected `%s` given `%s`' % (libvirt.virStorageVol, + libvirt_vol)) - def resume(self): + def get_name(self): + name = None try: - self._domain.resume() - except libvirt.libvirtError: - raise VMError('%s is not running !' % self.get_name()) + name = self._lvvol_handle.name() + except libvirtError: + pass + return name - def get_uuid(self): - ''' - Returns the uuid string of the vm - ''' - return self._domain.UUIDString() + def get_space_capacity(self): + capacity = None + try: + capacity = self._lvvol_handle.info()[1] + except libvirtError: + pass + return capacity - def __str__(self): - ''' - LibvirtVm object string representation - ''' - header = '%-10s | %-10s | %-10s | %-15s | %-10s\n\n' %\ - ('Name', 'UsedMem', 'VCpu', 'CpuUsage', 'Status') - self._fetch_memory_stats() - self.get_cpu_stats() - stats = '%-10s | %-10s | %-10s | %-15s | %-10s' %\ - (self.get_name(), self.used_memory, self.vcpu, self.cpu_use, - self.get_status()) - return header + stats - - -class LibvirtHypervisor(Hypervisor): - ''' - Libvirt api access to Hypervisor actions. - All libvirt calls to the hypervisor stay in this class - - :param hv_type: This is the hypervisor type, this parameter is - important as it specifies the hypervisor type used when - connecting to libvirt. - :type hv_type: :class:`str` - - .. note:: - hv_type can be any of kvm, xen, ... , other hypervisors, however only - one connection at a time is allowed - ''' - - def __init__(self, hv_type): - #FIXME: don't use hard coded hypervisor type in URI + def get_space_allocation(self): + allocated = None try: - if hv_type == 'kvm': - self._con_handle = libvirt.open(KVM_LIBVIRT_SESSION) - else: - raise NotImplemented('or unknown hypervisor type') - except libvirt.libvirtError as error: - raise HypervisorError(error, 'libvirt cannot connect to hypervisor') - #build storage objs - self._storage = LibvirtHVStorage(self) - #build vm objects - self._build_vm_list() - self.hv_info = {} - self.hv_type = hv_type - self.st_handle = LibvirtHVStorage(self) - - def get_name(self): - ''' - Returns hypervisor's name - ''' - return self._con_handle.getHostname() - - def get_hv_type(self): - return self.hv_type - - def get_hv_version(self): - return str(self._con_handle.getVersion()) - - def get_uname(self): - return ' '.join(os.uname()) + allocated = self._lvvol_handle.info()[2] + except libvirtError: + pass + return allocated - def get_uptime(self): - result = None + def get_path(self): + path = None try: - f = open("/proc/uptime") - data = f.read().split() - f.close() - result = str(int(float(data[0]))) - except: + path = self._lvvol_handle.path() + except libvirtError: pass - return result + return path - def get_loadavg(self): - result = None + def wipe(self): try: - result = ' '.join('%.2f' % load for load in os.getloadavg()) - except: + self._lvvvol_handle.wipe(0) + except libvirt.libvirtError: pass - return result - def _build_vm_list(self): - ''' - Builds the lists of all defined vms (running and offline) in the - current hypervisor - ''' - self._vm_list_running = {} - self._vm_list_offline = {} - self._vm_list = {} - - self._doms_running_ids = self._con_handle.listDomainsID() - self._doms_defined_names = self._con_handle.listDefinedDomains() - - for dom in self._doms_running_ids: - vm = LibvirtVm(self._con_handle.lookupByID(dom), self) - self._vm_list_running[vm.get_name()] = vm - - for dom in self._doms_defined_names: - vm = LibvirtVm(self._con_handle.lookupByName(dom), self) - self._vm_list_running[vm.get_name()] = vm - - self._vm_list = self._vm_list_running - self._vm_list.update(self._vm_list_offline) - - def get_arch_type(self): - ''' - Get archetcture type of hypervisor - ''' - result = None - arch = self._con_handle.getInfo()[0] - if arch in HV_ARCH_MAP: - result = HV_ARCH_MAP[arch] - return result - def get_nb_cpu(self): - ''' - Returns number of active cpus in hypervisor - ''' - return self._con_handle.getInfo()[2] +#### vm - def get_cpu_frequency(self): - ''' - Returns the frequency in MHZ of cpus in the hypervisor - ''' - return str(self._con_handle.getInfo()[3]) - - def get_nb_threads(self): - ''' - Number of threads per core - ''' - return self._con_handle.getInfo()[7] +class LibvirtVm(VM): - def get_mem(self): - return str(psutil.avail_phymem() + psutil.used_phymem()) + ARCH = { + 'i686' : 'x86', + 'x86_64' : 'x64', + } - def get_mem_free(self): - return str(psutil.avail_phymem()) - - def get_mem_used(self): - return str(psutil.used_phymem()) + STATUS = ( + 'No state', + 'Running', + 'Blocked on resource', + 'Paused', + 'Shutting down ...', + 'Shutdown', + 'Crashed', + ) + STATUS_STOPPED = [0, 5, 6] + STATUS_RUNNING = [1, 2 , 3, 4] + STATUS_PAUSED = [3] - def get_cpu_percent(self): - return str(int(psutil.cpu_percent())) + def __init__(self, hypervisor, domain): + super(LibvirtVm, self).__init__() + if isinstance(domain, libvirt.virDomain): + self._domain = domain + else: + raise TypeError('Need virDomain object given %s' % type(domain)) + + self._hv_handle = hypervisor + self._find_pid() - def get_cpu(self): - return str(multiprocessing.cpu_count()) + def _find_pid(self): + result = find_process_id(self.get_uuid()) + if result: + self._pid = int(result.pop()) + else: + self._pid = None - def get_disks(self): - result = {} + def hypervisor(self): + return self._hv_handle + + def power_on(self): try: - re_pattern = re.compile(r'([sh]d[a-z]+)') - found = [bd for bd in os.listdir('/sys/block/') if re_pattern.match(bd)] - for disk in found: - fullname = os.path.join('/sys/block', disk, 'size') - size = int(open(fullname).read()) - if size > 0: - result[disk] = size * 512 - except: - pass - return result + self._domain.create() + except libvirt.libvirtError: + raise VMError('`%s` is already running' % self.get_name()) - def get_storage_pools(self): - return self._storage.get_pools().keys() + def power_off(self): + try: + self._domain.destroy() + except libvirt.libvirtError: + raise VMError('`%s` is not running' % self.get_name()) - def get_storage_capacity(self, pool=None): - capacity = 0 - if pool is None: - # for all pool available - for pool in self._storage.get_pools().iteritems(): - capacity = capacity + self._storage.get_pool_space_total(pool[1]) - else: - # get the capacity of a specific pool - capacity = self._storage.get_pool_space_total( - self._storage.get_pools()[pool]) - return str(capacity) - - def get_storage_used(self, pool=None): - used = 0 - if pool is None: - # for all pool available - for pool in self._storage.get_pools().iteritems(): - used = used + self._storage.get_pool_space_used(pool[1]) - else: - # get the used space of a specific pool - used = self._storage.get_pool_space_used( - self._storage.get_pools()[pool]) - return str(used) - - def get_storage_free(self, pool=None): - free = 0 - if pool is None: - # for all pool available - for pool in self._storage.get_pools().iteritems(): - free = free + self._storage.get_pool_space_free(pool[1]) - else: - # get the free space of a specific pool - free = self._storage.get_pool_space_available( - self._storage.get_pools()[pool]) - return str(free) + def power_shutdown(self): + try: + self._domain.shutdown() + except libvirt.libvirtError: + raise VMError('`%s` is not running' % self.get_name()) - def get_storage_type(self, pool_name): - result = None + def power_suspend(self): try: - pool = self._storage.get_pools()[pool_name] - xroot = xml.dom.minidom.parseString(pool.XMLDesc(0)) - xpool = xroot.getElementsByTagName('pool').pop() - result = xpool.getAttribute('type') - except: - pass - return result + self._domain.suspend() + except libvirt.libvirtError: + raise VMError('`%s` is not running' % self.get_name()) - def get_storage_path(self, pool_name): - result = None + def power_resume(self): try: - pool = self._storage.get_pools()[pool_name] - xroot = xml.dom.minidom.parseString(pool.XMLDesc(0)) - xpool = xroot.getElementsByTagName('pool').pop() - xtarget = xpool.getElementsByTagName('target').pop() - xpath = xtarget.getElementsByTagName('path').pop() - result = xpath.firstChild.nodeValue - except: + self._domain.resume() + except libvirt.libvirtError: + raise VMError('`%s` is not running' % self.get_name()) + + def is_active(self): + active = None + try: + active = self._domain.info()[0] in self.STATUS_RUNNING + except libvirt.libvirtError: pass - return result + return active - def get_storage_volumes(self, pool_name): - result = None + def is_paused(self): + paused = None try: - pool = self._storage.get_pools()[pool_name] - result = " ".join(self._storage.get_volume_names(pool)) - except: + paused = self._domain.info()[0] in self.STATUS_PAUSED + except libvirt.libvirtError: pass - return result - - def get_status(self): - raise NotImplementedError() - - def get_vm_names(self): - ''' - Returns a list of :class:`LibvirtVm` objects defining all - current defined vms in the hypervisor - ''' - return self._vm_list.keys() - - def get_vm(self, vm_name): - return self._vm_list[vm_name] + return paused - def get_vm_count(self): - return len(self._vm_list) + def get_uuid(self): + return self._domain.UUIDString() - def start_vm(self, vm_name, start_options=DEFAULT_VM_START): - ''' - Starts the vm identified by name - - :param name: The name of the virtual machine - :type nane: :class:`str` - - :param start_options: Options flags while starting vm - :type start_options: TODO reference to constants - - ''' - try: - self._vm_list[vm_name].start() - except KeyError: - raise VMError('Virtual machine %s not found: ' % vm_name) + def get_name(self): + return self._domain.name() - def stop_vm(self, vm_name, force=False): - ''' - Poweroff the specifed vm with the specified options - - :param name: the name of the vm - :type name: :class:`str` - ''' - try: - vm = self._vm_list[vm_name] - vm.force_poweroff() if force else vm.shutdown() - except KeyError: - raise VMError('Virtual machine %s not found: ' % vm_name) - - def suspend_vm(self, vm_name): - ''' - Suspends the specifed vm - - :param name: the name of the vm - :type name: :class:`str` - ''' - try: - self._vm_list[vm_name].suspend() - except KeyError: - raise VMError('Virtual machine %s not found: ' % vm_name) + def get_pid(self): + return self._pid - def resume_vm(self, vm_name): - ''' - Resumes the specifed vm - - :param name: the name of the vm - :type name: :class:`str` - ''' - try: - self._vm_list[vm_name].resume() - except KeyError: - raise VMError('Virtual machine %s not found: ' % vm_name) - - def local_execute(self, command): - ''' - Excecutes the command given in command on the local host - Returns a tuple with (stdout, stderr) from the process that has - been executed - - ..warning:: This is a dangerous function as it gives the command given - in paramter to a shell prompt, anything can then be executed locally - - ..warning:: If the command given is a long processing one, this may be - a deadlock to the node be carefull ! - - :param command: the command to execute with it's arguments - :type command: :class:`str` - ''' - #FIXME: stop using shell=true and parse arguments with shlex.split() - p = subprocess.Popen(command, shell=True, - bufsize=-1, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - result = p.communicate() - return result - - def export_vm(self, name, target, port): - ''' - Migrates the given vm (name) to (target) hypervisor which is already - waiting for the migration - - :param name: the name of the vm - :type name: :class:`str` - :param target: the destination hypervisor - :type target: :class:`str` - :param port: the port on the destination to use for the transfer - :type port: :class:`str` - ''' - # We will be using subprocess to pipe the volume using dd in a netcat - # connection to the destination - # We send a hash of the volume before so that the target can checksum - # and validate integrity. - # This algorithm is just a proof of concept of the cold migration - # process it is likely that it does not anticipates some problems - - buff = 1024 * 1024 * 10 #10 Mo - path = [vm.get_disk_path() for vm in self._vm_list - if vm.get_name() == name].pop() - dd_cmd = ['dd', 'if=%s' % path] - - dd_process = subprocess.Popen(dd_cmd, bufsize=-1, - stdout=subprocess.PIPE) - nc_process = subprocess.Popen(['nc', target, port], - bufsize=-1, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - #We send dd output to nc - dd_out = dd_process.stdout - nc_in = nc_process.stdin - m = hashlib.md5() - print 'going to pipe loop' - try: - read = dd_out.read(buff) - except Exception as err: - raise IOError('error while reading dd process output %s' % err) - try: - while len(read) != 0: - print len(read) - m.update(read) - nc_in.write(read) - nc_in.flush() - read = dd_out.read(buff) - except Exception as err: - raise IOError('Error occured while writing to nc process %s' % err) - nc_in.close() - print m.hexdigest() - - def get_network_conf(self): - raise NotImplementedError - - def get_storage_stats(self): - raise NotImplementedError - - -#TODO: finish storage class, make tests, -class LibvirtHVStorage(HVStorage): - ''' - Base storage class using libvirt api for storage managment - Storage pools here are libvirt.virStoragePool instances - ''' - def __init__(self, hypervisor): - if isinstance(hypervisor, LibvirtHypervisor): - self.hv_handle = hypervisor - self._fetch_st_pools() - else: - raise TypeError('Expected %s given %s' % (LibvirtHypervisor, - hypervisor)) - - def _fetch_st_pools(self): - ''' - Sets an attribute self._pool_objs containging a list of libvirt - pool objects bound to the hypervisor - ''' - pools = [] - pools.extend(self.hv_handle._con_handle.listDefinedStoragePools()) - pools.extend(self.hv_handle._con_handle.listStoragePools()) - self._pools = dict([(p, pool_obj(self.hv_handle._con_handle, p)) \ - for p in pools]) - - def get_pools(self): - ''' - Returns a dict of storage pools bound to the host - ''' - pools = {} - for pool_name, pool in self._pools.iteritems(): - if pool.isActive(): - pools[pool_name] = pool - return pools - - def get_volume_names(self, pool=None): - ''' - Returns volume names stored in this pool or all pools - ''' - volumes = [] - try: - if pool is None: - for pool in self._pools.iteritems(): - volumes.extend(pool[1].listVolumes()) - else: - volumes = pool.listVolumes() - except libvirt.libvirtError as e: - raise StorageError("Failed to get volume list (%s)" % e) - return volumes - - def add_volume(self, pool, name, space): - ''' - Adds a volume to the specified pool - - :param pool: the pool in which to create the volume - :type pool: :class:`str` - :param name: name of the new volume - :type name: :class:`str` - :param space: size of the new volume in gigabytes - :type space: :class:`int` - ''' - xml_desc = """ - - %(vol_name)s - %(vol_size)u - - """ % { - "vol_name" : name, - "vol_size" : space * GIGABYTE_DIV, - } - try: - self._pools[pool].createXML(xml_desc, 0); - except libvirt.libvirtError as e: - raise StorageError("Failed to create the volume (%s)" % e) - - def del_volume(self, pool, name, wipe=False): - ''' - Deletes a volume in the specified pool - - :param pool: the pool in which delete the volume - :type pool: :class:`str` - :param name: the name of the volume - :type name: :class:`str` - ''' - try: - vol = pool.storageVolLookupByName(name) - except libvirt.libvirtError as e: - raise StorageError("Volume not found (%s)" % e) - if wipe: - try: - vol.wipe(0) - except libvirt.libvirtError as e: - raise StorageError("Failed to wipe volume, data integrity" - " is unknown (%s)" % e) - try: - vol.delete(0) - except libvirt.libvirtError as e: - raise StorageError("Failed to delete the volume, but it may be" - " wiped (%s)" % e) - - def find_space(self, new_space): - ''' - Tries to find a suitable chunk of space for volume allocation. - - :param new_space: a space size in gigabytes - :type new_space: :class:`int` - :return: a :class:`tuple` of best location or :class:`False` if no - pool is suitable - ''' - for pool in self._pools.iteritems(): - try: - # FIXME, crappy overhead delta - if new_space * GIGABYTE_DIV <= pool[1].info()[3] - MEGABYTE_DIV: - return (pool) - except libvirt.libvirtError as e: - raise StorageError("Can't get pool informations (%s)" % e) - return False - - def get_pool_name(self, pool): - ''' - Returns the name of this pool - - :param pool: the storage pool name - :type pool: libvirt.`virStoragePool` - :return: :class:`str` name of the pool - ''' - try: - return pool.name() - except libvirt.libvirtError as e: - raise StorageError("Can't get pool name (%s)" % e) - - def get_pool_state(self, pool): - ''' - Returns the running state of the pool - - :param pool: the storage pool name - :type pool: libvirt.`virStoragePool` - ''' + def get_arch(self): + arch = None try: - return POOL_STATE[pool.info()[0]] - except libvirt.libvirtError as e: - raise StorageError("Can't get pool state (%s)" % e) + xroot = xml.dom.minidom.parseString( + self._domain.XMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE)) + xdomain = xroot.getElementsByTagName('domain').pop() + xos = xdomain.getElementsByTagName('os').pop() + xtype = xos.getElementsByTagName('type').pop() + xarch = xtype.getAttribute('arch') + if xarch in self.ARCH: + arch = self.ARCH[xarch] + except: + pass + return arch - def get_pool_space_total(self, pool): - ''' - Returns the storage capacity of this pool in bytes - - :param pool: the pool to get information from - :type pool: :class:`virStoragePool` - :return: :class:`int` of capacity in bytes - ''' - try: - return pool.info()[1] - except libvirt.libvirtError as e: - raise StorageError("Can't get pool informations (%s)" % e) + def get_cpu_core(self): + return self._domain.info()[3] - def get_pool_space_available(self, pool): - ''' - Returns available space of this storage pool in bytes - - :param pool: the pool to get information from - :type pool: :class:`virStoragePool` - :return: :class:`int` of available free space in bytes - ''' - try: - return pool.info()[2] - except libvirt.libvirtError as e: - raise StorageError("Can't get pool informations (%s)" % e) + def get_cpu_usage(self): + usage = None + if self._pid is not None: + try: + p = psutil.Process(self._pid) + sleep(0.2) + usage = p.get_cpu_percent() + except: + pass + return usage - def get_pool_space_used(self, pool): - ''' - Returns current storage pool usage in bytes - - :param pool: the pool to get information from - :type pool: :class:`virStoragePool` - :return: :class:`int` of used space in bytes - ''' - try: - return pool.info()[3] - except libvirt.libvirtError as e: - raise StorageError("Can't get pool informations (%s)" % e) + def get_mem(self): + return self._domain.info()[1] * KILOBYTE_DIV - def get_volume_path(self, pool, vol_name): - ''' - Returns the file path to the volume - - :param pool: the storage pool containing this volume - :type pool: libvirt.`virStoragePool` - :param volume_name: name of the pool volume - :type volume_name: :class:`str` - :return: :class:`str` path to the volume file - ''' - try: - vol = pool.storageVolLookupByName(vol_name) - except libvirt.libvirtError as e: - raise StorageError("Can't find volume in pool (%s)" % e) - try: - return vol.path() - except libvirt.libvirtError as e: - raise StorageError("Volume has no path information (%s)" % e) - - def get_volume_allocation(self, pool, vol_name): - ''' - Returns the pool space used by this volume in bytes - - :param pool: the pool containing this volume from - :type pool: :class:`virStoragePool` - :param vol_name: name of the volume to query - :type vol_name: :class:`str` - :return: :class:`int` of allocation in bytes - ''' - try: - vol = pool.storageVolLookupByName(vol_name) - except libvirt.libvirtError as e: - raise StorageError("Volume not found (%s)" % e) - try: - return vol.info()[2] - except libvirt.libvirtError as e: - raise StorageError("Volume has no allocation information (%s)" % e) - - def get_volume_capacity(self, pool, vol_name): - ''' - Returns the capacity (usable space) of this volume in bytes + def get_mem_used(self): + return self._domain.info()[2] * KILOBYTE_DIV + + def get_mem_free(self): + return (self.get_mem() - self.get_mem_used()) * KILOBYTE_DIV - :param pool: the pool containing this volume from - :type pool: :class:`virStoragePool` - :param vol_name: name of the volume to query - :type vol_name: :class:`str` - :return: :class:`int` of capacity in bytes - ''' - try: - vol = pool.storageVolLookupByName(vol_name) - except libvirt.libvirtError as e: - raise StorageError("Volume not found (%s)" % e) + def get_volumes(self): + volumes = [] try: - return vol.info()[1] - except libvirt.libvirtError as e: - raise StorageError("Volume has no capacity information (%s)" % e) - - -#### Helper functions - -def map_process(process): - ''' - map each process object with it's pid and command line options - ''' - return (process.pid, process.cmdline) + xroot = xml.dom.minidom.parseString( + self._domain.XMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE)) + xdomain = xroot.getElementsByTagName('domain').pop() + xdevices = xdomain.getElementsByTagName('devices').pop() + # iter on "disk" devices + for xdisk in xdevices.getElementsByTagName('disk'): + try: + # disks we can handle + if xdisk.getAttribute('device') == 'disk': + # get type + d_type = xdisk.getAttribute('type') + # get backend path + if d_type == 'file': + d_path = xdisk.getElementsByTagName('source').pop()\ + .getAttribute('file') + elif d_type == 'block': + d_path = disk.getElementsByTagName('source').pop()\ + .getAttribute('dev') + # search the volume object + if d_type in ['file', 'block']: + volumes.append(self._hv_handle._sto_handle + .find_volumes(path=d_path).pop()) + except Exception as e: + print e + except: + pass + return volumes -def find_process_id(uuid): - ''' - Find the process id of a kvm process gevin an UUID of a virDomain object - Returns a list of one process id - ''' - return [p.pid for p in psutil.get_process_list() if - p.cmdline.__contains__(uuid)] -def pool_obj(con_handle, pool_name): - ''' - Creates a libvirt pool object given the name of the pool +#### helpers - :param con_handle: libvirt connection handle - :type con_handle: :class:`libvirt.virConnect` - ''' - return con_handle.storagePoolLookupByName(pool_name) +def find_process_id(cmd_subchain): + return [p.pid for p in psutil.get_process_list() + if p.cmdline.__contains__(cmd_subchain)] diff --git a/debian/control b/debian/control index 1f91675..bab7873 100644 --- a/debian/control +++ b/debian/control @@ -8,7 +8,7 @@ Standards-Version: 3.9.1 Package: cc-node Architecture: all -Depends: ${misc:Depends}, ${python:Depends}, python-sjrpc ( >= 6-1 ), python-psutil +Depends: ${misc:Depends}, ${python:Depends}, python-sjrpc ( >= 8 ), python-psutil, dmidecode Recommends: python-libvirt XB-Python-Version: ${python:Versions} Description: CloudControl node diff --git a/etc/cc-node.conf b/etc/cc-node.conf index 7e6f57a..344886a 100644 --- a/etc/cc-node.conf +++ b/etc/cc-node.conf @@ -1,15 +1,19 @@ [node] -# The target address and port of the CloudControl server: +# address and port of the CloudControl server address = #port = 1984 -# The login and password of the CloudControl account +# account created for this node on the server login = password = -# Verbosity level 0-3: +# logging verbosity level 0-3 verbosity = -# Enable hypervisor detection and connection +# hypervisor detection and connection, set to 'no' on regular hosts or when +# the node should not attempt to use local hypervisors detect_hypervisor = yes + +# allow remote command execution (or host shutdown/reboot) +command_execution = yes -- GitLab