From 71bd770a617f73a30cd41e6ab29c612c47205002 Mon Sep 17 00:00:00 2001 From: Thibault VINCENT Date: Thu, 3 Feb 2011 15:01:53 +0100 Subject: [PATCH] wonderful fixes - server reconnection - vm registering - real TTL - less contention - super cool fixes - blah blah ... --- bin/cc-node | 17 +- ccnode/__init__.py | 2 +- ccnode/ccnode.py | 51 +++- ccnode/common.py | 3 + ccnode/handlers.py | 568 +++++++++++++++++++++------------------ ccnode/libvirtwrapper.py | 84 +++--- etc/cc-node.conf | 4 + 7 files changed, 417 insertions(+), 312 deletions(-) diff --git a/bin/cc-node b/bin/cc-node index 6e81ab7..a50db8d 100755 --- a/bin/cc-node +++ b/bin/cc-node @@ -32,6 +32,7 @@ DEFAULT_CONFIGURATION = { 'detect_hypervisor': 'yes', #'ssl_cert': '', 'command_execution' : 'yes', + 'force_xen' : 'no', } MAX_AUTH_TIMEOUT = 10 @@ -84,7 +85,7 @@ def run_node(options): def authentication(node): timeout = 1 while node: - if node.manager.is_running(): + if node.get_manager().is_running(): logging.debug('Sending authentication request') result = node.authentify(options['login'], options['password']) if result: @@ -92,9 +93,8 @@ def run_node(options): return else: timeout += 0.1 - maxtimeout = MAX_AUTH_TIMEOUT - if timeout == maxtimeout: - timeout = maxtimeout + if timeout >= MAX_AUTH_TIMEOUT: + timeout = MAX_AUTH_TIMEOUT sleep(exp(timeout)) # start node @@ -105,22 +105,23 @@ def run_node(options): try: node = CCNode(options['address'], int(options['port']), options['detect_hypervisor'] == 'yes', - options['command_execution'] == 'yes') + options['command_execution'] == 'yes', + force_xen=(options['force_xen'] == 'yes')) except Exception as err: - logging.critical('Client initialization failure: `%s`' % err) + logging.critical('Client initialization failure: `%s`:`%s`' + % (repr(err), err)) raise err # start main loop and auth thread logging.debug('Starting authentication thread') auth_thread = threading.Thread(target=authentication, args=(node,), - name="Auth") + name='Auth') auth_thread.start() logging.debug('Starting node main loop') node.run() except Exception as err: if auth_thread: - auth_thread.cancel() del auth_thread auth_thread = None if node: diff --git a/ccnode/__init__.py b/ccnode/__init__.py index c113731..94e9750 100644 --- a/ccnode/__init__.py +++ b/ccnode/__init__.py @@ -7,4 +7,4 @@ TODO : rewrite based on older doc """ -__version__ = '9~dev' +__version__ = '9' diff --git a/ccnode/ccnode.py b/ccnode/ccnode.py index 7e37f95..4b76bd1 100644 --- a/ccnode/ccnode.py +++ b/ccnode/ccnode.py @@ -8,24 +8,27 @@ from sjrpc.client import SimpleRpcClient from sjrpc.utils import ConnectionProxy from sjrpc.core import RpcError import handlers - +from threading import Timer class CCNode(object): ''' Handle node initialization, connection to server, and authentication ''' - def __init__(self, server, port, hypervisor, exec_cmd, cert=None): + def __init__(self, server, port, hypervisor, exec_cmd, force_xen=False, + cert=None): ''' ''' - self.manager = SimpleRpcClient.from_addr(server, port, enable_ssl=True, - default_handler=handlers.NodeHandler(self, hypervisor, - exec_cmd)) - self.server = ConnectionProxy(self.manager) + self._scheduler_timer = None + self._handler = handlers.NodeHandler(self, hypervisor, exec_cmd, + force_xen) + self._manager = SimpleRpcClient.from_addr(server, port, enable_ssl=True, + default_handler=self._handler) + self._server = ConnectionProxy(self._manager) def run(self): ''' ''' - self.manager.run() + self._manager.run() def authentify(self, login, password): ''' @@ -33,22 +36,44 @@ class 
CCNode(object): logging.debug('Authenticating user %s with password <%s>' % (login, password)) try: - role = self.server.authentify(login, password) + role = self._server.authentify(login, password) + self._scheduler_run() except RpcError as err: if err.exception == 'AuthenticationError': logging.warning('Authentication error') else: logging.warning('Unknown error while authenticating: %s' % err) return False + except Exception as err: + logging.debug('Unhandled exception: `%s`' % err) else: - if role != 'hypervisor': + if role != 'hv': logging.warning('Bad role affected by server: %s' % role) - return False + raise Exception() + # FIXME this will not cause a server restart, node stays dead else: return True + + def _scheduler_run(self): + ''' + ''' + self._handler.scheduler_run() + # reset timer + del self._scheduler_timer + self._scheduler_timer = Timer(1, self._scheduler_run) + self._scheduler_timer.start() + def get_server(self): + ''' + ''' + return self._server + def get_manager(self): - return self.manager + ''' + ''' + return self._manager - def get_server(self): - return self.server \ No newline at end of file + def get_handler(self): + ''' + ''' + return self._handler \ No newline at end of file diff --git a/ccnode/common.py b/ccnode/common.py index e2bc4e2..77b3439 100644 --- a/ccnode/common.py +++ b/ccnode/common.py @@ -30,6 +30,9 @@ class LocalHost(Host): 'x86_64' : 'x64', } + def scheduler_run(self): + pass + def get_hw_serial(self): ''' ''' diff --git a/ccnode/handlers.py b/ccnode/handlers.py index 0505242..87f28a1 100644 --- a/ccnode/handlers.py +++ b/ccnode/handlers.py @@ -1,14 +1,16 @@ # -*- coding: utf-8 -*- +from fnmatch import fnmatchcase +from threading import Timer, Lock, RLock +from sjrpc.core import RpcError +from itertools import chain from __init__ import __version__ from sjrpc.utils import RpcHandler from sjrpc.utils import pure -from logging import debug, info -from fnmatch import fnmatchcase -from exceptions import FeatureNotImplemented +from logging import debug, critical, warning, info +from exceptions import CCException, FeatureNotImplemented from common import LocalHost -from threading import Timer -from sjrpc.core import RpcError + _MOD_KVM = True try: @@ -23,23 +25,24 @@ except ImportError: _MOD_XEN = False - class NodeHandler(RpcHandler): ''' Main node handler that exports the host capabilities to the server. 
''' - def __init__(self, connection, detect_hv, allow_exec): + def __init__(self, connection, detect_hv=True, safe_mode=True, + force_xen=False): ''' ''' super(RpcHandler, self).__init__() self._connection = connection - self._allow_cmd_exec = allow_exec + self._safe_mode = safe_mode + # create interface with host self._host_handle = None if detect_hv: debug('Hypervisor detection in progress') - if _MOD_KVM: + if _MOD_KVM and not force_xen: debug('Initializing connection to the local KVM hypervisor') self._host_handle = kvm.KvmHypervisor() elif _MOD_XEN: @@ -53,74 +56,94 @@ class NodeHandler(RpcHandler): debug('Hypervisor detection disabled, running as regular node') self._host_handle = LocalHost() - self.EXEC_METHODS = ['execute_command', 'shutdown'] + # methods that execute administrative commands, to be banned when + # running in safe mode + self.UNSAFE_METHODS = ['execute_command', 'shutdown'] + # hypervisor tags self.HV_TAG_MANDATORY = ['h'] self.HV_TAG_MAP = { + # infinite TTL 'version' : ( lambda o: True, - lambda o,t: str(__version__)), - 'h' : self._tag_map_direct('get_name'), - 'htype' : self._tag_map_direct('get_hv_type'), - 'status' : self._tag_map_direct('get_status'), - 'hserial' : self._tag_map_direct('get_hw_serial'), - 'hvendor' : self._tag_map_direct('get_hw_vendor'), - 'hmachine' : self._tag_map_direct('get_hw_product'), - 'hbios' : self._tag_map_direct('get_hw_bios'), - 'arch' : self._tag_map_direct('get_arch'), - 'platform' : self._tag_map_direct('get_platform'), - 'uname' : self._tag_map_direct('get_uname'), - 'uptime' : self._tag_map_direct('get_uptime'), - 'hvm' : self._tag_map_direct('get_hvm_available'), - 'libvirtver': self._tag_map_direct('get_libvirt_version'), - 'hvver' : self._tag_map_direct('get_hv_version'), - 'load' : self._tag_map_direct('get_loadavg'), - 'cpu' : self._tag_map_direct('get_cpu'), - 'cpucore' : self._tag_map_direct('get_cpu_core'), - 'cputhread' : self._tag_map_direct('get_cpu_threads'), - 'cpufreq' : self._tag_map_direct('get_cpu_frequency'), - 'cpuuse' : self._tag_map_direct('get_cpu_usage'), - 'mem' : self._tag_map_direct('get_mem'), - 'memfree' : self._tag_map_direct('get_mem_free'), - 'memused' : self._tag_map_direct('get_mem_used'), - 'disk' : self._tag_map_keys('get_disks'), + lambda o,t: str(__version__), + -1), + 'libvirtver': self._tag_map_direct('get_libvirt_version', -1), + 'htype' : self._tag_map_direct('get_hv_type', -1), + 'hserial' : self._tag_map_direct('get_hw_serial', -1), + 'hvendor' : self._tag_map_direct('get_hw_vendor', -1), + 'hmachine' : self._tag_map_direct('get_hw_product', -1), + 'arch' : self._tag_map_direct('get_arch', -1), + 'hvm' : self._tag_map_direct('get_hvm_available', -1), + 'cpu' : self._tag_map_direct('get_cpu', -1), + 'cpucore' : self._tag_map_direct('get_cpu_core', -1), + 'cputhread' : self._tag_map_direct('get_cpu_threads', -1), + # one day + 'hbios' : self._tag_map_direct('get_hw_bios', 24*3600), + 'hvver' : self._tag_map_direct('get_hv_version', 24*3600), + 'platform' : self._tag_map_direct('get_platform', 24*3600), + 'uname' : self._tag_map_direct('get_uname', 24*3600), + 'cpufreq' : self._tag_map_direct('get_cpu_frequency', 24*3600), + 'mem' : self._tag_map_direct('get_mem', 24*3600), + 'disk' : self._tag_map_keys('get_disks', 24*3600), + 'h' : self._tag_map_direct('get_name', 24*3600), + # one hour + # one minute + 'uptime' : self._tag_map_direct('get_uptime', 60), + 'memfree' : self._tag_map_direct('get_mem_free', 60), + 'memused' : self._tag_map_direct('get_mem_used', 60), 'sto' : ( lambda o: 
hasattr(o, 'storage'), lambda o,t: ' '.join( - getattr(o, 'storage')().pool_list())), - 'nvm' : self._tag_map_counter('vm_list'), - 'vmstarted' : self._tag_map_counter('vm_list_running'), - 'vmstopped' : self._tag_map_counter('vm_list_stopped'), - 'vmpaused' : self._tag_map_counter('vm_list_paused'), + getattr(o, 'storage')().pool_list()), + 60), + # 5 seconds + 'cpuuse' : self._tag_map_direct('get_cpu_usage', 5), + 'load' : self._tag_map_direct('get_loadavg', 5), + 'nvm' : self._tag_map_counter('vm_list', 5), + 'vmstarted' : self._tag_map_counter('vm_list_running', 5), + 'vmstopped' : self._tag_map_counter('vm_list_stopped', 5), + 'vmpaused' : self._tag_map_counter('vm_list_paused', 5), } self.HV_TAG_GLOB = { - 'disk*' : self._tag_map_helper(self._helper_hv_disk), - 'sto*' : self._tag_map_helper(self._helper_hv_sto), + 'disk*' : self._tag_map_helper(self._helper_hv_disk, 24*3600), + 'sto*' : self._tag_map_helper(self._helper_hv_sto, 60), } + # guest VM tags self.VM_TAG_MANDATORY = ['hv', 'h'] self.VM_TAG_MAP = { + # infinite TTL 'version' : ( lambda o: True, - lambda o,t: str(__version__)), + lambda o,t: str(__version__), + -1), 'hv' : ( lambda o: hasattr(o, 'hypervisor'), - lambda o,t: o.hypervisor().get_name()), - # FIXME crappy tag implementation + lambda o,t: o.hypervisor().get_name(), + -1), + 'htype' : ( lambda o: hasattr(o, 'hypervisor'), + lambda o,t: o.hypervisor().get_hv_type(), + -1), + 'arch' : self._tag_map_direct('get_arch', -1), + 'h' : self._tag_map_direct('get_name', -1), + # one day + # one hour + 'cpu' : self._tag_map_direct('get_cpu_core', 3600), + 'mem' : self._tag_map_direct('get_mem', 3600), + 'memused' : self._tag_map_direct('get_mem_used', 3600), + 'memfree' : self._tag_map_direct('get_mem_free', 3600), + # one minute + # 5 seconds 'status' : ( lambda o: True, lambda o,t: 'running' if o.is_active() else 'paused' if o.is_paused() - else 'stopped'), - 'h' : self._tag_map_direct('get_name'), - 'arch' : self._tag_map_direct('get_arch'), - 'cpu' : self._tag_map_direct('get_cpu'), - 'mem' : self._tag_map_direct('get_mem'), - 'memused' : self._tag_map_direct('get_mem_used'), - 'memfree' : self._tag_map_direct('get_mem_free'), + else 'stopped', + 5), # FIXME crappy tag implementation + #'cpuuse' : self._tag_map_direct('get_cpu_usage'), } self.VM_TAG_GLOB = { - 'disk*' : self._tag_map_helper(self._helper_vm_disk), + 'disk*' : self._tag_map_helper(self._helper_vm_disk, 3600), } - # FIXME comment (populate the server with running VMs) + # FIXME self._register_vm = [] - self._register() def __getitem__(self, name): ''' @@ -129,37 +152,41 @@ class NodeHandler(RpcHandler): if name.startswith('_'): raise KeyError('Remote name `%s` is private' % repr(name)) # filter command execution methods - elif not self._allow_cmd_exec and name in self.EXEC_METHODS: + elif not self._safe_mode and name in self.UNSAFE_METHODS: raise KeyError('Remote name `%s` is disabled by configuration' % repr(name)) else: debug('Called %s.%s' % (self.__class__.__name__, name)) return super(NodeHandler, self).__getitem__(name) - def _tag_map_direct(self, method): + def _tag_map_direct(self, method, ttl): ''' ''' return ( lambda o: hasattr(o, method), - lambda o,t: getattr(o, method)()) + lambda o,t: getattr(o, method)(), + ttl) - def _tag_map_counter(self, method): + def _tag_map_counter(self, method, ttl): ''' ''' return ( lambda o: hasattr(o, method), - lambda o,t: len(getattr(o, method)())) + lambda o,t: len(getattr(o, method)()), + ttl) - def _tag_map_keys(self, method): + def _tag_map_keys(self, method, 
ttl): ''' ''' return ( lambda o: hasattr(o, method), - lambda o,t: ' '.join(getattr(o, method)().keys())) + lambda o,t: ' '.join(getattr(o, method)().keys()), + ttl) - def _tag_map_helper(self, helper): + def _tag_map_helper(self, helper, ttl): ''' ''' return ( lambda o, resolve=False: helper(o, resolve=resolve), lambda o, tag_name=None, resolve=False: - helper(o, tag_name=tag_name, resolve=resolve)) + helper(o, tag_name=tag_name, resolve=resolve), + ttl) def _helper_hv_disk(self, hv, tag_name=None, resolve=True): ''' @@ -219,53 +246,65 @@ class NodeHandler(RpcHandler): result = None return result - def _register(self): - #FIXME check is we are connected before - #debug("register: begin") - try: - vm_current = self._host_handle.vm_list() - for vm in vm_current: - if vm not in self._register_vm: - try: - self._connection.get_server().register(vm, 'vm') - except RpcError as e: - if e.exception == '#FIXME': - self._register_vm.append(vm) + def scheduler_run(self): + ''' + ''' + # call handler scheduler + if hasattr(self._host_handle, 'scheduler_run'): + self._host_handle.scheduler_run() + # (un)register sub nodes if this host has the capability + if hasattr(self._host_handle, 'vm_list'): + try: + vm_current = self._host_handle.vm_list() + for vm in vm_current: + if vm not in self._register_vm: + try: + info('registering vm `%s`' % vm) + self._connection.get_server().register(vm, 'vm') + except RpcError as e: + if e.exception == '#FIXME': + self._register_vm.append(vm) + else: + raise e else: - raise e - else: - self._register_vm.append(vm) - - for vm in self._register_vm: - if vm not in vm_current: - try: - self._connection.get_server().unregister(vm) - except RpcError as e: - if e.exception == '#FIXME': - vm_current.pop(vm) + self._register_vm.append(vm) + for vm in self._register_vm: + if vm not in vm_current: + try: + info('unregistering vm `%s`' % vm) + self._connection.get_server().unregister(vm) + except RpcError as e: + if e.exception == '#FIXME': + self._register_vm.remove(vm) + else: + raise e else: - raise e - else: - vm_current.pop(vm) - except Exception as e: - pass - finally: - #debug("register: finally 1") - self._register_timer = Timer(1.0, self._register) - self._register_timer.start() - #debug("register: finally 2") - #debug("register: end") + self._register_vm.remove(vm) + except Exception as e: + debug("REGISTER except `%s`:`%s`" % (repr(e), e)) + pass @pure - def get_tags(self, tags=None, resolve=True): + def get_tags(self, tags=None, noresolve_tags=None): ''' ''' result = {} - debug('get_tags: server requested tags `%s`' % tags) + debug('get_tags: server requested tags=`%s` noresolve_tags=`%s`' + % (tags, noresolve_tags)) + # build a single list of tags + mytags = {} + if tags: + for t in tags: + mytags[t] = True + if noresolve_tags: + for t in noresolve_tags: + if t not in mytags: + mytags[t] = False # return all tags if server does not request a subset - if tags is None: + if not mytags: # add simple tags - tags = self.HV_TAG_MAP.keys() + for t in self.HV_TAG_MAP.keys(): + mytags[t] = True # add globbing tags for pattern, handler in self.HV_TAG_GLOB.iteritems(): # helper is available on the current host @@ -274,71 +313,200 @@ class NodeHandler(RpcHandler): # get tags from helper htags = handler[0](self._host_handle, resolve=False) # append all tags - tags.extend(htags) - debug('get_tags: no tag specified, expanded list to `%s`' % tags) - # add mandatory tags if missing in the list + for t in htags: + mytags[t] = True + debug('get_tags: no tag specified, expanded list to 
`%s`' + % mytags.keys()) + # add mandatory tags if missing in the list, or set noresolve else: - for mtag in self.HV_TAG_MANDATORY: - if mtag not in tags: - debug('get_tags: add missing mandatory tag `%s`' % mtag) - tags.append(mtag) + for t in self.HV_TAG_MANDATORY: + if t not in mytags or not mytags[t]: + debug('get_tags: add/correct mandatory tag `%s`' % t) + mytags[t] = True # query host - debug('get_tags: query host with tag list `%s`' % tags) - for tag in tags: + debug('get_tags: query host with tag list `%s`' % mytags.keys()) + for tag, resolve in mytags.iteritems(): # first, test tag name againts list of plain name if tag in self.HV_TAG_MAP: debug('get_tags: plain mapping found for tag `%s`' % tag) if self.HV_TAG_MAP[tag][0](self._host_handle): - # fetch tag data - q = self.HV_TAG_MAP[tag][1](self._host_handle, tag) - debug('get_tags: host returned `%s`' % q) - # append tag information - if q is not None: - # when a dict is returned, it may contain >1 tags - # in this case the real tag name is given by the - # wrapper and it may differ from the mapping name - if isinstance(q, dict): - for key, val in q.iteritems(): - result[key] = {} - result[key]['value'] = str(val) - # FIXME really fast - result[key]['ttl'] = -1 - # or there's only one value - else: - result[tag] = {} + debug('get_tags: tag `%s` is available on host' % tag) + result[tag] = {} + result[tag]['ttl'] = self.HV_TAG_MAP[tag][2] + if resolve: + debug('get_tags: resolving now tag `%s`' % tag) + # fetch tag data + q = self.HV_TAG_MAP[tag][1](self._host_handle, tag) + debug('get_tags: host returned `%s`' % q) + if q is not None: + # append tag data result[tag]['value'] = str(q) - # FIXME really fast - result[tag]['ttl'] = -1 - else: - debug('get_tags: I wont return `%s`=`None`' % tag) + else: + debug('get_tags: I wont return `%s`=`None`' + % tag) else: debug('get_tags: tag `%s` is NOT implemented' % tag) - # if no direct tag mapping exists, test name against globbing a list + # if no direct tag mapping exists, test name against globbing + # a list else: debug('get_tags: searching for `%s` in globbing tags' % tag) # iter on globbing patterns, and get helper references - # process the first globbing tag that match then exit because + # process the first globbing tag that match then exit bcause # there should not exist two globbing pattern matching # one tag, ideally for pattern, handler in self.HV_TAG_GLOB.iteritems(): # helper is available on the current host if handler[0](self._host_handle, tag): + debug('get_tags: testing pattern `%s`' % pattern) if fnmatchcase(tag, pattern): debug('get_tags: processing tag `%s` with ' - 'pattern `%s`' % (tag, pattern)) + 'pattern `%s`' % (tag, pattern)) # get tags from helper - htags = handler[0](self._host_handle, tag) + htags = handler[1](self._host_handle, tag) # FIXME intead of extracting one tag, try not # to build the whole list. 
Maybe it's too # difficult and not worth to implement if tag in htags: debug('get_tags: found tag in helper result' - 'with value `%s`' % htags[tag]) + ' with value `%s`' % htags[tag]) result[tag] = {} - result[tag]['value'] = str(htags[tag]) - # FIXME - result[tag]['ttl'] = -1 + result[tag]['ttl'] = handler[2] + if resolve: + result[tag]['value'] = str(htags[tag]) + break + debug("get_tags: released lock") + return result + + def _sub_tag_list(self, sub_obj): + ''' + ''' + result = [] + # add simple tags + result.extend(self.VM_TAG_MAP.keys()) + # add globbing tags + for pattern, handler in self.VM_TAG_GLOB.iteritems(): + # helper is available on the current host + if handler[0](sub_obj): + debug('sub_tags: sub node implements `%s`' % pattern) + # get tags from helper + htags = handler[0](sub_obj, resolve=False) + debug('sub_tags: handler provides `%s`' % htags) + # append all tags + for t in htags.keys(): + result.append(t) + return result + + @pure + def sub_tags(self, sub_id, tags=None, noresolve_tags=None): + ''' + ''' + debug('sub_tags: server requested tags for `%s`' % sub_id) + if sub_id not in self._host_handle.vm_list(): + debug('sub_tags: sub node `%s` is unknown !' % sub_id) + raise CCException('sub node `%s` is unknown' % sub_id) + else: + # open a wrapper to the VM + debug('sub_tags: fetching vm data for `%s`' % sub_id) + sub = self._host_handle.vm_get(sub_id) + # build a single list of tags + debug('sub_tags: server requested tags `%s` + `%s`' + % (tags, noresolve_tags)) + available_tags = self._sub_tag_list(sub) + mytags = {} + # return all resolved tags case + if tags is None and noresolve_tags is None: + for t in available_tags: + mytags[t] = True + elif tags is None or noresolve_tags is None: + if tags is None: + for t in available_tags: + mytags[t] = True + for t in noresolve_tags: + mytags[t] = False + else: + for t in available_tags: + mytags[t] = False + for t in tags: + mytags[t] = True + else: + for t in noresolve_tags: + mytags[t] = False + for t in tags: + mytags[t] = True + debug('sub_tags: expanded list to `%s`' % mytags.keys()) + # add mandatory tags if missing in the list, or set noresolve + for t in self.VM_TAG_MANDATORY: + if t not in mytags or not mytags[t]: + debug('sub_tags: add/correct mandatory tag `%s`' % t) + mytags[t] = True + # query the subnode + result = {} + try: + ''' + # expand tag list with globbing tags + if get_all: + for pattern, handler in self.VM_TAG_GLOB.iteritems(): + # helper is available on this VM + if handler[0](vm): + debug('sub_tags: vm implements `%s`' % pattern) + # get tags from helper + htags = handler[0](vm, resolve=False) + # append all tags + mytags.extend(htags) + ''' + # query the VM with each tag + for tag, resolve in mytags.iteritems(): + # first, search tag in plain mappings + if tag in self.VM_TAG_MAP: + debug('sub_tags: plain mapping found for tag `%s`' + % tag) + # proceed if tag can be resolved on this VM + if self.VM_TAG_MAP[tag][0](sub): + result[tag] = {} + # FIXME + result[tag]['ttl'] = self.VM_TAG_MAP[tag][2] + if resolve: + debug('sub_tags: resolving tag %s`' % tag) + # call the wrapper mapping lambda + q = self.VM_TAG_MAP[tag][1](sub, tag) + debug('sub_tags: tag query returned `%s`' % q) + if q is not None: + if resolve: + result[tag]['value'] = str(q) + else: + debug('sub_tags: I wont return `%s`=`None`' + % tag) + # no tag mapping exist, test name against the globbing list + else: + debug('sub_tags: searching for `%s` in globbing tags' + % tag) + # iter on globbing patterns, and get helper references + # 
process the first globbing tag that match then exit + # because there should not exist two globbing pattern + # matching one tag, ideally + for pattern, handler in self.VM_TAG_GLOB.iteritems(): + # helper is available on the current VM + if handler[0](sub, tag): + if fnmatchcase(tag, pattern): + debug('get_tags: processing tag `%s` with ' + 'pattern `%s`' % (tag, pattern)) + # get tags from helper + htags = handler[0](sub, tag) + # FIXME intead of extracting one tag, try + # not to build the whole list. Maybe it's + # too difficult and not worth implementing + if tag in htags: + debug('get_tags: found tag in helper ' + 'result with value `%s`' % htags[tag]) + result[tag] = {} + result[tag]['ttl'] = self.VM_TAG_MAP[tag][2] + if resolve: + result[tag]['value'] = str( + htags[tag]) + break + except Exception as e: + debug('(%s) : %s' % (repr(e), e)) return result @pure @@ -360,119 +528,6 @@ class NodeHandler(RpcHandler): raise FeatureNotImplemented('host handler has no method `%s`' % method) - @pure - def vm_tags(self, vm_names=None, tags=None, resolve=True): - ''' - ''' - # list all VMs if the server did not provide names - debug('vm_list: server requested list of vm `%s`' % vm_names) - if vm_names is None: - vm_names = self._host_handle.vm_list() - debug('vm_list: no vm specified, expanded list to `%s`' % vm_names) - # return all tags if server does not request a subset - get_all = tags is None - debug('vm_list: server requested tags `%s`' % tags) - if get_all: - # add simple tags - tags = self.VM_TAG_MAP.keys() - debug('vm_list: no tag specified, expanded list to `%s`' % tags) - # add mandatory tags if missing in the list - else: - for mtag in self.VM_TAG_MANDATORY: - if mtag not in tags: - debug('vm_list: add missing mandatory tag `%s`' % mtag) - tags.append(mtag) - # query each vm - result = {} - for vm_name in vm_names: - vm_tag = {} - try: - # copy tag list for local modifications (globbing) - mytags = tags - # open a wrapper to the VM - debug('vm_list: fetching vm data for `%s`' % vm_name) - vm = self._host_handle.vm_get(vm_name) - # expand tag list with globbing tags - if get_all: - for pattern, handler in self.VM_TAG_GLOB.iteritems(): - # helper is available on this VM - if handler[0](vm): - debug('vm_list: vm implements `%s`' % pattern) - # get tags from helper - htags = handler[0](vm, resolve=False) - # append all tags - mytags.extend(htags) - # query the VM with each tag - for tag in mytags: - # first, search tag in plain mappings - if tag in self.VM_TAG_MAP: - debug('vm_list: plain mapping found for tag `%s`' % tag) - # proceed if tag can be resolved on this VM - if self.VM_TAG_MAP[tag][0](vm): - vm_tag[tag] = {} - # fetch data only if we only built the tag list - debug('vm_list: resolving tag %s`' % tag) - # call the wrapper mapping lambda - q = self.VM_TAG_MAP[tag][1](vm, tag) - debug('vm_list: query returned `%s`' % q) - # when a dict is returned, it may contain >1 tag - # in this case the real tag name is given by the - # wrapper and it may differ from the mapping nam - if isinstance(q, dict): - for key, val in q.iteritems(): - if val is not None: - if resolve: - vm_tag[key]['value'] = str(q) - # FIXME really fast - vm_tag[key]['ttl'] = -1 - else: - debug('vm_list: I wont return ' - '`%s`=`None`' % key) - # or there's only one value - elif q is not None: - if resolve: - vm_tag[tag]['value'] = str(q) - # FIXME really fast - vm_tag[tag]['ttl'] = -1 - else: - debug('vm_list: I wont return `%s`=`None`' - % tag) - # no tag mapping exist, test name against the globbing list - 
else: - debug('vm_list: searching for `%s` in globbing tags' - % tag) - # iter on globbing patterns, and get helper references - # process the first globbing tag that match then exit - # because there should not exist two globbing pattern - # matching one tag, ideally - for pattern, handler in self.VM_TAG_GLOB.iteritems(): - # helper is available on the current VM - if handler[0](vm, tag): - if fnmatchcase(tag, pattern): - debug('get_tags: processing tag `%s` with ' - 'pattern `%s`' % (tag, pattern)) - # get tags from helper - htags = handler[0](vm, tag) - # FIXME intead of extracting one tag, try - # not to build the whole list. Maybe it's - # too difficult and not worth implementing - if tag in htags: - debug('get_tags: found tag in helper ' - 'result with value `%s`' % htags[tag]) - vm_tag[tag] = {} - if resolve: - vm_tag[tag]['value'] = str( - htags[tag]) - # FIXME - vm_tag[tag]['ttl'] = -1 - break - # save the tag list - # FIXME: in case of exception, we won't return a single VM tag - result[vm_name] = vm_tag - except Exception as e: - debug('(%s) : %s' % (repr(e), e)) - return result - @pure def vm_stop(self, vm_names=None, force=False): ''' @@ -537,8 +592,7 @@ class NodeHandler(RpcHandler): info('vm_resume: server requested resume of `%s`' % vm_names) if vm_names is None: vm_names = self._host_handle.vm_list_running() - debug('vm_resume: no vm specified, expanded list to `%s`' - % vm_names) + debug('vm_resume: no vm specified, expanded list to `%s`'% vm_names) for vm_name in vm_names: try: debug('vm_resume: fetching vm data for `%s`' % vm_name) @@ -552,7 +606,7 @@ class NodeHandler(RpcHandler): def execute_command(self, command): ''' ''' - info('execute_command: starting execution of `%s`' % command) + warning('execute_command: starting execution of `%s`' % command) output = self._host_handle.execute(command) - info('execute_command: finished execution of `%s`' % command) + warning('execute_command: finished execution of `%s`' % command) return output diff --git a/ccnode/libvirtwrapper.py b/ccnode/libvirtwrapper.py index af7b3be..0f35478 100644 --- a/ccnode/libvirtwrapper.py +++ b/ccnode/libvirtwrapper.py @@ -4,8 +4,10 @@ import libvirt import re import psutil import xml.dom.minidom +from logging import debug, info from time import sleep from common import Hypervisor, VM, Storage, StoragePool, StorageVolume +from utils import RWLock KVM_LIBVIRT_SESSION = 'qemu:///system' @@ -27,9 +29,11 @@ class LibvirtHypervisor(Hypervisor): ''' try: if hv_type == 'kvm': + debug("LibvirtHypervisor: initialized as KVM") self._lvcon_handle = libvirt.open(KVM_LIBVIRT_SESSION) elif hv_type == 'xen': - self._lvcon_handle = libvirt.open(KVM_LIBVIRT_SESSION) + debug("LibvirtHypervisor: initialized as Xen") + self._lvcon_handle = libvirt.open(XEN_LIBVIRT_SESSION) else: raise NotImplemented('Unknown hypervisor type') except libvirt.libvirtError as error: @@ -40,23 +44,41 @@ class LibvirtHypervisor(Hypervisor): self._vm_cache_running = {} self._vm_cache_defined = {} self._vm_cache = {} + self._vm_cache_lock = RWLock() + + def scheduler_run(self): + self._cache_vm_rebuild() - def _vm_cache_rebuild(self): + def _cache_vm_rebuild(self): ''' ''' - self._vm_cache_running = {} - self._vm_cache_defined = {} - self._vm_cache = {} + running = {} + defined = {} + #debug("_vm_cache_rebuild: listing running domains") for dom_id in self._lvcon_handle.listDomainsID(): - vm = LibvirtVm(self, self._lvcon_handle.lookupByID(dom_id)) - self._vm_cache_running[vm.get_name()] = vm + #debug("domid=%i" % dom_id) + try: + vm = 
LibvirtVm(self, self._lvcon_handle.lookupByID(dom_id)) + running[vm.get_name()] = vm + except Exception as err: + debug("%s: %s" % (err,err)) + + #debug("_vm_cache_rebuild: listing defined domains") for dom_name in self._lvcon_handle.listDefinedDomains(): - vm = LibvirtVm(self, self._lvcon_handle.lookupByName(dom_name)) - self._vm_cache_defined[vm.get_name()] = vm + #debug("domid=%s" % dom_name) + try: + vm = LibvirtVm(self, + self._lvcon_handle.lookupByName(dom_name)) + defined[vm.get_name()] = vm + except Exception as err: + debug("%s: %s" % (err,err)) - self._vm_cache = self._vm_cache_running - self._vm_cache.update(self._vm_cache_defined) + with self._vm_cache_lock.write: + self._vm_cache_running = running + self._vm_cache_defined = defined + self._vm_cache = self._vm_cache_running + self._vm_cache.update(self._vm_cache_defined) def get_hv_type(self): ''' @@ -71,6 +93,7 @@ class LibvirtHypervisor(Hypervisor): data = self._lvcon_handle.getVersion() if data: version = data + except: pass return version @@ -110,48 +133,43 @@ class LibvirtHypervisor(Hypervisor): def vm_list(self): ''' ''' - if not self._vm_cache: - self._vm_cache_rebuild() - return self._vm_cache.keys() + with self._vm_cache_lock.read: + return self._vm_cache.keys() def vm_list_running(self): ''' ''' - if not self._vm_cache_running: - self._vm_cache_rebuild() running = [] - for vm_name in self._vm_cache_running: - vm = self.vm_get(vm_name) - if vm.is_active(): - running.append(vm_name) + with self._vm_cache_lock.read: + for vm_name, vm in self._vm_cache_running.iteritems(): + if vm.is_active(): + running.append(vm_name) return running def vm_list_stopped(self): ''' ''' - if not self._vm_cache_defined: - self._vm_cache_rebuild() - return self._vm_cache_defined.keys() + with self._vm_cache_lock.read: + return self._vm_cache_defined.keys() def vm_list_paused(self): ''' ''' - if not self._vm_cache_running: - self._vm_cache_rebuild() paused = [] - for vm_name in self._vm_cache_running: - vm = self.vm_get(vm_name) - if vm.is_paused(): - paused.append(vm_name) + with self._vm_cache_lock.read: + for vm_name, vm in self._vm_cache_running.iteritems(): + if vm.is_paused(): + paused.append(vm_name) return paused def vm_get(self, name): ''' ''' - if name in self.vm_list(): - return self._vm_cache[name] - else: - raise Exception() + with self._vm_cache_lock.read: + if name in self.vm_list(): + return self._vm_cache[name] + else: + raise Exception() #### storage diff --git a/etc/cc-node.conf b/etc/cc-node.conf index 344886a..7d9d10d 100644 --- a/etc/cc-node.conf +++ b/etc/cc-node.conf @@ -15,5 +15,9 @@ verbosity = # the node should not attempt to use local hypervisors detect_hypervisor = yes +# TO BE REMOVED IN LATER RELEASES +# force usage of Xen +force_xen = no + # allow remote command execution (or host shutdown/reboot) command_execution = yes -- GitLab
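Implementation notes (illustrative sketches, not part of the commit):

Reconnection backoff (bin/cc-node). The authentication helper now goes through node.get_manager().is_running(), and the retry delay is clamped with >= against MAX_AUTH_TIMEOUT instead of the old no-op "== maxtimeout" comparison, so the sleep grows until it reaches exp(MAX_AUTH_TIMEOUT) and then stays there. A condensed, standalone sketch of that retry pattern; the node object and its get_manager()/authentify() methods stand in for the real CCNode:

    import logging
    from math import exp
    from time import sleep

    MAX_AUTH_TIMEOUT = 10   # cap on the backoff exponent, as in bin/cc-node

    def authentication(node, login, password):
        '''Retry authentication until it succeeds, backing off between attempts.'''
        timeout = 1
        while node:
            if node.get_manager().is_running():
                logging.debug('Sending authentication request')
                if node.authentify(login, password):
                    logging.info('Authenticated successfully')
                    return
            # grow the exponent slowly and clamp it, so the delay converges
            # to exp(MAX_AUTH_TIMEOUT) instead of increasing without bound
            timeout = min(timeout + 0.1, MAX_AUTH_TIMEOUT)
            sleep(exp(timeout))

In run_node() this loop runs on a separate thread named 'Auth' while the RPC main loop keeps the calling thread.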
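TTL-aware tag maps (ccnode/handlers.py). Each entry of HV_TAG_MAP and VM_TAG_MAP grows from an (availability check, resolver) pair into an (availability check, resolver, ttl) triple, with -1 meaning the value never expires, so get_tags() can report a per-tag TTL instead of the old hard-coded -1. A reduced illustration of the structure and of how a per-tag resolve flag is honoured (the host object and its getters are placeholders):

    __version__ = '9'

    def _tag_map_direct(method, ttl):
        '''Map a tag to a host method call, annotated with a TTL in seconds.'''
        return (lambda host: hasattr(host, method),          # can this host resolve the tag?
                lambda host, tag: getattr(host, method)(),   # resolve it now
                ttl)                                         # -1 = infinite TTL

    HV_TAG_MAP = {
        'version': (lambda host: True, lambda host, tag: str(__version__), -1),
        'uptime' : _tag_map_direct('get_uptime', 60),   # refresh every minute
        'load'   : _tag_map_direct('get_loadavg', 5),   # refresh every 5 seconds
    }

    def get_tags(host, wanted):
        '''wanted maps tag name -> True when the value must be resolved now.'''
        result = {}
        for tag, resolve in wanted.items():
            check, resolver, ttl = HV_TAG_MAP[tag]
            if not check(host):
                continue
            result[tag] = {'ttl': ttl}
            if resolve:
                value = resolver(host, tag)
                if value is not None:
                    result[tag]['value'] = str(value)
        return result

Unresolved tags still advertise their TTL, presumably so the server can decide when to ask for the value again.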
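Tag selection (get_tags/sub_tags). Both calls now accept a tags list and a noresolve_tags list and fold them into a single tag -> resolve mapping, with the mandatory tags forced back to resolved. The four None/not-None branches in sub_tags() collapse to something like this (a sketch; names are placeholders and error handling is elided):

    def merge_tag_lists(available, tags=None, noresolve_tags=None, mandatory=()):
        '''Return {tag: resolve_now} from the two request lists.'''
        if tags is None and noresolve_tags is None:
            wanted = dict((t, True) for t in available)     # everything, resolved
        else:
            wanted = {}
            if tags is None:                                # only noresolve_tags given
                wanted.update((t, True) for t in available)
            elif noresolve_tags is None:                    # only tags given
                wanted.update((t, False) for t in available)
            wanted.update((t, False) for t in (noresolve_tags or []))
            wanted.update((t, True) for t in (tags or []))  # tags win on conflict
        for t in mandatory:
            wanted[t] = True                                # e.g. VM_TAG_MANDATORY = ['hv', 'h']
        return wanted

A tag listed in both inputs therefore comes back resolved, and mandatory tags are resolved even if the server asked for them unresolved, matching the "add/correct mandatory tag" branch in the patch.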
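Periodic scheduler (ccnode/ccnode.py). After a successful authentify() the node arms a one-second threading.Timer that calls NodeHandler.scheduler_run() and then re-arms itself, replacing the timer that used to live inside the handler's _register(). The same pattern packaged on its own with a generic callback (Scheduler is a hypothetical helper, not a class from the patch):

    import logging
    from threading import Timer

    class Scheduler(object):
        '''Re-arming one-shot timer: runs callback roughly every `period` seconds.'''

        def __init__(self, period, callback):
            self._period = period
            self._callback = callback
            self._timer = None

        def start(self):
            self._run()

        def _run(self):
            try:
                self._callback()
            except Exception as err:
                # one failed pass must not kill the loop
                logging.debug('scheduler pass failed: %r', err)
            # re-arm only after the callback returned, so passes never overlap
            self._timer = Timer(self._period, self._run)
            self._timer.daemon = True
            self._timer.start()

        def stop(self):
            if self._timer is not None:
                self._timer.cancel()

    # Scheduler(1, node.get_handler().scheduler_run).start()

After the initial synchronous pass, each subsequent run fires on a fresh Timer thread, so a slow registration or cache refresh does not sit on the caller's thread.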
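VM registration (NodeHandler.scheduler_run). Each pass diffs the hypervisor's current domain list against the names already registered on the server: new guests get register(vm, 'vm'), vanished ones get unregister(vm), and the local list is only updated once the call succeeded (or failed with the specific "already known" RpcError still marked #FIXME in the patch). It also fixes the old unregistration path, which popped from the wrong list. The same reconciliation written with sets; server is a placeholder for the ConnectionProxy and RpcError handling is elided:

    import logging

    def reconcile_vms(server, current, registered):
        '''Register newly seen VMs, unregister the ones that disappeared.

        current    -- iterable of VM names reported by the hypervisor
        registered -- set of names already pushed to the server (mutated in place)
        '''
        current = set(current)
        for vm in current - registered:
            logging.info('registering vm `%s`', vm)
            server.register(vm, 'vm')
            registered.add(vm)
        for vm in registered - current:
            logging.info('unregistering vm `%s`', vm)
            server.unregister(vm)
            registered.discard(vm)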
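Cache contention (ccnode/libvirtwrapper.py). The libvirt domain cache is no longer rebuilt lazily inside vm_list()/vm_get(); scheduler_run() builds fresh running/defined dictionaries off to the side and swaps them in under the write side of an RWLock (imported from a ccnode utils module, so only the "with lock.read:" / "with lock.write:" usage is visible in the diff), while readers take the read side. The same copy-then-swap idea shown with a plain threading.Lock, which is sufficient when readers only need a consistent snapshot:

    from threading import Lock

    class VmCache(object):
        '''Rebuild the cache off to the side, then publish it atomically.'''

        def __init__(self, list_domains):
            self._list_domains = list_domains   # callable returning {name: vm_wrapper}
            self._lock = Lock()
            self._cache = {}

        def rebuild(self):
            fresh = self._list_domains()        # slow libvirt calls, done without holding the lock
            with self._lock:
                self._cache = fresh             # readers see the new dict atomically

        def names(self):
            with self._lock:
                return list(self._cache.keys())

        def get(self, name):
            with self._lock:
                if name not in self._cache:
                    raise KeyError('unknown vm `%s`' % name)
                return self._cache[name]

The expensive libvirt listing happens outside the critical section; only the reference swap and the reads are serialized, which is presumably the "less contention" the commit message refers to.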