# -*- coding: utf-8 -*- from fnmatch import fnmatchcase from sjrpc.core import RpcError from __init__ import __version__ from sjrpc.utils import RpcHandler from sjrpc.utils import pure from logging import debug, warning, info from exceptions import HypervisorError from common import LocalHost _MOD_KVM = True try: import kvm except ImportError: _MOD_KVM = False _MOD_XEN = True try: import xen except ImportError: _MOD_XEN = False class NodeHandler(RpcHandler): ''' Main node handler that exports the host capabilities to the server. ''' def __init__(self, connection, detect_hv=True, safe_mode=True, force_xen=False): ''' ''' super(NodeHandler, self).__init__() self._connection = connection self._safe_mode = safe_mode # create interface with host self._host_handle = None if detect_hv: debug('Hypervisor detection in progress') if _MOD_KVM and not force_xen: debug('Initializing connection to the local KVM hypervisor') self._host_handle = kvm.KvmHypervisor() elif _MOD_XEN: debug('Initializing connection to the local Xen hypervisor') self._host_handle = xen.XenHypervisor() if self._host_handle is None: debug('Hypervisor detection failed') if not detect_hv or self._host_handle is None: debug('Hypervisor detection disabled, running as regular node') self._host_handle = LocalHost() # methods that execute administrative commands, to be banned when # running in safe mode self.UNSAFE_METHODS = ['execute_command', 'shutdown'] # hypervisor tags self.HV_TAG_MANDATORY = ['h'] self.HV_TAG_MAP = { # infinite TTL 'version' : ( lambda o: True, lambda o,t: str(__version__), -1), 'libvirtver': self._tag_map_direct('get_libvirt_version', -1), 'htype' : self._tag_map_direct('get_hv_type', -1), 'hserial' : self._tag_map_direct('get_hw_serial', -1), 'hvendor' : self._tag_map_direct('get_hw_vendor', -1), 'hmachine' : self._tag_map_direct('get_hw_product', -1), 'arch' : self._tag_map_direct('get_arch', -1), 'hvm' : self._tag_map_direct('get_hvm_available', -1), 'cpu' : self._tag_map_direct('get_cpu', -1), 'cpucore' : self._tag_map_direct('get_cpu_core', -1), 'cputhread' : self._tag_map_direct('get_cpu_thread', -1), # one day 'hbios' : self._tag_map_direct('get_hw_bios', 24*3600), 'hvver' : self._tag_map_direct('get_hv_version', 24*3600), 'platform' : self._tag_map_direct('get_platform', 24*3600), 'uname' : self._tag_map_direct('get_uname', 24*3600), 'cpufreq' : self._tag_map_direct('get_cpu_frequency', 24*3600), 'mem' : self._tag_map_direct('get_mem', 24*3600), 'disk' : self._tag_map_keys('get_disks', 24*3600), 'h' : self._tag_map_direct('get_name', 24*3600), # one hour # one minute 'memfree' : self._tag_map_direct('get_mem_free', 60), 'memused' : self._tag_map_direct('get_mem_used', 60), 'sto' : ( lambda o: hasattr(o, 'storage'), lambda o,t: ' '.join( getattr(o, 'storage')().pool_list()), 60), # 5 seconds 'uptime' : self._tag_map_direct('get_uptime', 5), 'cpuuse' : self._tag_map_direct('get_cpu_usage', 5), 'load' : self._tag_map_direct('get_loadavg', 5), 'nvm' : self._tag_map_counter('vm_list', 5), 'vmstarted' : self._tag_map_counter('vm_list_running', 5), 'vmstopped' : self._tag_map_counter('vm_list_stopped', 5), 'vmpaused' : self._tag_map_counter('vm_list_paused', 5), } self.HV_TAG_GLOB = { 'disk*' : self._tag_map_helper(self._helper_hv_disk, 24*3600), 'sto*' : self._tag_map_helper(self._helper_hv_sto, 60), } # guest VM tags self.VM_TAG_MANDATORY = ['hv', 'h'] self.VM_TAG_MAP = { # infinite TTL 'version' : ( lambda o: True, lambda o,t: str(__version__), -1), 'hv' : ( lambda o: hasattr(o, 'hypervisor'), lambda o,t: o.hypervisor().get_name(), -1), 'htype' : ( lambda o: hasattr(o, 'hypervisor'), lambda o,t: o.hypervisor().get_hv_type(), -1), 'arch' : self._tag_map_direct('get_arch', -1), 'h' : self._tag_map_direct('get_name', -1), # one day # one hour 'cpu' : self._tag_map_direct('get_cpu_core', 3600), 'mem' : self._tag_map_direct('get_mem', 3600), # one minute # 5 seconds 'status' : ( lambda o: True, lambda o,t: 'running' if o.is_active() else 'paused' if o.is_paused() else 'stopped', 5), # FIXME crappy tag implementation #'cpuuse' : self._tag_map_direct('get_cpu_usage'), } self.VM_TAG_GLOB = { 'disk*' : self._tag_map_helper(self._helper_vm_disk, 3600), } # FIXME self._register_vm = [] def __getitem__(self, name): ''' ''' # filter the private members access if name.startswith('_'): raise KeyError('Remote name `%s` is private' % repr(name)) # filter command execution methods elif not self._safe_mode and name in self.UNSAFE_METHODS: raise KeyError('Remote name `%s` is disabled by configuration' % repr(name)) else: debug('Called %s.%s' % (self.__class__.__name__, name)) return super(NodeHandler, self).__getitem__(name) def _tag_map_direct(self, method, ttl): ''' ''' return ( lambda o: hasattr(o, method), lambda o,t: getattr(o, method)(), ttl) def _tag_map_counter(self, method, ttl): ''' ''' return ( lambda o: hasattr(o, method), lambda o,t: len(getattr(o, method)()), ttl) def _tag_map_keys(self, method, ttl): ''' ''' return ( lambda o: hasattr(o, method), lambda o,t: ' '.join(getattr(o, method)().keys()), ttl) def _tag_map_helper(self, helper, ttl): ''' ''' return ( lambda o, resolve=False: helper(o, resolve=resolve), lambda o, tag_name=None, resolve=False: helper(o, tag_name=tag_name, resolve=resolve), ttl) def _helper_hv_disk(self, hv, tag_name=None, resolve=True): ''' ''' result = {} disks = hv.get_disks() if len(disks): result['disk'] = ' '.join(disks.keys()) for name, size in disks.iteritems(): if size: result['disk%s_size' % name] = str(size) if not result: result = None return result def _helper_hv_sto(self, hv, tag_name=None, resolve=True): ''' ''' result = {} if hasattr(hv, 'storage'): pools = hv.storage().pool_list() if len(pools): result['sto'] = ' '.join(pools) for pool_name in pools: pool = hv.storage().pool_get(pool_name) capa = pool.get_space_capacity() if capa: result['sto%s_size' % pool_name] = str(capa) free = pool.get_space_free() if free: result['sto%s_free' % pool_name] = str(free) used = pool.get_space_used() if used: result['sto%s_used' % pool_name] = str(used) vol = pool.volume_list() if vol: result['sto%s_vol' % pool_name] = ' '.join(vol) if not result: result = None return result def _helper_vm_disk(self, vm, tag_name=None, resolve=True): ''' ''' result = {} volumes = vm.get_volumes() if len(volumes): result['disk'] = ' '.join([str(i) for i in range(0, len(volumes))]) for vol_id, vol in enumerate(volumes): path = vol.get_path() if path: result['disk%i_path' % vol_id] = str(path) capa = vol.get_space_capacity() if capa: result['disk%i_size' % vol_id] = str(capa) if not result: result = None return result def scheduler_run(self): ''' ''' # call handler scheduler if hasattr(self._host_handle, 'scheduler_run'): self._host_handle.scheduler_run() # (un)register sub nodes if this host has the capability if hasattr(self._host_handle, 'vm_list'): try: vm_current = self._host_handle.vm_list() for vm in vm_current: if vm not in self._register_vm: try: info('registering vm `%s`' % vm) self._connection.get_server().register(vm, 'vm') except RpcError as e: if e.exception == '#FIXME': self._register_vm.append(vm) else: raise e else: self._register_vm.append(vm) for vm in self._register_vm: if vm not in vm_current: try: info('unregistering vm `%s`' % vm) self._connection.get_server().unregister(vm) except RpcError as e: if e.exception == '#FIXME': self._register_vm.remove(vm) else: raise e else: self._register_vm.remove(vm) except Exception as e: debug("REGISTER except `%s`:`%s`" % (repr(e), e)) pass ################################## # Tag query ################################## @pure def get_tags(self, tags=None, noresolve_tags=None): ''' ''' result = {} info('get_tags: server requested tags=`%s` noresolve_tags=`%s`', tags, noresolve_tags) # build a single dict of tags, boolean means "resolve" mytags = {} if tags: for t in tags: mytags[t] = True if noresolve_tags: for t in noresolve_tags: if t not in mytags: mytags[t] = False # return all tags if server does not request a subset if not mytags: # add simple tags for t in self.HV_TAG_MAP.keys(): mytags[t] = True # add globbing tags for pattern, handler in self.HV_TAG_GLOB.iteritems(): try: # helper is available on the current host if handler[0](self._host_handle): debug('get_tags: host implements `%s`', pattern) # get tags from helper htags = handler[0](self._host_handle, resolve=False) # append all tags for t in htags: mytags[t] = True except Exception as err: warning('get_tags: `%s` -> `%s`', repr(err), err) debug('get_tags: no tag specified, expanded list to `%s`', mytags.keys()) # add mandatory tags if missing in the list, or set noresolve else: for t in self.HV_TAG_MANDATORY: if t not in mytags or not mytags[t]: debug('get_tags: add/correct mandatory tag `%s`', t) mytags[t] = True # query host info('get_tags: query host with tag list `%s`', mytags.keys()) for tag, resolve in mytags.iteritems(): # first, test tag name againts list of plain name if tag in self.HV_TAG_MAP: debug('get_tags: plain mapping found for tag `%s`', tag) try: if self.HV_TAG_MAP[tag][0](self._host_handle): debug('get_tags: tag `%s` is available on host', tag) result[tag] = {} result[tag]['ttl'] = self.HV_TAG_MAP[tag][2] if resolve: debug('get_tags: resolving now tag `%s`', tag) # fetch tag data q = self.HV_TAG_MAP[tag][1](self._host_handle, tag) debug('get_tags: host returned `%s`', q) if q is not None: # append tag data result[tag]['value'] = str(q) else: debug('get_tags: I wont return `%s`=`None`',tag) else: debug('get_tags: tag `%s` is NOT implemented', tag) except Exception as err: warning('get_tags: `%s` -> `%s`', repr(err), err) # if no direct tag mapping exists, test name against globbing # a list else: debug('get_tags: searching for `%s` in globbing tags', tag) # iter on globbing patterns, and get helper references # process the first globbing tag that match then exit bcause # there should not exist two globbing pattern matching # one tag, ideally for pattern, handler in self.HV_TAG_GLOB.iteritems(): try: # helper is available on the current host if handler[0](self._host_handle, tag): debug('get_tags: testing pattern `%s`', pattern) if fnmatchcase(tag, pattern): debug('get_tags: processing tag `%s` with ' 'pattern `%s`', tag, pattern) # get tags from helper htags = handler[1](self._host_handle, tag) # FIXME intead of extracting one tag, try not # to build the whole list. Maybe it's too # difficult and not worth to implement if tag in htags: debug('get_tags: found tag in helper result' ' with value `%s`', htags[tag]) result[tag] = {} result[tag]['ttl'] = handler[2] if resolve: result[tag]['value'] = str(htags[tag]) break except Exception as err: warning('get_tags: `%s` -> `%s`', repr(err), err) return result def _sub_tag_list(self, sub_obj): ''' ''' result = [] # add simple tags result.extend(self.VM_TAG_MAP.keys()) # add globbing tags for pattern, handler in self.VM_TAG_GLOB.iteritems(): try: # helper is available on the current host if handler[0](sub_obj): debug('sub_tags: sub node implements `%s`' % pattern) # get tags from helper htags = handler[0](sub_obj, resolve=False) debug('sub_tags: handler provides `%s`' % htags) # append all tags for t in htags.keys(): result.append(t) except Exception as err: warning('_sub_tag_list: `%s` -> `%s`', repr(err), err) return result @pure def sub_tags(self, sub_id, tags=None, noresolve_tags=None): ''' ''' info('sub_tags: server requested tags for `%s`', sub_id) if sub_id not in self._host_handle.vm_list(): warning('sub_tags: sub node `%s` is unknown !', sub_id) raise HypervisorError('sub node `%s` is unknown' % sub_id) else: # open a wrapper to the VM debug('sub_tags: fetching vm data for `%s`', sub_id) sub = self._host_handle.vm_get(sub_id) # build a single list of tags info('sub_tags: server requested tags `%s` + `%s`', tags, noresolve_tags) available_tags = self._sub_tag_list(sub) mytags = {} # return all resolved tags case if tags is None and noresolve_tags is None: for t in available_tags: mytags[t] = True elif tags is None or noresolve_tags is None: if tags is None: for t in available_tags: mytags[t] = True for t in noresolve_tags: mytags[t] = False else: for t in available_tags: mytags[t] = False for t in tags: mytags[t] = True else: for t in noresolve_tags: mytags[t] = False for t in tags: mytags[t] = True debug('sub_tags: expanded list to `%s`', mytags.keys()) # add mandatory tags if missing in the list, or set noresolve for t in self.VM_TAG_MANDATORY: if t not in mytags or not mytags[t]: debug('sub_tags: add/correct mandatory tag `%s`' % t) mytags[t] = True # query the subnode result = {} try: # query the VM with each tag for tag, resolve in mytags.iteritems(): # first, search tag in plain mappings if tag in self.VM_TAG_MAP: info('sub_tags: plain mapping found for tag `%s`', tag) # proceed if tag can be resolved on this VM try: if self.VM_TAG_MAP[tag][0](sub): result[tag] = {} result[tag]['ttl'] = self.VM_TAG_MAP[tag][2] if resolve: debug('sub_tags: resolving tag `%s`', tag) # call the wrapper mapping lambda q = self.VM_TAG_MAP[tag][1](sub, tag) info('sub_tags: tag query returned `%s`', q) if q is not None: if resolve: result[tag]['value'] = str(q) else: warning('sub_tags: I wont return `%s`=`None`' , tag) except Exception as err: warning('sub_tags: plain: `%s` -> `%s`', repr(err), err) # no tag mapping exist, test name against the globbing list else: info('sub_tags: searching for `%s` in globbing tags', tag) # iter on globbing patterns, and get helper references # process the first globbing tag that match then exit # because there should not exist two globbing pattern # matching one tag, ideally for pattern, handler in self.VM_TAG_GLOB.iteritems(): try: # helper is available on the current VM if handler[0](sub, tag): if fnmatchcase(tag, pattern): debug('sub_tags: processing tag `%s` with ' 'pattern `%s`' % (tag, pattern)) # get tags from helper htags = handler[1](sub, tag) # FIXME intead of extracting one tag, try # not to build the whole list. Maybe it's # too difficult and not worth implementing if tag in htags: info('sub_tags: found tag in helper ' 'result with value `%s`' % htags[tag]) result[tag] = {} result[tag]['ttl'] = handler[2] if resolve: result[tag]['value'] = str( htags[tag]) break except Exception as err: warning('sub_tags: glob: `%s` -> `%s`', repr(err), err) except Exception as err: warning('sub_tags: global: `%s` -> `%s`' % (repr(err), err)) return result ################################## # Host control ################################## @pure def node_shutdown(self, reboot=True, gracefull=True): ''' ''' warning('node_shutdown: server requested shutdown of local host') info('node_shutdown: reboot=%s gracefull=%s', reboot, gracefull) if reboot: method = 'power_reboot' if gracefull else 'power_force_reboot' else: method = 'power_shutdown' if gracefull else 'power_off' if hasattr(self._host_handle, method): result = getattr(self._host_handle, method)() info('node_shutdown: in progress ... action returned `%s`', result) return result else: error('node_shutdown: unable to proceed, this feature is not available') raise NotImplementedError('host handler has no method `%s`' %method) @pure def execute_command(self, command): ''' ''' error('execute_command: starting execution of `%s`' % command) output = self._host_handle.execute(command) info('execute_command: finished execution of `%s`' % command) return output ################################## # VM management ################################## @pure def vm_define(self, data, format='xml'): ''' ''' warning('vm_define: server requested creation of a new VM') debug('vm_define: with description data `%s`' % data) if hasattr(self._host_handle, 'vm_define'): name = self._host_handle.vm_define(data) debug('vm_define: new VM has name `%s`' % name) return name else: raise NotImplementedError('host do not support VM creation') @pure def vm_undefine(self, name): ''' ''' warning('vm_undefine: server requested deletion of `%s`' % name) if hasattr(self._host_handle, 'vm_get'): vm = self._host_handle.vm_get(name) if hasattr(vm, 'undefine'): vm.undefine() else: raise NotImplementedError('VM object has not method `undefine`') else: raise NotImplementedError('host do not support ') @pure def vm_export(self, name, format='xml'): ''' ''' warning('vm_export: server requested configuration of `%s`' % name) if hasattr(self._host_handle, 'vm_get'): vm = self._host_handle.vm_get(name) if hasattr(vm, 'get_config'): return vm.get_config() else: raise NotImplementedError('VM object has not method `get_config`') else: raise NotImplementedError('host handler has not method `vm_get`') @pure def vm_stop(self, vm_names=None, force=False): ''' ''' warning('vm_stop: server requested stop of `%s`' % vm_names) debug('vm_stop: force stop is `%s`' % force) if vm_names is None: vm_names = self._host_handle.vm_list_running() debug('vm_stop: no vm specified, expanded list to `%s`' % vm_names) for vm_name in vm_names: try: debug('vm_stop: fetching vm data for `%s`' % vm_name) vm = self._host_handle.vm_get(vm_name) if force: debug('vm_stop: powering off `%s`' % vm_name) vm.power_off() else: info('vm_stop: shutdown requested for `%s`' % vm_name) vm.power_shutdown() except: pass @pure def vm_start(self, vm_names=None): ''' ''' warning('vm_start: server requested start of `%s`' % vm_names) if vm_names is None: vm_names = self._host_handle.vm_list_stopped() debug('vm_start: no vm specified, expanded list to `%s`' % vm_names) for vm_name in vm_names: try: debug('vm_start: fetching vm data for `%s`' % vm_name) vm = self._host_handle.vm_get(vm_name) info('vm_start: powering on `%s`' % vm_name) vm.power_on() except: pass @pure def vm_suspend(self, vm_names=None): ''' ''' warning('vm_suspend: server requested suspend of `%s`' % vm_names) if vm_names is None: vm_names = self._host_handle.vm_list_running() debug('vm_suspend: no vm specified, expanded list to `%s`' % vm_names) for vm_name in vm_names: try: debug('vm_suspend: fetching vm data for `%s`' % vm_name) vm = self._host_handle.vm_get(vm_name) info('vm_suspend: pause execution of `%s`' % vm_name) vm.power_suspend() except: pass @pure def vm_resume(self, vm_names=None): ''' ''' warning('vm_resume: server requested resume of `%s`' % vm_names) if vm_names is None: vm_names = self._host_handle.vm_list_running() debug('vm_resume: no vm specified, expanded list to `%s`'% vm_names) for vm_name in vm_names: try: debug('vm_resume: fetching vm data for `%s`' % vm_name) vm = self._host_handle.vm_get(vm_name) info('vm_resume: resume execution of `%s`' % vm_name) vm.power_resume() except: pass @pure def execute_command(self, command): ''' ''' warning('execute_command: starting execution of `%s`' % command) output = self._host_handle.execute(command) warning('execute_command: finished execution of `%s`' % command) return output