Commit 64e152fe authored by Thibault VINCENT's avatar Thibault VINCENT
Browse files

Huge rewrite, new tags, better node support on non-hv hosts

parent 49741d52
Loading
Loading
Loading
Loading
+7 −4
Original line number Diff line number Diff line
@@ -31,6 +31,7 @@ DEFAULT_CONFIGURATION = {
    'verbosity': '0',
    'detect_hypervisor': 'yes',
    #'ssl_cert': '',
    'command_execution' : 'yes',
}

MAX_AUTH_TIMEOUT = 10
@@ -62,7 +63,8 @@ def run_node(options):
    
    try:
        node = CCNode(options['address'], int(options['port']),
                      options['detect_hypervisor'] == 'yes')
                      options['detect_hypervisor'] == 'yes',
                      options['command_execution'] == 'yes')
    except Exception as err:
        logging.critical('Starting fail: %s' % err)
        return
@@ -104,6 +106,7 @@ def run_node(options):
        logging.critical('Node failed: %s' % err)
        return


if __name__ == '__main__':

    usage = 'usage: %prog [OPTION...]'
@@ -117,7 +120,7 @@ if __name__ == '__main__':
    
    cliopts, args = op.parse_args()
    
    # Reading the config file:
    # read the config file
    config = ConfigParser.SafeConfigParser()
    config.read(cliopts.config)
    try:
@@ -127,7 +130,7 @@ if __name__ == '__main__':
                         "in '%s'\n" % cliopts.config)
        sys.exit(1)
    
    # Applying default config file options:
    # applying default config file options
    for opt, default in DEFAULT_CONFIGURATION.iteritems():
        if opt not in options or not options[opt]:
            if default is None:
+2 −58
Original line number Diff line number Diff line
@@ -2,65 +2,9 @@
#coding:utf-8

"""
This API abstracts all calls to any hypervisor type.

This API is split in four parts:
 * Global Interface
 * Libvirt Wrapper
 * KVM, XEN, ... hypervisor interface
 * Exceptions

Every entity in the hypervisor library has a common ancestor
:class:`interface.Host`.

The inheritance diagram shown below reflects the relationship between
each class.


.. graphviz::

    digraph {
        subgraph cluster_interface {
            "Host" [shape=box];
            "Hypervisor" [shape=box];
            "VM" [shape=box];
            "Host" -> "Hypervisor";
            "Host" -> "VM";
            label = "interface";
            style = filled;
            color = "#e6fbff";
        }
        subgraph cluster_libvirt {
            "LibvirtHypervisor" [shape=box];
            "LibvirtVm" [shape=box];
            "Hypervisor" -> "LibvirtHypervisor";
            "VM" -> "LibvirtVm";
            style=filled;
            color="#ffe6ea";
            label = "libvirt";
        }
        subgraph cluster_hypervisor_calls {
            "KvmHypervisor" [shape=box];
            "XenHypervisor" [shape=box];
            "LibvirtHypervisor" -> "KvmHypervisor";
            "LibvirtHypervisor" -> "XenHypervisor";
            label = "Hypervisor Calls";
            labelloc = "b";
            style = filled;
            color = "#fffcc9";
        }
        subgraph cluster_vm_calls {
            "KvmVm" [shape=box];
            "XenVm" [shape=box];
            "LibvirtVm" -> "KvmVm";
            "LibvirtVm" -> "XenVm";
            label = "Virtual Machine Calls";
            labelloc = "b";
            style = filled;
            color = "#fffcc9";
        }
    }
TODO : rewrite based on older doc

"""

__version__ = 7
 No newline at end of file
__version__ = '8~dev+revamp'
+6 −4
Original line number Diff line number Diff line

import socket
import threading
import ssl
@@ -5,16 +6,17 @@ import logging
from sjrpc.client import SimpleRpcClient
from sjrpc.utils import ConnectionProxy
from sjrpc.core import RpcError
from ccnodehandlers import *
import handlers


class CCNode(object):
    '''
    This class handles node initialization and connection and authentication
    Handle node initialization, connection to server, and authentication
    '''
    def __init__(self, server, port, hypervisor, cert=None):
    def __init__(self, server, port, hypervisor, exec_cmd, cert=None):
        self.manager = SimpleRpcClient.from_addr(server, port, enable_ssl=True,
                                default_handler=NodeHandler(self, hypervisor))
                default_handler=handlers.NodeHandler(self, hypervisor,
                                                                    exec_cmd))
        self.server = ConnectionProxy(self.manager)

    def run(self):

ccnode/ccnodehandlers.py

deleted100644 → 0
+0 −386
Original line number Diff line number Diff line
# Gerer le cas ou le mdp n'est pa valable
#TODO: catch livbirt disconnection errors and handle it 


from __init__ import __version__
from interface import Host
from kvm import *
from sjrpc.utils import RpcHandler
from sjrpc.utils import pure
from sjrpc.core import RpcError
from exceptions import VMError
import logging

############
## Constants
############

# Logging errors

TAG_METHOD_ERROR = 'No method found in hypervisor to handle tag %s'
TAG_NOT_FOUND_ERROR = 'Tag %s requested but unknown in node handler'

# Tags to method mappings

# Hypervisor

HV_TAG_WRAP_MAP = {
    'h'         : 'get_name',
    'hv'        : 'get_name',
    'hvtype'    : 'get_hv_type',
    'libvirtver': 'get_hv_version',
    'arch'      : 'get_arch_type',
    'uname'     : 'get_uname',
    'uptime'    : 'get_uptime',
    'load'      : 'get_loadavg',
    'cpu'       : 'get_cpu',
    'cpufreq'   : 'get_cpu_frequency',
    'cpuuse'    : 'get_cpu_percent',
    'status'    : 'get_status',
    'mem'       : 'get_mem',
    'memused'   : 'get_mem_used',
    'memfree'   : 'get_mem_free',
    'nvm'       : 'get_vm_count',
}

HV_TAG_HELPER_LIST = [
    'disk',
    'sto',
]

# Vm

VM_TAG_WRAP_MAP = {
    'h'         : 'get_name',
    'vm'        : 'get_name',
    #'hv'        : 'get_hv_name',
    'arch'      : 'get_arch',
    'cpu'       : 'get_cpu',
    'status'    : 'get_status',
    'mem'       : 'get_mem',
    'memused'   : 'get_memused',
    'memfree'   : 'get_memfree',
}

VM_TAG_HELPER_LIST = [
    'disk',
    'nic',
]


class NodeHandler(RpcHandler):
    '''
    This is the main node handler that exports the hypervisor
    capabilities to any connected server. In other terms this is
    the node interface to the server.
    '''

    def __init__(self, connection, hypervisor):
        super(RpcHandler, self).__init__()
        self._connection = connection
        #do not detect hypervisor if connection is disable in configuration
        if not hypervisor:
            logging.debug('Hypervisor detection disabled, running as regular'
              ' node')
            self.hv_handle = Host()
        else:
            #we set our hypervisor handle
            #FIXME: must be aware of hypervisor type
            logging.debug('Initializing a connection on the hypervisor')
            self.hv_handle = KvmHypervisor()
            logging.debug('Initialized connection to a KVM hypervisor')
    
    def _call_hv_method(self, method):
        '''
        Calls the given method in the :class:`Hypervisor` instance
        Returns the result of the method called

        :param method: the method to be called
        :type method: :class:`str`
        '''
        logging.debug('called _call_hv_method(%s)' % method)
        return getattr(self.hv_handle, HV_TAG_WRAP_MAP[method])()
    
    def _call_hv_helper(self, method):
        logging.debug('called _call_hv_helper(%s)' % method)
        return getattr(self, "_helper_hv_" + method)()
    
    def _call_vm_method(self, vm_name, tag):
        '''
        Calls the given method in vm instance
        Returns the result of the method called

        :param vm: The vm object in which call the method
        :vm type: :class:`VM`
        :param method: the method to be called
        :type method: :class:`str`
        '''
        logging.debug('called _call_vm_method(%s,%s)' % (vm_name, tag))
        vm_obj = self.hv_handle.get_vm(vm_name)
        return getattr(vm_obj, VM_TAG_WRAP_MAP[tag])()
    
    def _call_vm_helper(self, vm_name, helper):
        logging.debug('called _call_vm_helper(%s,%s)' % (vm_name, helper))
        vm_obj = self.hv_handle.get_vm(vm_name)
        return getattr(self, "_helper_vm_" + helper)(vm_obj)
    
    def _helper_hv_sto(self):
        logging.debug('called _helper_hv_sto()')
        result = {}
        hv = self.hv_handle
        # fetch pool list
        pools = hv.get_storage_pools()
        result['sto'] = " ".join(pools)
        # get pool info
        for pool in pools:
            # storage type
            result['sto%s_type' % pool] = hv.get_storage_type(pool)
            # path to the pool base
            result['sto%s_path' % pool] = hv.get_storage_path(pool)
            # pool size
            result['sto%s_size' % pool] = str(hv.get_storage_capacity(pool))
            # pool used space
            result['sto%s_used' % pool] = str(hv.get_storage_used(pool))
            # pool free space
            result['sto%s_free' % pool] = str(hv.get_storage_free(pool))
            # pool volume list
            result['sto%s_vol' % pool] = hv.get_storage_volumes(pool)
        return result
    
    def _helper_hv_disk(self):
        logging.debug('called _helper_hv_disk()')
        result = {}
        disks = self.hv_handle.get_disks()
        result['disk'] = " ".join(disks.keys())
        for disk, size in disks.iteritems():
            result['disk%s_size' % disk] = str(size)
        return result
    
    def _helper_vm_disk(self, vm):
        logging.debug('called _helper_vm_disk(%s)' % vm.get_name())
        result = {}
        # fetch disk list
        disks = vm.get_disks()
        logging.debug('_helper_vm_disk: disk list "%s"' % disks)
        if len(disks):
            result['disk'] = " ".join(str(id) for id in range(0, len(disks)))
            # add disk info
            for n, disk in enumerate(disks):
                # disk path to real storage
                result['disk%i_path' % n] = disk['path']
                # disk size
                result['disk%i_size' % n] = str(disk['size'])
                # disk pool
                result['disk%i_pool' % n] = disk['pool']
                # disk volume
                result['disk%i_vol' % n] = disk['vol']
        logging.debug('_helper_vm_disk returns "%s"' % result)
        return result
    
    def _helper_vm_nic(self, vm):
        logging.debug('called _helper_vm_nic(%s)' % vm.get_name())
        result = {}
        # fetch NIC list
        nics = vm.get_nics()
        logging.debug('_helper_vm_nic: NIC list "%s"' % nics)
        if len(nics):
            result['nic'] = " ".join(str(id) for id in range(0, len(nics)))
            # add NIC info
            for n, nic in enumerate(nics):
                try:
                    result['nic%i_type' % n] = nic['type']
                except:
                    pass
                try:
                    result['nic%i_hwaddr' % n] = nic['hwaddr']
                except:
                    pass
                try:
                    result['nic%i_source' % n] = nic['source']
                except:
                    pass
                try:
                    result['nic%i_target' % n] = nic['target']
                except:
                    pass
                try:
                    result['nic%i_tx' % n] = nic['tx']
                except:
                    pass
                try:
                    result['nic%i_rx' % n] = nic['rx']
                except:
                    pass
        logging.debug('_helper_vm_nic returns "%s"' % result)
        return result
    
    @pure
    def get_tags(self, tags=None):
        '''
        Return tags bound to hypervisor specified in tags parameter otherwise
        returns all

        :param tags: list of tags or None
        :type tags: :class:`list` 
        '''
        result = {}
        if tags is not None:
            tags = set(tags)
            tags |= set(('h', 'hv'))
        hv_tags = HV_TAG_WRAP_MAP.keys() if tags is None else tags
        # static tags
        result['version'] = str(__version__)
        # direct wrapping
        for tag in hv_tags:
            try:
                result[tag] = str(self._call_hv_method(tag))
            except AttributeError:
                logging.warning(TAG_METHOD_ERROR % tag)
            except KeyError:
                logging.warning(TAG_NOT_FOUND_ERROR % tag)
            except NotImplementedError:
                logging.debug('get_tags: no method in hypervisor to handle "%s"' % tag)
        # tag aggregations from wrapping helpers
        for helper in HV_TAG_HELPER_LIST:
            htags = self._call_hv_helper(helper)
            for tag, val in htags.iteritems():
                if tags is None or tag in tags:
                    result.update({tag:val})
        return result

    @pure
    def list_vm(self, vm_names=None, tags=None):
        '''
        Return a list of vm tags
        If vm_names is None or [] returns all vms
        If tags is None or [] returns only vm name tag

        :param vm_names: vm names
        :type vm_names: :class:`list` of strings
        :param tags: tags to be returned
        :type tags: :class:`list` of strings
        '''
        result = []
        if tags is not None:
            tags = set(tags)
            tags |= set(('h', 'vm'))
        vm_tags = VM_TAG_WRAP_MAP.keys() if tags is None else tags
         # iter on all VMs or only requested names
        for vm in self.hv_handle.get_vm_names() if vm_names is None else vm_names:
            vm_info = {}
            # direct wrapping
            for tag in vm_tags:
                try:
                    vm_info[tag] = str(self._call_vm_method(vm, tag))
                except AttributeError:
                    logging.warning(TAG_METHOD_ERROR % tag)
                except KeyError:
                    logging.warning(TAG_NOT_FOUND_ERROR % tag)
                except NotImplementedError:
                    logging.debug('list_vm: no method in vm to handle "%s"' % tag)
            # wrapping helpers
            for helper in VM_TAG_HELPER_LIST:
                htags = self._call_vm_helper(vm, helper)
                for tag, val in htags.iteritems():
                    if tags is None or tag in tags:
                        vm_info.update({tag:val})
            # save tags
            result.append(vm_info)
        
        return result

    @pure
    def stop_vm(self, vm_names=None, force=False):
        '''
        Stop the specified list of vm names
        This method do a soft shutdown on the Virtual Machine
        If vm_names is None all vms in hypervisor will be stopped

        :param vm_names: the list of vm names to stop
        :vm_names type: :class:`list` of strings
        :param force: soft shutown or force poweroff
        :type force: either True or False
        '''
        if vm_names is None:
            #fetch all vm names in hypervisor
            vm_names = self.hv_handle.get_vm_names()
        logging.debug('stop_vm: stopping vms %s' % vm_names)
        for vm in vm_names:
            try:
                self.hv_handle.stop_vm(vm, force)
            except VMError as err:
                logging.warning('Error while stopping %s: %s' % (vm, err))
            else:
                logging.info('stop_vm: vm %s stopping' % vm)

    @pure
    def start_vm(self, vm_names=None):
        '''
        Starts the specified list of vms
        If vm_names is None all vms in the hypervisor will be started
        
        :param vm_names: the list of vms to start
        :type vm_names: :class:`list` of strings
        '''
        if vm_names is None:
            vm_names = self.hv_handle.get_vm_names()
        logging.debug('start_vm: starting vms %s' % vm_names)
        for vm in vm_names:
            try:
                self.hv_handle.start_vm(vm)
            except VMError as err:
                logging.warning('Error while starting %s: %s' % (vm ,err))
            else:
                logging.info('start_vm: vm %s starting' % vm)

    @pure
    def suspend_vm(self, vm_names=None):
        '''
        Suspends the specifed list of vms
        If vm_names is None all vms in hypervisor will be suspended

        :param vm_names: the list of vms to suspend
        :type vm_names: :class:`list` of strings
        '''
        if vm_names is None:
            vm_names = self.hv_handle.get_vm_names()
        logging.debug('suspend_vm: suspending vms %s' % vm_names)
        for vm in vm_names:
            try:
                self.hv_handle.suspend_vm(vm)
            except VMError as err:
                logging.info('Error while suspending %s: %s' % (vm, err))
            else:
                logging.info('suspend_vm: vm %s suspended' % vm)
                
    @pure
    def resume_vm(self, vm_names=None):
        '''
        Resumes the specified list of vms
        If vm_names is None all vms in hypervisor will be resumed

        :param vm_names: the list of vms to resume
        :type vm_names: :class:`str`
        '''
        if vm_names is None:
            vm_names = self.hv_handle.get_vm_names()
        logging.debug('resume_vm: resuming vms "%s"' % vm_names)
        for vm in vm_names:
            try:
                self.hv_handle.resume_vm(vm)
            except VMError as err:
                logging.info('Error while resuming %s: %s' % (vm, err))
            else:
                logging.info('resume_vm: vm %s resumed' % vm)
            
    @pure
    def execute_command(self, command):
        '''
        Executes the given command on the local hypervisor

        :param command: the command to execute as it would be typed on a shell
            prompt
        :type command: :class:`str`
        '''
        result = self.hv_handle.local_execute(command)
        return result

ccnode/common.py

0 → 100644
+217 −0
Original line number Diff line number Diff line

import os
import re
import psutil
from subprocess import Popen, PIPE
from multiprocessing import cpu_count
from platform import platform, machine
from socket import gethostbyaddr, gethostname


class Host(object):
    '''
    '''


class LocalHost(Host):
    '''
    '''
    
    ARCH = {
        'i386' : 'x86',
        'i486' : 'x86',
        'i586' : 'x86',
        'i686' : 'x86',
        'x86_64' : 'x64',
    }
    
    def get_hw_serial(self):
        serial = None
        try:
            data = open('/sys/class/dmi/id/product_serial').read().strip()
            if data:
                serial = data
        except:
            pass
        return serial
    
    def get_hw_vendor(self):
        vendor = None
        try:
            data = open('/sys/class/dmi/id/sys_vendor').read().strip()
            if data:
                vendor = data
        except:
            pass
        return vendor
    
    def get_hw_product(self):
        product = None
        try:
            data = open('/sys/class/dmi/id/product_name').read().strip()
            if data:
                product = data
        except:
            pass
        return product
    
    def get_hw_bios(self):
        bios = ''
        try:
            bios_ver = open('/sys/class/dmi/id/bios_version').read().strip()
            bios_date = open('/sys/class/dmi/id/bios_date').read().strip()
            if bios_ver:
                bios += bios_ver
            if bios_date:
                bios += ' (%s)' % bios_date
            if not bios:
                bios = None
        except:
            pass
        return bios
    
    def get_name(self):
        result = None
        try:
            hostname = gethostname()
            fqdn = gethostbyaddr(hostname)[0]
            result = fqdn if fqdn else hostname
        except:
            pass
        return result
    
    def get_uname(self):
        uname = None
        try:
            data = ' '.join(os.uname())
            if data:
                uname = data
        except:
            pass
        return uname
    
    def get_platform(self):
        result = None
        try:
            p = platform()
            if p:
                result = p
        except:
            pass
        return result
    
    def get_uptime(self):
        uptime = None
        try:
            data = open("/proc/uptime").read().split()
            if data:
                uptime = int(float(data[0]))
        except:
            pass
        return uptime
    
    def get_loadavg(self):
        return ' '.join('%.2f' % load for load in os.getloadavg())
    
    def get_arch(self):
        arch = None
        try:
            a = machine()
            if a in self.ARCH:
                arch = ARCH[a]
        except:
            pass
        return arch
    
    def get_cpu(self):
        return cpu_count()
    
    def get_cpu_usage(self):
        return '%.1f' % psutil.cpu_percent()
    
    def get_mem(self):
        return psutil.avail_phymem() + psutil.used_phymem()
    
    def get_mem_free(self):
        return psutil.avail_phymem()
    
    def get_mem_used(self):
        return psutil.used_phymem()
    
    def get_disks(self):
        disks = {}
        try:
            re_pattern = re.compile(r'([sh]d[a-z]+)')
            found = [bd for bd in os.listdir('/sys/block/')
                                                        if re_pattern.match(bd)]
            for disk in found:
                fullname = os.path.join('/sys/block', disk, 'size')
                size = int(open(fullname).read())
                if size > 0:
                    disks[disk] = size * 512
        except:
            pass
        return disks
    
    def power_shutdown(self):
        return self.execute('/sbin/shutdown -h -P 0')
    
    def power_off(self):
        return self.execute('/sbin/shutdown -h -P -n 0')
    
    def power_reboot(self):
        return self.execute('/sbin/shutdown -r -f 0')
    
    def power_force_reboot(self):
        return self.execute('/sbin/shutdown -r -n 0')
    
    def execute(self, command):
        #FIXME: stop using shell=true and parse arguments with shlex.split()
        return Popen(command, shell=True,
                     bufsize=-1,
                     stdin=PIPE,
                     stdout=PIPE,
                     stderr=PIPE).communicate()


class Hypervisor(LocalHost):
    '''
    '''
    
    def storage(self):
        raise NotImplemented
    
    def vm_list(self):
        raise NotImplemented
    
    def vm_list_running(self):
        raise NotImplemented

    def vm_list_stopped(self):
        raise NotImplemented
        
    def vm_list_paused(self):
        raise NotImplemented
    
    def vm_get(self, name):
        raise NotImplemented


class VM(Host):
    '''
    '''


class Storage(object):
    '''
    '''


class StoragePool(object):
    '''
    '''


class StorageVolume(object):
    '''
    '''
Loading