Newer
Older
from logging import debug, error, warning, info
from sjrpc.utils import RpcHandler
from sjrpc.utils import pure
from jobs import ReceiveFileJob, SendFileJob
from __init__ import __version__
try:
import kvm
except ImportError:
_MOD_KVM = False
_MOD_XEN = True
try:
import xen
except ImportError:
_MOD_XEN = False
class NodeHandler(RpcHandler):
'''
Main node handler that exports the host capabilities to the server.
'''
def __init__(self, connection, detect_hv=True, safe_mode=True,
             force_xen=False):
    '''
    Select the host back-end and build the tag lookup tables.

    :param connection: connection object, kept as ``_connection``; the
        handler calls ``get_server()`` on it to (un)register VMs
    :param detect_hv: when true, try to attach to a local hypervisor
        (KVM first, then Xen) before falling back to ``LocalHost``
    :param safe_mode: kept as ``_safe_mode`` and read by ``__getitem__``
        when filtering the names listed in ``UNSAFE_METHODS``
    :param force_xen: when true, skip KVM even if its module is present
    '''
    super(NodeHandler, self).__init__()
    self._connection = connection
    # NOTE(review): this assignment was not visible in the reviewed text;
    # it is restored because __getitem__ reads self._safe_mode.
    self._safe_mode = safe_mode
    self._host_handle = None
    if detect_hv:
        debug('Hypervisor detection in progress')
        # NOTE(review): the KVM condition was not visible in the reviewed
        # text; it is rebuilt from the dangling `elif _MOD_XEN`, the
        # module flag _MOD_KVM and the otherwise unused `force_xen`.
        if _MOD_KVM and not force_xen:
            debug('Initializing connection to the local KVM hypervisor')
            self._host_handle = kvm.KvmHypervisor()
        elif _MOD_XEN:
            debug('Initializing connection to the local Xen hypervisor')
            self._host_handle = xen.XenHypervisor()
        if self._host_handle is None:
            debug('Hypervisor detection failed')
    if not detect_hv or self._host_handle is None:
        debug('Hypervisor detection disabled, running as regular node')
        self._host_handle = LocalHost()
    # methods that execute administrative commands, to be banned when
    # running in safe mode
    # NOTE(review): 'shutdown' does not match any method visible in this
    # class (the host shutdown method is `node_shutdown`) -- confirm.
    self.UNSAFE_METHODS = ['execute_command', 'shutdown']
    # Tags that are always resolved for the host.
    self.HV_TAG_MANDATORY = ['h']
    # Each entry is a 3-tuple:
    #   (availability test taking the object,
    #    getter taking (object, tag name),
    #    ttl)
    # ttl looks like a lifetime in seconds; -1 presumably means
    # "never expires" -- TODO confirm.
    self.HV_TAG_MAP = {
        'version'   : ( lambda o: True,
                        lambda o,t: str(__version__),
                        -1),
        'libvirtver': self._tag_map_direct('get_libvirt_version', -1),
        'htype'     : self._tag_map_direct('get_hv_type', -1),
        'hserial'   : self._tag_map_direct('get_hw_serial', -1),
        'hvendor'   : self._tag_map_direct('get_hw_vendor', -1),
        'arch'      : self._tag_map_direct('get_arch', -1),
        'hvm'       : self._tag_map_direct('get_hvm_available', -1),
        'cpu'       : self._tag_map_direct('get_cpu', -1),
        'cpulogical': self._tag_map_direct('get_cpu_thread', -1),
        'chaserial' : self._tag_map_direct('get_chassis_serial', -1),
        'chaasset'  : self._tag_map_direct('get_chassis_asset', -1),
        # one day
        'hbios'     : self._tag_map_direct('get_hw_bios', 24*3600),
        'hvver'     : self._tag_map_direct('get_hv_version', 24*3600),
        'platform'  : self._tag_map_direct('get_platform', 24*3600),
        'os'        : self._tag_map_direct('get_system', 24*3600),
        'uname'     : self._tag_map_direct('get_uname', 24*3600),
        'cpufreq'   : self._tag_map_direct('get_cpu_frequency', 24*3600),
        'mem'       : self._tag_map_direct('get_mem', 24*3600),
        'disk'      : self._tag_map_keys('get_disks', 24*3600),
        'h'         : self._tag_map_direct('get_name', 24*3600),
        # one hour
        # one minute
        'memfree'   : self._tag_map_direct('get_mem_free', 60),
        'memused'   : self._tag_map_direct('get_mem_used', 60),
        'sto'       : ( lambda o: hasattr(o, 'storage'),
                        lambda o,t: ' '.join(
                            getattr(o, 'storage')().pool_list()),
                        60),
        # 5 seconds
        'uptime'    : self._tag_map_direct('get_uptime', 5),
        'cpuuse'    : self._tag_map_direct('get_cpu_usage', 5),
        'load'      : self._tag_map_direct('get_loadavg', 5),
        'nvm'       : self._tag_map_counter('vm_list', 5),
        'vmstarted' : self._tag_map_counter('vm_list_running', 5),
        'vmstopped' : self._tag_map_counter('vm_list_stopped', 5),
        'vmpaused'  : self._tag_map_counter('vm_list_paused', 5),
    }
    # Pattern -> helper entry; one helper yields a whole family of tags.
    self.HV_TAG_GLOB = {
        'disk*'     : self._tag_map_helper(self._helper_hv_disk, 24*3600),
        'sto*'      : self._tag_map_helper(self._helper_hv_sto, 60),
    }
    # Tags that are always resolved for a VM.
    self.VM_TAG_MANDATORY = ['hv', 'h']
    self.VM_TAG_MAP = {
        'hv'        : ( lambda o: hasattr(o, 'hypervisor'),
                        lambda o,t: o.hypervisor().get_name(),
                        -1),
        'htype'     : ( lambda o: hasattr(o, 'hypervisor'),
                        lambda o,t: o.hypervisor().get_hv_type(),
                        -1),
        'arch'      : self._tag_map_direct('get_arch', -1),
        'h'         : self._tag_map_direct('get_name', -1),
        # one day
        # one hour
        'cpu'       : self._tag_map_direct('get_cpu_core', 3600),
        'mem'       : self._tag_map_direct('get_mem', 3600),
        'memmax'    : self._tag_map_direct('get_mem_max', 3600),
        'vncport'   : self._tag_map_direct('get_vnc_port', 3600),
        'status'    : ( lambda o: True,
                        lambda o,t: 'running' if o.is_active()
                                    else 'paused' if o.is_paused()
                                    else 'stopped',
                        5), # FIXME crappy tag implementation
        #'cpuuse'    : self._tag_map_direct('get_cpu_usage'),
    }
    self.VM_TAG_GLOB = {
        'disk*'     : self._tag_map_helper(self._helper_vm_disk, 3600),
        'nic*'      : self._tag_map_helper(self._helper_vm_nic, 3600),
    }
    # Names of the VMs this node has already registered on the server.
    self._register_vm = []
def __getitem__(self, name):
'''
'''
# filter the private members access
if name.startswith('_'):
raise KeyError('Remote name `%s` is private' % repr(name))
# filter command execution methods
elif not self._safe_mode and name in self.UNSAFE_METHODS:
raise KeyError('Remote name `%s` is disabled by configuration'
% repr(name))
else:
debug('Called %s.%s' % (self.__class__.__name__, name))
return super(NodeHandler, self).__getitem__(name)
'''
'''
return ( lambda o: hasattr(o, method),
'''
'''
return ( lambda o: hasattr(o, method),
'''
'''
return ( lambda o: hasattr(o, method),
lambda o,t: ' '.join(getattr(o, method)().keys()),
ttl)
'''
'''
return ( lambda o, resolve=False: helper(o, resolve=resolve),
lambda o, tag_name=None, resolve=False:
helper(o, tag_name=tag_name, resolve=resolve),
ttl)
def _helper_hv_disk(self, hv, tag_name=None, resolve=True):
'''
'''
result = {}
disks = hv.get_disks()
if len(disks):
result['disk'] = ' '.join(disks.keys())
for name, size in disks.iteritems():
result['disk%s_size' % name] = str(size)
if not result:
result = None
return result
def _helper_hv_sto(self, hv, tag_name=None, resolve=True):
'''
'''
result = {}
if hasattr(hv, 'storage'):
pools = hv.storage().pool_list()
if len(pools):
result['sto'] = ' '.join(pools)
for pool_name in pools:
pool = hv.storage().pool_get(pool_name)
capa = pool.get_space_capacity()
result['sto%s_size' % pool_name] = str(capa)
free = pool.get_space_free()
result['sto%s_free' % pool_name] = str(free)
used = pool.get_space_used()
result['sto%s_used' % pool_name] = str(used)
vol = pool.volume_list()
if vol:
result['sto%s_vol' % pool_name] = ' '.join(vol)
if not result:
result = None
return result
def _helper_vm_disk(self, vm, tag_name=None, resolve=True):
'''
'''
result = {}
volumes = vm.get_volumes()
if len(volumes):
result['disk'] = ' '.join([str(i) for i in range(0, len(volumes))])
for vol_id, vol in enumerate(volumes):
name = vol.get_name()
if name:
result['disk%i_vol' % vol_id] = str(name)
pool = vol.get_pool()
if pool:
result['disk%i_pool' % vol_id] = str(pool.name())
path = vol.get_path()
if path:
result['disk%i_path' % vol_id] = str(path)
capa = vol.get_space_capacity()
result['disk%i_size' % vol_id] = str(capa)
if not result:
result = None
return result
def _helper_vm_nic(self, vm, tag_name=None, resolve=True):
'''
'''
result = {}
nics = vm.get_nics()
if len(nics):
result['nic'] = ' '.join([str(i) for i in range(0, len(nics))])
for nic_id, nic in enumerate(nics):
mac = nic.get('mac')
if mac:
result['nic%i_mac' % nic_id] = str(mac)
model = nic.get('model')
if model:
result['nic%i_model' % nic_id] = str(model)
source = nic.get('source')
if source:
result['nic%i_source' % nic_id] = str(source)
if not result:
result = None
return result
# NOTE(review): the `def` line (and any decorator/docstring) of the method
# these statements belong to is not visible in the reviewed text.
#
# Let the host back-end run its own periodic work when it has any.
if hasattr(self._host_handle, 'scheduler_run'):
    self._host_handle.scheduler_run()
# (un)register sub nodes if this host has the capability
if hasattr(self._host_handle, 'vm_list'):
    try:
        vm_current = self._host_handle.vm_list()
        # register the VMs that appeared since the last run
        for vm in vm_current:
            if vm in self._register_vm:
                continue
            try:
                info('registering vm `%s`' % vm)
                self._connection.get_server().register(vm, 'vm')
            except RpcError as e:
                # an RpcError whose `exception` is the '#FIXME'
                # placeholder is tolerated, anything else propagates
                if e.exception != '#FIXME':
                    raise
            # record the VM exactly once (it used to be appended twice
            # on the tolerated-error path)
            self._register_vm.append(vm)
        # unregister the VMs that disappeared; iterate over a copy
        # because entries are removed from the list inside the loop
        for vm in list(self._register_vm):
            if vm in vm_current:
                continue
            try:
                info('unregistering vm `%s`' % vm)
                self._connection.get_server().unregister(vm)
            except RpcError as e:
                if e.exception != '#FIXME':
                    raise
            # forget the VM exactly once (a second remove() used to
            # raise ValueError on the tolerated-error path)
            self._register_vm.remove(vm)
    except Exception as e:
        # best effort: a failure here must not stop the caller
        debug("REGISTER except `%s`:`%s`" % (repr(e), e))
##################################
# Tag query
##################################
'''
Collect host tags into ``result``: tag name -> dict holding ``ttl`` and,
when the tag was resolved, ``value``.

NOTE(review): the ``def`` line of this method and several control-flow
lines (loop headers, ``else`` branches, one ``if`` body) are not visible
in the reviewed text. The statements below are kept exactly as found;
the places where something appears to be missing are flagged inline.
'''
result = {}
info('server requested tags=`%s` noresolve_tags=`%s`', tags,
     noresolve_tags)
# build a single dict of tags, boolean means "resolve"
mytags = {}
if tags:
    for t in tags:
        mytags[t] = True
if noresolve_tags:
    for t in noresolve_tags:
        if t not in mytags:
            mytags[t] = False
# return all tags if server does not request a subset
# NOTE(review): the condition that should guard this expansion (probably
# "no tag requested") is not visible; as written it always runs.
# add simple tags
for t in self.HV_TAG_MAP.keys():
    mytags[t] = True
# add globbing tags
for pattern, handler in self.HV_TAG_GLOB.iteritems():
    try:
        # helper is available on the current host
        if handler[0](self._host_handle):
            # get tags from helper
            htags = handler[0](self._host_handle, resolve=False)
            # append all tags
            for t in htags:
                mytags[t] = True
    except Exception as err:
        debug('error adding globbing tags `%r`:`%s`', err, err)
debug('no tag specified, expanded list to `%s`', mytags.keys())
# add mandatory tags if missing in the list, or set noresolve
for t in self.HV_TAG_MANDATORY:
    if t not in mytags or not mytags[t]:
        # NOTE(review): the body of this branch is not visible (the VM
        # counterpart sets mytags[t] = True here).
# query host
info('query host with tag list `%s`', mytags.keys())
# NOTE(review): `tag` and `resolve` are used below but the loop that
# binds them is not visible (presumably an iteration over mytags).
# first, test tag name against the list of plain names
if tag in self.HV_TAG_MAP:
    debug('A1: plain mapping found for tag `%s`', tag)
    try:
        # item 0 of a mapping entry tells whether the host supports it
        if self.HV_TAG_MAP[tag][0](self._host_handle):
            debug('B1: tag `%s` is available on host', tag)
            result[tag] = {}
            result[tag]['ttl'] = self.HV_TAG_MAP[tag][2]
            if resolve:
                # fetch tag data
                q = self.HV_TAG_MAP[tag][1](self._host_handle, tag)
                debug('D1: host returned `%s`=`%s`', tag, q)
                if q is not None:
                    # append tag data
                    result[tag]['value'] = str(q)
                else:
                    debug('E1: I wont return `%s`=`None`', tag)
        # NOTE(review): this message looks like the `else` branch of the
        # availability test above; the `else:` line is not visible.
        debug('tag `%s` is not implemented !!!', tag)
    except Exception as err:
        debug('error with tag mapping `%r`:`%s`', err, err)
# if no direct tag mapping exists, test the name against the globbing
# list
# NOTE(review): the `else:` introducing this part is not visible.
debug('A2: searching for `%s` in globbing tags', tag)
# iterate on globbing patterns, and get helper references
# process the first globbing tag that matches then exit because
# there should not exist two globbing patterns matching
# one tag, ideally
for pattern, handler in self.HV_TAG_GLOB.iteritems():
    try:
        # helper is available on the current host
        # (the second positional argument lands on the probe's
        # `resolve` parameter)
        if handler[0](self._host_handle, tag):
            if fnmatchcase(tag, pattern):
                debug('C2: processing tag `%s` with pattern `%s`',
                      tag, pattern)
                # get tags from helper
                htags = handler[1](self._host_handle, tag)
                # FIXME instead of extracting one tag, try not
                # to build the whole list. Maybe it's too
                # difficult and not worth implementing
                if tag in htags:
                    debug('D2: found tag in helper result with'
                          ' value `%s`', htags[tag])
                    result[tag] = {}
                    result[tag]['ttl'] = handler[2]
                    result[tag]['value'] = str(htags[tag])
                break
    except Exception as err:
        debug('error with globbing tag `%r`:`%s`', err, err)
return result
def _sub_tag_list(self, sub_obj):
'''
'''
result = []
# add simple tags
result.extend(self.VM_TAG_MAP.keys())
# add globbing tags
for pattern, handler in self.VM_TAG_GLOB.iteritems():
try:
# helper is available on the current host
if handler[0](sub_obj):
debug('sub node implements `%s`' % pattern)
# get tags from helper
htags = handler[0](sub_obj, resolve=False)
# append all tags
for t in htags.keys():
result.append(t)
except Exception as err:
debug('error while listing sub node tags `%r`:`%s`', err, err)
return result
@pure
def sub_tags(self, sub_id, tags=None, noresolve_tags=None):
    '''
    Query the tags of one sub node (VM).

    Tag selection:
      * neither list given: every available tag, resolved;
      * only ``noresolve_tags``: every available tag resolved, except the
        listed ones;
      * only ``tags``: every available tag unresolved, except the listed
        ones which are resolved;
      * both given: exactly the listed tags.
    Tags of ``VM_TAG_MANDATORY`` are always resolved.

    :param sub_id: identifier of the sub node, passed to ``vm_get``
    :param tags: tag names to resolve, or None
    :param noresolve_tags: tag names to report without value, or None
    :return: dict tag name -> dict with ``ttl`` and, for resolved tags
        whose value is not None, ``value`` (as a string)
    :raise HypervisorError: when ``sub_id`` is not a known sub node
    '''
    # imported here because no module-level import of it is visible
    from fnmatch import fnmatchcase
    info('server requested tags for `%s` with tags=`%s` noresolve_tags=`%s`'
         , sub_id, tags, noresolve_tags)
    # NOTE(review): the test guarding this error was not visible in the
    # reviewed text (the raise was unconditional, making everything below
    # unreachable). Membership in vm_list() is assumed -- confirm.
    if sub_id not in self._host_handle.vm_list():
        debug('sub node `%s` is unknown !', sub_id)
        raise HypervisorError('sub node `%s` is unknown' % sub_id)
    sub = self._host_handle.vm_get(sub_id)
    # build a single dict of tags, boolean means "resolve"
    debug('server requested tags `%s` + `%s`', tags, noresolve_tags)
    available_tags = self._sub_tag_list(sub)
    mytags = {}
    if tags is None:
        # no explicit selection: start from everything, resolved
        for t in available_tags:
            mytags[t] = True
    elif noresolve_tags is None:
        # only `tags` given: everything else is reported unresolved
        for t in available_tags:
            mytags[t] = False
    for t in noresolve_tags or ():
        mytags[t] = False
    # explicit `tags` win over `noresolve_tags`
    for t in tags or ():
        mytags[t] = True
    debug('expanded list to `%s`', mytags.keys())
    # add mandatory tags if missing in the list, or set noresolve
    for t in self.VM_TAG_MANDATORY:
        if not mytags.get(t):
            debug('add/correct mandatory tag `%s`' % t)
            mytags[t] = True
    # query the subnode
    result = {}
    try:
        # query the VM with each tag
        for tag, resolve in mytags.items():
            # first, search tag in plain mappings
            if tag in self.VM_TAG_MAP:
                debug('A1: plain mapping found for tag `%s`', tag)
                try:
                    available, getter, ttl = self.VM_TAG_MAP[tag]
                    if available(sub):
                        result[tag] = {'ttl': ttl}
                        if resolve:
                            # call the wrapper mapping lambda
                            q = getter(sub, tag)
                            debug('C1: tag query returned `%s`=`%s`',
                                  tag, q)
                            if q is not None:
                                result[tag]['value'] = str(q)
                            else:
                                warning('D1: I wont return `%s`=`None`',
                                        tag)
                except Exception as err:
                    debug('error with plain mapping `%r`:`%s`', err, err)
            # no tag mapping exist, test name against the globbing list
            else:
                debug('A2: searching for `%s` in globbing tags', tag)
                # iterate on globbing patterns and stop at the first
                # pattern that matches: two patterns should not match
                # the same tag, ideally
                for pattern, handler in self.VM_TAG_GLOB.items():
                    try:
                        # helper is available on the current VM (the
                        # second positional argument lands on the
                        # probe's `resolve` parameter)
                        if handler[0](sub, tag):
                            if fnmatchcase(tag, pattern):
                                debug('B2: processing tag `%s` with pattern'
                                      ' `%s`' % (tag, pattern))
                                # get tags from helper
                                htags = handler[1](sub, tag)
                                # FIXME instead of extracting one tag,
                                # try not to build the whole list. Maybe
                                # it's too difficult and not worth
                                # implementing
                                if tag in htags:
                                    debug('C2: found tag in helper result'
                                          ' with value `%s`' % htags[tag])
                                    result[tag] = {'ttl': handler[2]}
                                    if resolve:
                                        result[tag]['value'] = str(
                                            htags[tag])
                                break
                    except Exception as err:
                        debug('error with globbing tag `%r`:`%s`',
                              err, err)
    except Exception as err:
        # the handler body was not visible in the reviewed text; log and
        # return whatever was collected so far
        debug('error while querying sub node tags `%r`:`%s`', err, err)
    return result
##################################
# Host control
##################################
@pure
def node_shutdown(self, reboot=True, gracefull=True):
    '''
    Power-cycle or stop the local host through the host back-end.

    :param reboot: truthy to restart the host, falsy to stop it
    :param gracefull: truthy for the soft variant, falsy for the forced
        one
    :return: whatever the selected back-end method returns
    :raise NotImplementedError: when the back-end lacks the method
    '''
    info('server requested shutdown of local host with options '
         'reboot=`%s` gracefull=`%s`', reboot, gracefull)
    # (reboot?, gracefull?) -> name of the back-end method to call
    actions = {
        (True, True): 'power_reboot',
        (True, False): 'power_force_reboot',
        (False, True): 'power_shutdown',
        (False, False): 'power_off',
    }
    method = actions[(bool(reboot), bool(gracefull))]
    if not hasattr(self._host_handle, method):
        info('unable to proceed, this feature is not available')
        raise NotImplementedError('host handler has no method `%s`' %method)
    result = getattr(self._host_handle, method)()
    info('in progress... action returned `%s`', result)
    return result
@pure
def execute_command(self, command):
    '''
    Log the start and the end of the execution of ``command``.

    NOTE(review): no statement that actually runs the command (or
    returns its result) is visible between the two log calls in the
    reviewed text -- confirm against the complete file.

    :param command: value interpolated into both log messages
    '''
    info('starting execution of `%s`' % command)
    info('finished execution of `%s`' % command)
##################################
# VM management
##################################
@pure
def vm_define(self, data, format='xml'):
    '''
    Create a VM on the host from a description.

    :param data: VM description handed to the back-end ``vm_define``
    :param format: accepted but not used by this method
    :return: the value returned by the back-end ``vm_define``
    :raise NotImplementedError: when the back-end has no ``vm_define``
    '''
    info('server requested creation of a new VM')
    debug('description data is `%s`', data)
    if not hasattr(self._host_handle, 'vm_define'):
        raise NotImplementedError('host do not support VM creation')
    return self._host_handle.vm_define(data)
@pure
def vm_undefine(self, name):
    '''
    Delete the definition of a VM.

    :param name: VM name handed to the back-end ``vm_get``
    :raise NotImplementedError: when the back-end has no ``vm_get`` or
        the VM object has no ``undefine``
    '''
    info('server requested deletion of `%s`', name)
    if not hasattr(self._host_handle, 'vm_get'):
        # the message used to be truncated ('host do not support ');
        # it now matches the wording used by vm_export
        raise NotImplementedError('host handler has not method `vm_get`')
    vm = self._host_handle.vm_get(name)
    if not hasattr(vm, 'undefine'):
        raise NotImplementedError('VM object has not method `undefine`')
    vm.undefine()
@pure
def vm_export(self, name, format='xml'):
    '''
    Return the configuration of a VM.

    :param name: VM name handed to the back-end ``vm_get``
    :param format: accepted but not used by this method
    :return: the value returned by the VM object's ``get_config``
    :raise NotImplementedError: when the back-end has no ``vm_get`` or
        the VM object has no ``get_config``
    '''
    info('server requested configuration of `%s`', name)
    if not hasattr(self._host_handle, 'vm_get'):
        raise NotImplementedError('host handler has not method `vm_get`')
    target = self._host_handle.vm_get(name)
    if not hasattr(target, 'get_config'):
        raise NotImplementedError('VM object has not method `get_config`')
    return target.get_config()
# NOTE(review): unlike its siblings (vm_start, vm_suspend, vm_resume) this
# method carries no @pure decorator in the reviewed text -- confirm
# whether that is intended.
def vm_stop(self, vm_names=None, force=False):
    '''
    Stop VMs, one after the other, on a best-effort basis.

    :param vm_names: names of the VMs to stop; None means every VM
        returned by the back-end ``vm_list_running``
    :param force: when true call ``power_off`` on each VM, otherwise
        ``power_shutdown``
    '''
    info('server requested stop of `%s`, force is `%s`', vm_names, force)
    if vm_names is None:
        vm_names = self._host_handle.vm_list_running()
        debug('no vm specified, expanded list to `%s`', vm_names)
    for vm_name in vm_names:
        try:
            debug('fetching vm data for `%s`', vm_name)
            vm = self._host_handle.vm_get(vm_name)
            if force:
                vm.power_off()
            else:
                debug('shutdown requested for `%s`', vm_name)
                vm.power_shutdown()
        except Exception as err:
            # keep going with the other VMs, but leave a trace instead
            # of silently hiding the failure
            warning('unable to stop vm `%s`: `%r`', vm_name, err)
@pure
def vm_start(self, vm_names=None):
    '''
    Start VMs, one after the other, on a best-effort basis.

    :param vm_names: names of the VMs to start; None means every VM
        returned by the back-end ``vm_list_stopped``
    '''
    info('server requested start of `%s`', vm_names)
    if vm_names is None:
        vm_names = self._host_handle.vm_list_stopped()
        debug('no vm specified, expanded list to `%s`', vm_names)
    for vm_name in vm_names:
        try:
            debug('fetching vm data for `%s`', vm_name)
            vm = self._host_handle.vm_get(vm_name)
            vm.power_on()
        except Exception as err:
            # keep going with the other VMs, but leave a trace instead
            # of silently hiding the failure
            warning('unable to start vm `%s`: `%r`', vm_name, err)
@pure
def vm_suspend(self, vm_names=None):
    '''
    Suspend VMs, one after the other, on a best-effort basis.

    :param vm_names: names of the VMs to suspend; None means every VM
        returned by the back-end ``vm_list_running``
    '''
    info('server requested suspend of `%s`', vm_names)
    if vm_names is None:
        vm_names = self._host_handle.vm_list_running()
        debug('no vm specified, expanded list to `%s`',vm_names)
    for vm_name in vm_names:
        try:
            debug('fetching vm data for `%s`', vm_name)
            vm = self._host_handle.vm_get(vm_name)
            vm.power_suspend()
        except Exception as err:
            # keep going with the other VMs, but leave a trace instead
            # of silently hiding the failure
            warning('unable to suspend vm `%s`: `%r`', vm_name, err)
@pure
def vm_resume(self, vm_names=None):
    '''
    Resume VMs, one after the other, on a best-effort basis.

    :param vm_names: names of the VMs to resume; None means every VM
        returned by the back-end ``vm_list_paused``
    '''
    info('server requested resume of `%s`', vm_names)
    if vm_names is None:
        vm_names = self._host_handle.vm_list_paused()
        debug('no vm specified, expanded list to `%s`', vm_names)
    for vm_name in vm_names:
        try:
            debug('fetching vm data for `%s`', vm_name)
            vm = self._host_handle.vm_get(vm_name)
            vm.power_resume()
        except Exception as err:
            # keep going with the other VMs, but leave a trace instead
            # of silently hiding the failure
            warning('unable to resume vm `%s`: `%r`', vm_name, err)
##################################
# Storage control
##################################
# NOTE(review): the `def` line (and any decorator/docstring) of the method
# these statements belong to is not visible in the reviewed text; they use
# the names `pool`, `name` and `size`, presumably its parameters.
#
# Create a volume called `name` of `size` (coerced to int) in the storage
# pool `pool`, when the host back-end has storage support.
size = int(size)
if hasattr(self._host_handle, 'storage'):
    # `pool` is rebound from the pool name to the pool object
    pool = self._host_handle.storage().pool_get(pool)
    pool.volume_create(name, size)
else:
    raise NotImplementedError('host handler has no storage support')
@pure
def vol_delete(self, pool, name, wipe=False):
    '''
    Delete a volume from a storage pool.

    :param pool: pool name handed to ``pool_get``
    :param name: volume name handed to ``volume_get``
    :param wipe: when true, call ``wipe()`` on the volume before
        deleting it
    :raise NotImplementedError: when the host back-end has no storage
        support
    '''
    # Same storage guard as vol_export / vol_import. Previously the
    # `else` was attached to `if wipe`, so wipe=False raised "no storage
    # support" and the volume was never deleted.
    if not hasattr(self._host_handle, 'storage'):
        raise NotImplementedError('host handler has no storage support')
    pool = self._host_handle.storage().pool_get(pool)
    vol = pool.volume_get(name)
    if wipe:
        vol.wipe()
    vol.delete()
@pure
def vol_export(self, pool, name, raddr, rport):
    '''
    Send a volume to a remote endpoint and wait for the transfer to end.

    :param pool: pool name handed to ``pool_get``
    :param name: volume name handed to ``volume_get``
    :param raddr: passed to ``SendFileJob`` (remote address, presumably)
    :param rport: passed to ``SendFileJob`` (remote port, presumably)
    :return: dict with ``id`` (job id), ``log`` (job log) and, when the
        job manager reports the job as finished, ``checksum``
    :raise NotImplementedError: when the host back-end has no storage
        support
    '''
    # imported here because no module-level import of it is visible
    from time import sleep
    if not hasattr(self._host_handle, 'storage'):
        raise NotImplementedError('host handler has no storage support')
    # `jmgr` was used below without ever being defined
    jmgr = self._host_handle.jobmgr
    # get device path info
    sto = self._host_handle.storage()
    vol_path = sto.pool_get(pool).volume_get(name).get_path()
    # create job
    job = SendFileJob(jmgr, vol_path, raddr, rport)
    job.start_now()
    jid = job.get_id()
    # wait for job completion
    while jmgr.is_job_running(jid):
        sleep(2)
    # return job report
    res = {'id': jid, 'log': job.get_log()}
    if jmgr.is_job_finished(jid):
        res['checksum'] = job.get_checksum()
    return res
@pure
def vol_import(self, pool, name):
    '''
    Start a job that receives data into an existing volume.

    :param pool: pool name handed to ``pool_get``
    :param name: volume name handed to ``volume_get``
    :return: dict with ``id`` (job id) and ``port`` (value of the job's
        ``get_port()``)
    :raise NotImplementedError: when the host back-end has no storage
        support
    '''
    if not hasattr(self._host_handle, 'storage'):
        raise NotImplementedError('host handler has no storage support')
    # locate the device backing the volume
    storage = self._host_handle.storage()
    target_path = storage.pool_get(pool).volume_get(name).get_path()
    receiver = ReceiveFileJob(self._host_handle.jobmgr, target_path)
    # prepare job now, so we can fetch the port number below
    receiver.prepare()
    receiver.start_now()
    # job info for the caller
    return {'id': receiver.get_id(), 'port': receiver.get_port()}
@pure
def vol_import_wait(self, jid):
    '''
    Block until a job stops running, then report on it.

    :param jid: job id, coerced to int
    :return: dict with ``id`` (job id), ``log`` (job log) and, when the
        job manager reports the job as finished, ``checksum``
    '''
    jid = int(jid)
    manager = self._host_handle.jobmgr
    # poll every two seconds until the job is no longer running
    while manager.is_job_running(jid):
        sleep(2)
    finished_job = manager.get_job(jid)
    report = {'id': finished_job.get_id(), 'log': finished_job.get_log()}
    if manager.is_job_finished(finished_job.get_id()):
        report['checksum'] = finished_job.get_checksum()
    return report
@pure
def vol_import_list(self):
    # Placeholder: this call is not implemented and always raises
    # NotImplementedError.
    raise NotImplementedError()
@pure
def vol_import_cancel(self, jid):
    '''
    Ask the host job manager to cancel a job.

    :param jid: job id, passed unchanged to the job manager ``cancel``
    '''
    self._host_handle.jobmgr.cancel(jid)
##################################
# Job management
##################################
@pure
def job_list(self):
    '''
    Return the job listing of the host job manager (the value of its
    ``list()`` call, passed through unchanged).
    '''
    manager = self._host_handle.jobmgr
    return manager.list()
@pure
def job_log(self, jid):
    '''
    Return the log of one job.

    :param jid: job id handed to the job manager ``get_job``
    :return: the value of the job's ``get_log()``
    '''
    return self._host_handle.jobmgr.get_job(jid).get_log()