Skip to content
Commits on Source (59)
*.pyc *.pyc
doc/_build/* doc/_build/*
*.swp *.swp
*.swo
*.log *.log
test_*.py test_*.py
.ropeproject/ .ropeproject/
...@@ -10,7 +10,7 @@ from getpass import getpass ...@@ -10,7 +10,7 @@ from getpass import getpass
from pwd import getpwnam from pwd import getpwnam
from grp import getgrnam from grp import getgrnam
from optparse import OptionParser from optparse import OptionParser
from ccserver.conf import CCConf from cloudcontrol.server.conf import CCConf
DEFAULT_ACCOUNT_DIRECTORY = '/var/lib/cc-server/' DEFAULT_ACCOUNT_DIRECTORY = '/var/lib/cc-server/'
DEFAULT_ROLE = 'cli' DEFAULT_ROLE = 'cli'
...@@ -65,7 +65,7 @@ if __name__ == '__main__': ...@@ -65,7 +65,7 @@ if __name__ == '__main__':
if options.god: if options.god:
conf.add_right(args[0], '', '*', 'allow', 0) conf.add_right(args[0], '', '*', 'allow', 0)
# Chown the files: # Chown the files:
uid = getpwnam(CHOWN_USER).pw_uid uid = getpwnam(CHOWN_USER).pw_uid
gid = getgrnam(CHOWN_GROUP).gr_gid gid = getgrnam(CHOWN_GROUP).gr_gid
......
#!/usr/bin/env python #!/usr/bin/env python
#coding=utf8 #coding=utf8
from cloudcontrol.common.helpers.logger import patch_logging; patch_logging()
import os import os
import sys import sys
import atexit import atexit
...@@ -13,10 +15,9 @@ from pwd import getpwnam ...@@ -13,10 +15,9 @@ from pwd import getpwnam
from grp import getgrnam from grp import getgrnam
from daemon import DaemonContext from daemon import DaemonContext
from daemon.pidlockfile import PIDLockFile
from ccserver.ccserver import CCServer from cloudcontrol.server.server import CCServer
from ccserver import __version__ from cloudcontrol.server import __version__
DEFAULT_CONFIG_FILE = '/etc/cc-server.conf' DEFAULT_CONFIG_FILE = '/etc/cc-server.conf'
...@@ -26,7 +27,7 @@ DEFAULT_CONFIGURATION = { ...@@ -26,7 +27,7 @@ DEFAULT_CONFIGURATION = {
'user': '', 'user': '',
'group': '', 'group': '',
'pidfile': '', 'pidfile': '',
'umask': '0177', 'umask': '077',
'port': 1984, 'port': 1984,
'debug': False, 'debug': False,
'account_db': None, # None = mandatory option 'account_db': None, # None = mandatory option
...@@ -34,7 +35,7 @@ DEFAULT_CONFIGURATION = { ...@@ -34,7 +35,7 @@ DEFAULT_CONFIGURATION = {
'ssl_cert': None, 'ssl_cert': None,
'ssl_key': None, 'ssl_key': None,
'maxcon': 600, 'maxcon': 600,
'maxidle': 30, 'maxidle': 120,
} }
...@@ -69,16 +70,21 @@ def run_server(options): ...@@ -69,16 +70,21 @@ def run_server(options):
if options['stdout']: if options['stdout']:
handler = logging.StreamHandler() handler = logging.StreamHandler()
fmt = EncodingFormatter('[%(asctime)s] '
'\x1B[30;47m%(name)s\x1B[0m '
'\x1B[30;42m%(levelname)s\x1B[0m: '
'%(message)s')
else: else:
facility = logging.handlers.SysLogHandler.LOG_DAEMON facility = logging.handlers.SysLogHandler.LOG_DAEMON
handler = logging.handlers.SysLogHandler(address='/dev/log', handler = logging.handlers.SysLogHandler(address='/dev/log',
facility=facility) facility=facility)
fmt = EncodingFormatter('%(name)s: %(levelname)s %(message)s')
fmt = EncodingFormatter('cc-server (%(name)s): %(levelname)s %(message)s')
handler.setFormatter(fmt) handler.setFormatter(fmt)
logger.addHandler(handler) logger.addHandler(handler)
server = CCServer(conf_dir=options['account_db'], server = CCServer(logger.getChild('cc-server'),
conf_dir=options['account_db'],
maxcon=int(options['maxcon']), maxcon=int(options['maxcon']),
maxidle=int(options['maxidle']), maxidle=int(options['maxidle']),
port=int(options['port']), port=int(options['port']),
......
#!/usr/bin/env python
#coding=utf8
'''
CloudControl server libraries.
'''
__version__ = '24~dev'
#!/usr/bin/env python
#coding=utf8
'''
Local client representation classes.
'''
from threading import Lock
from datetime import datetime
from sjrpc.utils import ConnectionProxy
class CCClient(object):
'''
Represent a single client connected to the server.
'''
def __init__(self, login, role, server, connection):
# The login of the client:
self.login = login
# The role of the client:
self.role = role
# The server binded to this client:
self.server = server
# The connection of the client (public attribute):
self.connection = connection
# The date of connection of the client:
self._connection_date = datetime.now()
# The date of the last action:
self._last_action = datetime.now()
# The connection proxy:
self.proxy = ConnectionProxy(connection)
# Jobs lock for this client:
self.lock = Lock()
def get_uptime(self):
'''
Get the uptime of the client connection in seconds.
:return: uptime of the client
'''
dt = datetime.now() - self._connection_date
return dt.seconds + dt.days * 86400
def get_idle(self):
'''
Get the idle time of the client connection in seconds.
:return: idle of the client
'''
dt = datetime.now() - self._last_action
return dt.seconds + dt.days * 86400
def top(self):
'''
Reset the last_action date to now.
'''
self._last_action = datetime.now()
def get_ip(self):
peer = self.connection.getpeername()
return ':'.join(peer.split(':')[:-1])
def shutdown(self):
'''
Shutdown the connection to the client.
'''
self.server.rpc.unregister(self.connection, shutdown=True)
def get_tags(self):
'''
Get all server defined tags.
'''
tags = {}
tags['con'] = self.get_uptime()
tags['ip'] = self.get_ip()
return tags
def get_remote_tags(self, tag):
return self.connection.call('get_tags', (tag,))[tag]
from ccserver.clients import Client
class BootstrapClient(Client):
""" A bootstrap client connected to the cc-server.
"""
ROLE = 'bootstrap'
Client.register_client_class(BootstrapClient)
#!/usr/bin/env python
#coding=utf8
'''
Jobs management on the server.
'''
from __future__ import absolute_import
import re
import logging
import time
from datetime import datetime
from threading import Thread, Lock
from sjrpc.core import AsyncWatcher
from ccserver.exceptions import (BadJobTypeError, UnknownJobError, JobError,
UnknownObjectError)
from ccserver.utils import AcquiresAllOrNone
class JobCancelError(Exception):
'''
Exception used by jobs to stop it when a cancel signal is sent.
'''
pass
class BaseJob(dict, Thread, object):
'''
A base class to define a job.
The standards job items are:
* id: id of the job
* status: message explaining the current job status
* done: True if the job is done
* cancelled: True if job has been cancelled by user
* created: job date of creation
* ended: job date of end (or None if done = False)
* duration: duration in seconds of the job (processed on export)
* author: author login of the job
:param manager: the :class:`JobsManager` instance.
'''
def __init__(self, manager, *args, **kwargs):
# Initialize the inherited classes:
dict.__init__(self, *args, **kwargs)
Thread.__init__(self)
# The manager of this job:
self.manager = manager
# Define default job properties:
self['status'] = 'pending'
self['done'] = False
self['cancelled'] = False
self['created'] = datetime.now()
self['ended'] = None
self['duration'] = 0
#~ assert self.get('author') is not None, 'author is not defined'
# List of actions to do by the rollback method:
self._wayback = []
# Set the thread name:
self.name = 'job-%s' % self['id']
def __hash__(self):
return self['id'].__hash__()
def __setitem__(self, key, value):
if key == 'id':
raise KeyError('Key %r in read-only.' % key)
else:
super(BaseJob, self).__setitem__(key, value)
def __delitem__(self, key):
if key == 'id':
raise KeyError('Key %r in read-only.' % key)
else:
super(BaseJob, self).__delitem__(key)
def report(self, status, done=None):
'''
Report the status of the job.
:param status: the status to set to the job
:param done: is the job done, None to keep current value
'''
self['status'] = status
if done is not None:
self['ended'] = datetime.now()
self['done'] = done
def run(self):
'''
Run the job itself.
'''
try:
self.job()
except JobCancelError as err:
self._rollback('%s' % err)
except Exception as err:
logging.error('Error while executing job: %s, %r', err, err)
self._rollback('%s' % err)
else:
self.report('success', done=True)
def job(self):
'''
Method to override to define the job's behavior.
'''
pass
def checkpoint(self, func=None):
'''
Check if job is not cancelled, else raise the CancellJobError. Also
add the provided function (optionnal) to the wayback list.
:param func: callable to add to the wayback list.
'''
if self['cancelled']:
raise JobCancelError('Job has been cancelled by user')
if func is not None:
self._wayback.append(func)
def _rollback(self, error):
'''
Rollback the job.
'''
self.report('rollbacking')
try:
for func in reversed(self._wayback):
func()
except Exception as err:
self.report('rollback failed: %s' % err, done=True)
else:
self.report('cancelled: %s' % error, done=True)
def cancel(self):
'''
Cancel the job.
.. note::
You can override this method to trigger an action when user cancel
the job.
'''
if self['done']:
raise JobError('Job is done')
if self['cancelled']:
raise JobError('Job is already cancelled')
self['cancelled'] = True
self.report('cancelling')
def export(self, props=None):
'''
Export the job in a simple dict format.
'''
exported = {}
for key, val in self.iteritems():
if key.startswith('_') or (props is not None and key not in props):
continue
if isinstance(val, datetime):
val = int(time.mktime(val.timetuple()))
if key == 'duration':
if self['done']:
dt = self['ended'] - self['created']
val = dt.seconds + dt.days * 86400
else:
now = datetime.now()
dt = now - self['created']
val = dt.seconds + dt.days * 86400
exported[key] = str(val)
return exported
class KillClientJob(BaseJob):
'''
A job used to kill connected accounts.
Mandatory items:
* account: the account login to kill
Optional items:
* gracetime: time before to kill the user
'''
def job(self):
gracetime = self.get('gracetime')
account = self.get('account')
assert account is not None, 'Account not specified'
if gracetime is not None:
time.sleep(int(gracetime))
self.checkpoint()
self.manager.server.kill(account)
class KillOldCliJob(BaseJob):
'''
Typically an hidden job used to kill clients who are connected/idle since
too much time.
Mandatory items:
* maxcon: maximum connection time in minutes
* maxidle: maximum idle time in minutes
Optional items:
* delay: delay in secondes between two checks (default 1m)
'''
DEFAULT_DELAY = 60
def job(self):
maxcon = self.get('maxcon')
assert maxcon is not None, 'maxcon is None'
maxidle = self.get('maxidle')
assert maxidle is not None, 'maxidle is None'
delay = self.get('delay', self.DEFAULT_DELAY)
while True:
self.checkpoint()
for client in self.manager.server.iterclients('cli'):
if client.uptime > (maxcon * 60):
self.manager.server.kill(client.login)
#TODO: handle idleing.
time.sleep(delay)
class BaseMigrationJob(BaseJob):
'''
Base class for migration jobs.
'''
def _check_status(self, vm_id, status):
'''
Check the status of the VM.
'''
answer = self.manager.server.list('id=%s&status=%s' % (vm_id, status))
return bool(answer)
class ColdMigrationJob(BaseMigrationJob):
'''
A cold vm migration job.
Mandatory items:
* vm_name: name of the vm to migrate
* hv_source: name of the hv which execute the VM
* hv_dest: the destination hypervisor
* author: login of the author cli
'''
def job(self):
vm_id = '%s.%s' % (self['hv_source'], self['vm_name'])
self['title'] = 'Cold migration %s --> %s' % (vm_id, self['hv_dest'])
logging.info('Job-%s: Started migration for %s', self['id'], vm_id)
# Cancel the job if the user has not the right to migrate the vm or to
# select an hypervisor as destination:
right_check = self.manager.server.check
tql = 'id=%s' % vm_id
if not right_check(self['author'], 'coldmigrate', tql):
raise JobCancelError('author have no rights to migrate this VM')
tql = 'id=%s' % self['hv_dest']
if not right_check(self['author'], 'coldmigrate_dest', tql):
raise JobCancelError('author have no right to migrate to this hv')
# Update the VM object:
vm = self.manager.server.db.get_by_id(vm_id)
if vm is None:
raise JobCancelError('Source VM not found')
# Get the source and destination hv clients:
try:
source = self.manager.server.get_client(self['hv_source'])
except KeyError:
raise JobCancelError('source hypervisor is not connected')
try:
dest = self.manager.server.get_client(self['hv_dest'])
except KeyError:
raise JobCancelError('destination hypervisor is not connected')
self.checkpoint()
self.report('waiting lock for source and dest hypervisors')
logging.info('Job-%s: Trying to acquire locks', self['id'])
with AcquiresAllOrNone(source.hvlock, dest.hvlock):
logging.info('Job-%s: Locks acquired', self['id'])
self.checkpoint()
if not vm['status'] == 'stopped':
raise JobCancelError('vm is not stopped')
# Create storages on destination:
self.report('create volumes')
for disk in vm.get('disk', '').split():
# Getting informations about the disk:
pool = vm.get('disk%s_pool' % disk)
name = vm.get('disk%s_vol' % disk)
size = vm.get('disk%s_size' % disk)
assert pool is not None, 'pool tag doesn\'t exists'
assert name is not None, 'name tag doesn\'t exists'
assert size is not None, 'size tag doesn\'t exists'
# Create the volume on destination:
dest.proxy.vol_create(pool, name, int(size))
logging.info('Job-%s: Created volume %s/%s on destination '
'hypervisor', self['id'], pool, name)
# Rollback stuff for this action:
def rb_volcreate():
dest.proxy.vol_delete(pool, name)
self.checkpoint(rb_volcreate)
# Define VM:
self.report('define vm')
logging.info('Job-%s: XML configuration transfert', self['id'])
vm_config = source.proxy.vm_export(self['vm_name'])
dest.proxy.vm_define(vm_config)
# Rollback stuff for vm definition:
def rb_define():
dest.proxy.vm_undefine(self['vm_name'])
self.checkpoint(rb_define)
# Copy all source disk on destination disk:
for disk in vm.get('disk', '').split():
self._copy_disk(source, dest, vm, disk)
# At this point, if operation is a success, all we need is just to
# cleanup source hypervisor from disk and vm. This operation *CAN'T*
# be cancelled or rollbacked if anything fails (unlikely). The
# migration must be considered as a success, and the only way to
# undo this is to start a new migration in the other way.
# Delete the rollback list.
# This is mandatory to avoid data loss if the cleanup
# code below fail.
self._wayback = []
# Cleanup the disks:
for disk in vm.get('disk', '').split():
pool = vm.get('disk%s_pool' % disk)
name = vm.get('disk%s_vol' % disk)
source.proxy.vol_delete(pool, name)
# Cleanup the VM:
source.proxy.vm_undefine(self['vm_name'])
logging.info('Job-%s: Migration completed with success', self['id'])
def _copy_disk(self, source, dest, vm, disk):
'''
Copy the specified disk name of the vm from source to dest.
'''
# Get informations about the disk:
pool = vm.get('disk%s_pool' % disk)
name = vm.get('disk%s_vol' % disk)
logging.info('Job-%s: Started copy for %s/%s', self['id'], pool, name)
self.report('copy %s/%s' % (pool, name))
# Make the copy and wait for it end:
xferprop = dest.proxy.vol_import(pool, name)
# Register the cancel function:
def cancel_xfer():
dest.proxy.vol_import_cancel(xferprop['id'])
self['func_cancel_xfer'] = cancel_xfer
# Wait for the end of transfert:
watcher = AsyncWatcher()
watcher.register(source.conn, 'vol_export', pool, name, dest.ip, xferprop['port'])
watcher.register(dest.conn, 'vol_import_wait', xferprop['id'])
msgs = watcher.wait()
del self['func_cancel_xfer']
# Compare checksum of two answers:
checksums = []
assert len(msgs) == 2
for msg in msgs:
if msg.get('error') is not None:
msg = 'error while copy: %s' % msg['error']['message']
raise JobCancelError(msg)
else:
checksums.append(msg['return'].get('checksum'))
self.checkpoint()
if checksums[0] != checksums[1]:
raise JobCancelError('checksum mismatches')
def cancel(self):
if self.get('func_cancel_xfer') is not None:
self.get('func_cancel_xfer')()
super(ColdMigrationJob, self).cancel()
class HotMigrationJob(BaseMigrationJob):
'''
A hot vm migration job.
Mandatory items:
* vm_name: name of the vm to migrate
* hv_source: name of the hv which execute the VM
* hv_dest: the destination hypervisor
* author: login of the author cli
'''
def job(self):
vm_id = '%s.%s' % (self['hv_source'], self['vm_name'])
self['title'] = 'Hot migration %s --> %s' % (vm_id, self['hv_dest'])
logging.info('Job-%s: Started hot migration for %s', self['id'], vm_id)
# Cancel the job if the user has not the right to migrate the vm or to
# select an hypervisor as destination:
right_check = self.manager.server.check
tql = 'id=%s' % vm_id
if not right_check(self['author'], 'hotmigrate', tql):
raise JobCancelError('author have no rights to migrate this VM')
tql = 'id=%s' % self['hv_dest']
if not right_check(self['author'], 'hotmigrate_dest', tql):
raise JobCancelError('author have no right to migrate to this hv')
# Update the VM object:
vm = self.manager.server.db.get_by_id(vm_id)
if vm is None:
raise JobCancelError('Source VM not found')
# Get the source and destination hv clients:
try:
source = self.manager.server.get_client(self['hv_source'])
except KeyError:
raise JobCancelError('source hypervisor is not connected')
try:
dest = self.manager.server.get_client(self['hv_dest'])
except KeyError:
raise JobCancelError('destination hypervisor is not connected')
self.checkpoint()
self.report('waiting lock for source and dest hypervisors')
logging.info('Job-%s: Trying to acquire locks', self['id'])
with AcquiresAllOrNone(source.hvlock, dest.hvlock):
logging.info('Job-%s: Locks acquired', self['id'])
self.checkpoint()
if not self._check_status(vm_id, 'running'):
raise JobCancelError('vm is not started')
to_cleanup = []
# Create storages on destination and start synchronization:
disks = vm.get('disk', '').split()
for disk in disks:
to_cleanup += self._sync_disk(vm, disk, source, dest)
# Libvirt tunnel setup:
tunres_src = source.proxy.tun_setup()
def rb_tun_src():
source.proxy.tun_destroy(tunres_src)
self.checkpoint(rb_tun_src)
tunres_dst = dest.proxy.tun_setup(local=False)
def rb_tun_dst():
dest.proxy.tun_destroy(tunres_dst)
self.checkpoint(rb_tun_dst)
source.proxy.tun_connect(tunres_src, tunres_dst, dest.ip)
dest.proxy.tun_connect_hv(tunres_dst, migration=False)
# Migration tunnel setup:
migtunres_src = source.proxy.tun_setup()
def rb_migtun_src():
source.proxy.tun_destroy(migtunres_src)
self.checkpoint(rb_migtun_src)
migtunres_dst = dest.proxy.tun_setup(local=False)
def rb_migtun_dst():
dest.proxy.tun_destroy(migtunres_dst)
self.checkpoint(rb_migtun_dst)
source.proxy.tun_connect(migtunres_src, migtunres_dst, dest.ip)
dest.proxy.tun_connect_hv(migtunres_dst, migration=True)
# Initiate the live migration:
self.report('migration in progress')
source.proxy.vm_migrate_tunneled(self['vm_name'], tunres_src,
migtunres_src)
# At this point, if operation is a success, all we need is just to
# cleanup source hypervisor from disk and vm. This operation *CAN'T*
# be cancelled or rollbacked if anything fails (unlikely). The
# migration must be considered as a success, and the only way to
# undo this is to start a new migration in the other way.
# Delete the rollback list.
# This is mandatory to avoid data loss if the cleanup
# code below fail.
self.report('cleanup')
self._wayback = []
source.proxy.tun_destroy(tunres_src)
dest.proxy.tun_destroy(tunres_dst)
source.proxy.tun_destroy(migtunres_src)
dest.proxy.tun_destroy(migtunres_dst)
for cb_cleanup in reversed(to_cleanup):
cb_cleanup()
# Cleanup the disks:
for disk in disks:
pool = vm.get('disk%s_pool' % disk)
name = vm.get('disk%s_vol' % disk)
source.proxy.vol_delete(pool, name)
logging.info('Job-%s: Migration completed with success', self['id'])
def _sync_disk(self, vm, disk, source, dest):
to_cleanup = []
# getting informations about the disk:
pool = vm.get('disk%s_pool' % disk)
name = vm.get('disk%s_vol' % disk)
size = vm.get('disk%s_size' % disk)
assert pool is not None, 'pool tag doesn\'t exists'
assert name is not None, 'name tag doesn\'t exists'
assert size is not None, 'size tag doesn\'t exists'
status_msg = 'sync volume %s/%s (%%s)' % (pool, name)
self.report(status_msg % 'creation')
# create the volume on destination:
dest.proxy.vol_create(pool, name, int(size))
logging.info('job-%s: created volume %s/%s on destination '
'hypervisor', self['id'], pool, name)
# rollback stuff for this action:
def rb_volcreate():
dest.proxy.vol_delete(pool, name)
self.checkpoint(rb_volcreate)
# setup the drbd synchronization with each hypervisors:
self.report(status_msg % 'setup')
res_src = source.proxy.drbd_setup(pool, name)
def rb_setupsrc():
source.proxy.drbd_shutdown(res_src)
self.checkpoint(rb_setupsrc)
to_cleanup.append(rb_setupsrc)
res_dst = dest.proxy.drbd_setup(pool, name)
def rb_setupdst():
dest.proxy.drbd_shutdown(res_dst)
self.checkpoint(rb_setupdst)
to_cleanup.append(rb_setupdst)
# start connection of drbd:
self.report(status_msg % 'connect')
source.proxy.drbd_connect(res_src, res_dst, dest.ip)
dest.proxy.drbd_connect(res_dst, res_src, source.ip)
# setup topology as primary/secondary:
source.proxy.drbd_role(res_src, True)
dest.proxy.drbd_role(res_dst, False)
# Wait the end of the synchronization:
sync_running = True
while sync_running:
status = dest.proxy.drbd_sync_status(res_dst)
if status['done']:
sync_running = False
self.report(status_msg % 'sync %s%%' % status['completion'])
time.sleep(2)
dest.proxy.drbd_role(res_dst, True)
source.proxy.drbd_takeover(res_src, True)
def rb_takeover_src():
source.proxy.drbd_takeover(res_src, False)
self.checkpoint(rb_takeover_src)
to_cleanup.append(rb_takeover_src)
dest.proxy.drbd_takeover(res_dst, True)
def rb_takeover_dst():
dest.proxy.drbd_takeover(res_dst, False)
self.checkpoint(rb_takeover_dst)
to_cleanup.append(rb_takeover_dst)
return to_cleanup
class CloneJob(BaseMigrationJob):
'''
A clone job.
Mandatory items:
* vm_name: name of the vm to migrate
* new_vm_name: the name of the cloned vm
* hv_source: name of the hv which execute the VM
* hv_dest: the destination hypervisor
* author: login of the author cli
'''
def job(self):
vm_id = '%s.%s' % (self['hv_source'], self['vm_name'])
self['title'] = 'Clone %s --> %s' % (vm_id, self['hv_dest'])
logging.info('Job-%s: Started clone for %s', self['id'], vm_id)
# Cancel the job if the user has not the right to clone the vm or to
# select an hypervisor as destination:
right_check = self.manager.server.check
tql = 'id=%s' % vm_id
if not right_check(self['author'], 'clone', tql):
raise JobCancelError('author have no rights to migrate this VM')
tql = 'id=%s' % self['hv_dest']
if not right_check(self['author'], 'clone_dest', tql):
raise JobCancelError('author have no right to migrate to this hv')
# Update the VM object:
try:
self.manager.server.objects.update(ids=(vm_id,))
vm = self.manager.server.objects.get_by_id(vm_id)
except UnknownObjectError:
raise JobCancelError('Cloned VM not found')
# Get the source and destination hv clients:
try:
source = self.manager.server.get_connection(self['hv_source'])
except KeyError:
raise JobCancelError('source hypervisor is not connected')
try:
dest = self.manager.server.get_connection(self['hv_dest'])
except KeyError:
raise JobCancelError('destination hypervisor is not connected')
self.checkpoint()
self.report('waiting lock for source and dest hypervisors')
logging.info('Job-%s: Trying to acquire locks', self['id'])
with AcquiresAllOrNone(source.lock, dest.lock):
logging.info('Job-%s: Locks acquired', self['id'])
self.checkpoint()
# Create storages on destination:
old_new_disk_mapping = {} # Mapping between old and new disk names
self.report('create volumes')
for disk in vm.get('disk', '').split():
# Getting informations about the disk:
pool = vm.get('disk%s_pool' % disk)
name = vm.get('disk%s_vol' % disk)
size = vm.get('disk%s_size' % disk)
assert pool is not None, 'pool tag doesn\'t exists'
assert name is not None, 'name tag doesn\'t exists'
assert size is not None, 'size tag doesn\'t exists'
# Change the name of the disk:
old_name = name
if name.startswith(self['vm_name']):
suffix = name[len(self['vm_name']):]
name = self['new_vm_name'] + suffix
else:
name = '%s_%s' % (self['new_vm_name'], name)
fulloldname = '/dev/%s/%s' % (pool, old_name)
fullnewname = '/dev/%s/%s' % (pool, name)
old_new_disk_mapping[fulloldname] = fullnewname
# Create the volume on destination:
dest.proxy.vol_create(pool, name, int(size))
logging.info('Job-%s: Created volume %s/%s on destination '
'hypervisor (was %s)', self['id'], pool, name,
old_name)
# Rollback stuff for this action:
def rb_volcreate():
dest.proxy.vol_delete(pool, name)
self.checkpoint(rb_volcreate)
# Define VM:
self.report('define vm')
logging.info('Job-%s: XML configuration transfert', self['id'])
vm_config = source.proxy.vm_export(self['vm_name'])
# Change vm configuration XML to update it with new name:
new_vm_config = self._update_xml(vm_config, self['vm_name'],
self['new_vm_name'],
old_new_disk_mapping)
dest.proxy.vm_define(new_vm_config)
# Rollback stuff for vm definition:
def rb_define():
dest.proxy.vm_undefine(name)
self.checkpoint(rb_define)
# Copy all source disk on destination disk:
for disk in vm.get('disk', '').split():
self._copy_disk(source, dest, vm, disk, name)
logging.info('Job-%s: Clonage completed with success', self['id'])
def _update_xml(self, vm_config, old_name, name, old_new_name_mapping):
'''
Update the XML definition of the VM with the new vm name, the new vm
disk, and remove the uuid tag.
'''
vm_config = vm_config.replace('<name>%s</name>' % old_name,
'<name>%s</name>' % name)
vm_config = re.sub('<uuid>.*?</uuid>\n', '', vm_config)
# delete MAC address, then it will be regenerated by libvirt
vm_config = re.sub('<mac .*?/>\n', '', vm_config)
for old, new in old_new_name_mapping.iteritems():
vm_config = vm_config.replace("='%s'" % old,
"='%s'" % new)
return vm_config
def _copy_disk(self, source, dest, vm, disk, new_disk):
'''
Copy the specified disk name of the vm from source to dest.
'''
# Get informations about the disk:
pool = vm.get('disk%s_pool' % disk)
name = vm.get('disk%s_vol' % disk)
logging.info('Job-%s: Started copy for %s/%s to %s/%s',
self['id'], pool, name, pool, new_disk)
self.report('copy %s/%s' % (pool, name))
# Make the copy and wait for it end:
xferprop = dest.proxy.vol_import(pool, new_disk)
# Register the cancel function:
def cancel_xfer():
dest.proxy.vol_import_cancel(xferprop['id'])
self['func_cancel_xfer'] = cancel_xfer
# Wait for the end of transfert:
cids = set()
cids.add(source.connection.async_call('vol_export', pool, name,
dest.get_ip(), xferprop['port']))
cids.add(dest.connection.async_call('vol_import_wait', xferprop['id']))
msgs = self.manager.server.manager.wait(frozenset(cids))
del self['func_cancel_xfer']
# Compare checksum of two answers:
checksums = []
assert len(msgs) == 2
for msg in msgs:
if msg.get('error') is not None:
msg = 'error while copy: %s' % msg['error']['message']
raise JobCancelError(msg)
else:
checksums.append(msg.get('checksum'))
self.checkpoint()
if checksums[0] != checksums[1]:
raise JobCancelError('checksum mismatches')
def cancel(self):
if self.get('func_cancel_xfer') is not None:
self.get('func_cancel_xfer')()
super(CloneJob, self).cancel()
class JobsManager(object):
'''
Manage the current job list.
:param server: The :class:`CCServer` instance.
'''
JOBS_TYPES = {
'kill': KillClientJob,
'kill_oldcli': KillOldCliJob,
'cold_migrate': ColdMigrationJob,
'hot_migrate': HotMigrationJob,
'clone': CloneJob,
}
def __init__(self, server):
# The main job dict, the keys are id of jobs.
self._jobs = {}
# The server:
self.server = server
# The id of the next job and it's lock:
self._current_id = 1
self._current_id_lock = Lock()
def create(self, jtype, **kwargs):
'''
Create a new job.
:param jtype: the type of the new job
:param \*\*kwargs: arguments to pass to the job
:raise BadJobTypeError: when invalid jtype is passed
'''
jobid = self.get_id()
jobtype = JobsManager.JOBS_TYPES.get(jtype)
if jobtype is None:
raise BadJobTypeError('Invalid job type %r' % jtype)
job = jobtype(self, id=jobid, **kwargs)
self._jobs[jobid] = job
job.daemon = True
job.start()
return job
def get_id(self):
'''
Get the current id and increment the counter.
'''
with self._current_id_lock:
jobid = self._current_id
self._current_id += 1
return jobid
def cancel(self, jobid):
'''
Cancel the provided job.
'''
job = self._jobs.get(jobid)
if job is None:
raise UnknownJobError('Invalid job id: %r' % jobid)
elif job.get('_hidden', False):
raise UnknownJobError('Invalid job id: %r (hidden)' % jobid)
else:
job.cancel()
def purge(self):
'''
Purge all done jobs.
'''
for job in self._jobs.values():
if job['done']:
del self._jobs[job['id']]
def iterjobs(self, show_done=True, show_running=True):
'''
Iter over jobs.
:param done: If set, iterate over done or not done jobs.
'''
for job in self._jobs.itervalues():
if (show_done and job['done'] or show_running and not job['done']
and not job.get('_hidden')):
yield job
#!/usr/bin/env python
#coding=utf8
'''
OrderedSet Python implementation.
This snippet of code is taken from http://code.activestate.com/recipes/576694/
Written by Raymond Hettinger's and licenced under the MIT Licence.
Comments:
Runs on Py2.6 or later (and runs on 3.0 or later without any modifications).
Implementation based on a doubly linked link and an internal dictionary.
This design gives OrderedSet the same big-Oh running times as regular sets
including O(1) adds, removes, and lookups as well as O(n) iteration.
'''
import collections
KEY, PREV, NEXT = range(3)
class OrderedSet(collections.MutableSet):
def __init__(self, iterable=None):
self.end = end = []
end += [None, end, end] # sentinel node for doubly linked list
self.map = {} # key --> [key, prev, next]
if iterable is not None:
self |= iterable
def __len__(self):
return len(self.map)
def __contains__(self, key):
return key in self.map
def add(self, key):
if key not in self.map:
end = self.end
curr = end[PREV]
curr[NEXT] = end[PREV] = self.map[key] = [key, curr, end]
def discard(self, key):
if key in self.map:
key, prev, next = self.map.pop(key)
prev[NEXT] = next
next[PREV] = prev
def __iter__(self):
end = self.end
curr = end[NEXT]
while curr is not end:
yield curr[KEY]
curr = curr[NEXT]
def __reversed__(self):
end = self.end
curr = end[PREV]
while curr is not end:
yield curr[KEY]
curr = curr[PREV]
def pop(self, last=True):
if not self:
raise KeyError('set is empty')
key = next(reversed(self)) if last else next(iter(self))
self.discard(key)
return key
def __repr__(self):
if not self:
return '%s()' % (self.__class__.__name__,)
return '%s(%r)' % (self.__class__.__name__, list(self))
def __eq__(self, other):
if isinstance(other, OrderedSet):
return len(self) == len(other) and list(self) == list(other)
return set(self) == set(other)
def __del__(self):
self.clear() # remove circular references
__import__('pkg_resources').declare_namespace(__name__)
""" CloudControl server libraries.
"""
__version__ = '24~rc1'
...@@ -4,15 +4,13 @@ This package store classes representing each client's role and the associated ...@@ -4,15 +4,13 @@ This package store classes representing each client's role and the associated
sjRPC handler. sjRPC handler.
""" """
import logging
from datetime import datetime from datetime import datetime
from sjrpc.utils import ConnectionProxy from sjrpc.utils import ConnectionProxy
from ccserver.handlers import CCHandler, listed from cloudcontrol.server.handlers import CCHandler, listed
from ccserver.exceptions import RightError from cloudcontrol.server.exceptions import RightError
from ccserver.db import RemoteTag from cloudcontrol.server.db import RemoteTag
from cloudcontrol.common.tql.db.tag import CallbackTag from cloudcontrol.common.tql.db.tag import CallbackTag
...@@ -26,7 +24,7 @@ class RegisteredCCHandler(CCHandler): ...@@ -26,7 +24,7 @@ class RegisteredCCHandler(CCHandler):
return super(RegisteredCCHandler, self).__getitem__(name) return super(RegisteredCCHandler, self).__getitem__(name)
def on_disconnect(self, conn): def on_disconnect(self, conn):
logging.info('Client %s disconnected', self.client.login) self.logger.info('Client %s disconnected', self.client.login)
self.client.shutdown() self.client.shutdown()
def check(self, method, tql=None): def check(self, method, tql=None):
...@@ -88,29 +86,28 @@ class Client(object): ...@@ -88,29 +86,28 @@ class Client(object):
ROLE = None ROLE = None
RPC_HANDLER = RegisteredCCHandler RPC_HANDLER = RegisteredCCHandler
KILL_ALREADY_CONNECTED = False
roles = {} roles = {}
def __init__(self, login, server, connection, tql_object): def __init__(self, logger, login, server, connection):
self.logger = logger
self._login = login self._login = login
self._server = server self._server = server
self._connection = connection self._connection = connection
self._tql_object = tql_object
self._handler = self.RPC_HANDLER(self) self._handler = self.RPC_HANDLER(self)
self._proxy = ConnectionProxy(self._connection.rpc) self._proxy = ConnectionProxy(self._connection.rpc)
self._last_action = datetime.now() self._last_action = datetime.now()
self._connection_date = datetime.now() self._connection_date = datetime.now()
self._tql_object = None
# Set the role's handler for the client:
self._connection.rpc.set_handler(self._handler)
# Remote tags registered: # Remote tags registered:
self._remote_tags = set() self._remote_tags = set()
# Register the server defined client tags: def _get_tql_object(self):
self._tql_object.register(CallbackTag('con', lambda: self.uptime, ttl=0)) """ Get the TQL object of the client from the cc-server tql database.
self._tql_object.register(CallbackTag('idle', lambda: self.idle, ttl=0)) """
self._tql_object.register(CallbackTag('ip', lambda: self.ip)) return self._server.db.get(self.login)
@classmethod @classmethod
def register_client_class(cls, class_): def register_client_class(cls, class_):
...@@ -119,8 +116,8 @@ class Client(object): ...@@ -119,8 +116,8 @@ class Client(object):
cls.roles[class_.ROLE] = class_ cls.roles[class_.ROLE] = class_
@classmethod @classmethod
def from_role(cls, role, login, server, connection, tql_object): def from_role(cls, role, logger, login, server, connection):
return cls.roles[role](login, server, connection, tql_object) return cls.roles[role](logger, login, server, connection)
# #
# Properties # Properties
...@@ -144,6 +141,12 @@ class Client(object): ...@@ -144,6 +141,12 @@ class Client(object):
""" """
return self._login return self._login
@property
def role(self):
""" Return the role of this client.
"""
return self.ROLE
@property @property
def server(self): def server(self):
""" Return the cc-server binded to this client. """ Return the cc-server binded to this client.
...@@ -182,16 +185,27 @@ class Client(object): ...@@ -182,16 +185,27 @@ class Client(object):
peer = self.conn.getpeername() peer = self.conn.getpeername()
return ':'.join(peer.split(':')[:-1]) return ':'.join(peer.split(':')[:-1])
def get_tags(self, tags): # DEPRECATED def attach(self):
""" Get tags on the remote node. """ Attach the client to the server.
:param tags: tags is the list of tags to fetch
""" """
return self._connection.call('get_tags', tags)
# Set the role's handler for the client:
self.conn.rpc.set_handler(self._handler)
# Register the client in tql database:
self._tql_object = self._get_tql_object()
# Register the server defined client tags:
self._tql_object.register(CallbackTag('con', lambda: self.uptime, ttl=0))
self._tql_object.register(CallbackTag('idle', lambda: self.idle, ttl=0))
self._tql_object.register(CallbackTag('ip', lambda: self.ip))
def shutdown(self): def shutdown(self):
""" Shutdown the connection to the client. """ Shutdown the connection to the client.
""" """
# Disable the client handler:
self.conn.rpc.set_handler(None)
# Unregister all remote tags: # Unregister all remote tags:
for tag in self._remote_tags.copy(): for tag in self._remote_tags.copy():
self.tags_unregister(tag) self.tags_unregister(tag)
...@@ -209,14 +223,11 @@ class Client(object): ...@@ -209,14 +223,11 @@ class Client(object):
""" """
self._last_action = datetime.now() self._last_action = datetime.now()
def get_remote_tags(self, tag): # DEPRECATED
return self.conn.call('get_tags', (tag,))[tag]
def async_remote_tags(self, watcher, robj, tags): def async_remote_tags(self, watcher, robj, tags):
""" Asynchronously update tags from the remote client using """ Asynchronously update tags from the remote client using
specified watcher. specified watcher.
""" """
watcher.register(self.conn, 'get_tags', tags, _data=robj) watcher.register(self.conn, 'get_tags', tags, _data=(tags, robj))
def tags_register(self, name, ttl=None, value=None): def tags_register(self, name, ttl=None, value=None):
""" Register a new remote tag for the client. """ Register a new remote tag for the client.
......
from cloudcontrol.server.clients import Client
from cloudcontrol.server.clients.host import HostClient
from cloudcontrol.server.db import SObject
from cloudcontrol.common.tql.db.tag import StaticTag
class BootstrapClient(HostClient):
""" A bootstrap client connected to the cc-server.
"""
ROLE = 'bootstrap'
def _get_tql_object(self):
tql_object = SObject(self.login)
tql_object.register(StaticTag('r', self.role))
self._server.db.register(tql_object)
return tql_object
@property
def login(self):
return '%s.%s' % (self._login, self.conn.get_fd())
@property
def role(self):
return 'host'
def shutdown(self):
super(BootstrapClient, self).shutdown()
# Also, remote the object from the db:
self._server.db.unregister(self.login)
Client.register_client_class(BootstrapClient)
import logging
from collections import defaultdict
from sjrpc.core import RpcError from sjrpc.core import RpcError
from sjrpc.core.protocols import TunnelProtocol
from cloudcontrol.common.datastructures.orderedset import OrderedSet
from ccserver.orderedset import OrderedSet from cloudcontrol.server.conf import CCConf
from ccserver.conf import CCConf from cloudcontrol.server.exceptions import (ReservedTagError, BadObjectError,
from ccserver.exceptions import (AlreadyRegistered, AuthenticationError, BadRoleError, NotConnectedAccountError,
RightError, ReservedTagError, BadObjectError, CloneError)
BadRoleError, NotConnectedAccountError, from cloudcontrol.server.election import Elector
CloneError)
from ccserver.election import Elector from cloudcontrol.server.handlers import listed, Reporter
from cloudcontrol.server.clients import Client, RegisteredCCHandler
from ccserver.handlers import listed, Reporter
from ccserver.clients import Client, RegisteredCCHandler
from cloudcontrol.common.tql.db.tag import StaticTag from cloudcontrol.common.tql.db.tag import StaticTag
MIGRATION_TYPES = {'cold': 'cold_migrate', MIGRATION_TYPES = {'cold': 'cold_migrate',
...@@ -21,38 +16,44 @@ MIGRATION_TYPES = {'cold': 'cold_migrate', ...@@ -21,38 +16,44 @@ MIGRATION_TYPES = {'cold': 'cold_migrate',
class CliHandler(RegisteredCCHandler): class CliHandler(RegisteredCCHandler):
""" Handler binded to 'cli' role. """ Handler binded to the 'cli' role.
Summary of methods: Summary of methods:
================ ================================ ============= .. currentmodule:: cloudcontrol.server.clients.cli
Method name Description Right(s)
================ ================================ ============= .. autosummary::
list list objects list
start start a vm start CliHandler.list
stop stop a vm stop CliHandler.start
destroy destroy a vm destroy CliHandler.stop
pause suspend a vm pause CliHandler.destroy
resume resume a paused vm resume CliHandler.pause
passwd change password of accounts passwd CliHandler.resume
addaccount add a new account addaccount CliHandler.passwd
copyaccount copy an account addaccount CliHandler.addaccount
addtag add a tag to accounts addtag CliHandler.copyaccount
deltag remove a tag from accounts deltag CliHandler.addtag
tags show tags of accounts tags CliHandler.deltag
delaccount delete an account delaccount CliHandler.tags
close close an account close CliHandler.delaccount
declose declose an account declose CliHandler.close
kill kill a connected account kill CliHandler.declose
rights show rights of accounts rights CliHandler.kill
addright add right rules to accounts addright CliHandler.rights
delright remove right rules from accounts delright CliHandler.addright
execute execute remote command on hosts execute CliHandler.delright
shutdown shutdown a connected client shutdown CliHandler.execute
jobs show jobs jobs CliHandler.shutdown
cancel cancel a running job cancel CliHandler.jobs
jobspurge remove done jobs from jobs list jobspurge CliHandler.cancel
================ ================================ ============= CliHandler.jobspurge
CliHandler.console
CliHandler.rshell
CliHandler.rshell_resize
CliHandler.rshell_wait
CliHandler.forward
CliHandler.dbstats
""" """
@listed @listed
...@@ -63,7 +64,7 @@ class CliHandler(RegisteredCCHandler): ...@@ -63,7 +64,7 @@ class CliHandler(RegisteredCCHandler):
""" """
self.check('list', query) self.check('list', query)
logging.debug('Executed list function with query %s', query) self.logger.debug('Executed list function with query %s', query)
objects = self.server.list(query) objects = self.server.list(query)
order = OrderedSet(['id']) order = OrderedSet(['id'])
#if tags is not None: #if tags is not None:
...@@ -140,10 +141,7 @@ class CliHandler(RegisteredCCHandler): ...@@ -140,10 +141,7 @@ class CliHandler(RegisteredCCHandler):
self.check('undefine', query) self.check('undefine', query)
#FIXME: When tag globbing will be implemented, the list of tags to objects = self.server.list(query, show=('r', 'p', 'h', 'disk*',))
# show will be: r, p, h, disk*
# I ask "all tags" pending implementation.
objects = self.server.list(query, show=('*',))
errs = Reporter() errs = Reporter()
for obj in objects: for obj in objects:
if obj['r'] != 'vm': if obj['r'] != 'vm':
...@@ -234,7 +232,6 @@ class CliHandler(RegisteredCCHandler): ...@@ -234,7 +232,6 @@ class CliHandler(RegisteredCCHandler):
errs = Reporter() errs = Reporter()
with self.conf: with self.conf:
for obj in objects: for obj in objects:
print obj
if 'a' not in obj: if 'a' not in obj:
errs.error(obj['id'], 'not an account') errs.error(obj['id'], 'not an account')
continue continue
...@@ -246,7 +243,7 @@ class CliHandler(RegisteredCCHandler): ...@@ -246,7 +243,7 @@ class CliHandler(RegisteredCCHandler):
' to %s' % (tags[tag_name], tag_value)) ' to %s' % (tags[tag_name], tag_value))
# Update the object db (update the tag value): # Update the object db (update the tag value):
dbobj = self.server.db.get(obj['id']) dbobj = self.server.db.get(obj['id'])
dbobj[tag_name].set_value(tag_value) dbobj[tag_name].value = tag_value
else: else:
errs.success(obj['id'], 'tag created') errs.success(obj['id'], 'tag created')
# Update the object db (create the tag): # Update the object db (create the tag):
...@@ -355,6 +352,8 @@ class CliHandler(RegisteredCCHandler): ...@@ -355,6 +352,8 @@ class CliHandler(RegisteredCCHandler):
errs.success(obj['id'], 'closed') errs.success(obj['id'], 'closed')
self.server.conf.add_tag(obj['a'], 'close', 'yes') self.server.conf.add_tag(obj['a'], 'close', 'yes')
dbobj = self.server.db.get(obj['id'])
dbobj.register(StaticTag('close', 'yes'), override=True)
self.server.jobs.create('kill', author=self.client.login, self.server.jobs.create('kill', author=self.client.login,
account=obj['a'], gracetime=1) account=obj['a'], gracetime=1)
...@@ -379,9 +378,11 @@ class CliHandler(RegisteredCCHandler): ...@@ -379,9 +378,11 @@ class CliHandler(RegisteredCCHandler):
tags = self.conf.show(obj['a'])['tags'] tags = self.conf.show(obj['a'])['tags']
if 'close' in tags: if 'close' in tags:
errs.success(obj['id'], 'account declosed') errs.success(obj['id'], 'account declosed')
self.conf.remove_tag(obj['a'], 'close')
dbobj = self.server.db.get(obj['id'])
dbobj.unregister('close', override=True)
else: else:
errs.warn(obj['id'], 'account not closed') errs.warn(obj['id'], 'account not closed')
self.conf.remove_tag(obj['a'], 'close')
return errs.get_dict() return errs.get_dict()
...@@ -667,10 +668,36 @@ class CliHandler(RegisteredCCHandler): ...@@ -667,10 +668,36 @@ class CliHandler(RegisteredCCHandler):
'hv_dest': dest['id'], 'hv_dest': dest['id'],
'author': self.client.login}) 'author': self.client.login})
@listed
def console(self, tql):
""" Start a remote console on object matching the provided tql.
:param tql: tql matching only one object on which start the console
:return: the label of the created tunnel
"""
self.check('console', tql)
objects = self.server.list(tql, show=('r', 'p', 'h'))
if len(objects) != 1:
raise NotImplementedError('Console only support one tunnel at time for now')
errs = Reporter()
for obj in objects:
if obj['r'] in ('vm',):
client = self.server.get_client(obj['p'])
srv_to_host_tun = client.console(obj['h'])
cli_tun = self.client.register_tunnel('console', client, srv_to_host_tun)
errs.success(obj['id'], 'tunnel started.', output=cli_tun.label)
else:
errs.error(obj['id'], 'bad role')
return errs.get_dict()
@listed @listed
def rshell(self, tql): def rshell(self, tql):
""" Start a remote shell on object matching the provided tql. """ Start a remote shell on object matching the provided tql.
:param tql: tql matching only one object on which start the rshell
:return: the label of the created tunnel
""" """
self.check('rshell', tql)
objects = self.server.list(tql, show=('r', 'p')) objects = self.server.list(tql, show=('r', 'p'))
if len(objects) != 1: if len(objects) != 1:
raise NotImplementedError('Rshell only support one tunnel at time for now') raise NotImplementedError('Rshell only support one tunnel at time for now')
...@@ -681,16 +708,21 @@ class CliHandler(RegisteredCCHandler): ...@@ -681,16 +708,21 @@ class CliHandler(RegisteredCCHandler):
srv_to_host_tun = client.rshell() srv_to_host_tun = client.rshell()
cli_tun = self.client.register_tunnel('rshell', client, srv_to_host_tun) cli_tun = self.client.register_tunnel('rshell', client, srv_to_host_tun)
errs.success(obj['id'], 'tunnel started.', output=cli_tun.label) errs.success(obj['id'], 'tunnel started.', output=cli_tun.label)
elif obj['r'] in ('vm', ):
raise NotImplementedError('rshell on vm not implemented')
else: else:
errs.error(obj['id'], 'bad role') errs.error(obj['id'], 'bad role')
return errs.get_dict() return errs.get_dict()
@listed @listed
def rshell_resize(self, label, row, col, xpixel, ypixel): def rshell_resize(self, label, row, col, xpixel, ypixel):
""" Resize the shell. """ Send a resize event to the remote shell's tty.
:param label: label of the rshell tunnel to resize
:param row: number of rows
:param col: number of columns
:param xpixel: unused
:param ypixel: unused
""" """
self.check('rshell')
ttype, client, ctun, stun = self.client.get_tunnel(label) ttype, client, ctun, stun = self.client.get_tunnel(label)
if ttype != 'rshell': if ttype != 'rshell':
raise ValueError('Label does not refers on a rshell') raise ValueError('Label does not refers on a rshell')
...@@ -700,6 +732,7 @@ class CliHandler(RegisteredCCHandler): ...@@ -700,6 +732,7 @@ class CliHandler(RegisteredCCHandler):
def rshell_wait(self, label): def rshell_wait(self, label):
""" Wait for a remote shell termination. """ Wait for a remote shell termination.
""" """
self.check('rshell')
ttype, client, ctun, stun = self.client.get_tunnel(label) ttype, client, ctun, stun = self.client.get_tunnel(label)
if ttype != 'rshell': if ttype != 'rshell':
raise ValueError('Label does not refers on a rshell') raise ValueError('Label does not refers on a rshell')
...@@ -707,12 +740,38 @@ class CliHandler(RegisteredCCHandler): ...@@ -707,12 +740,38 @@ class CliHandler(RegisteredCCHandler):
rcode = client.rshell_wait(stun.label) rcode = client.rshell_wait(stun.label)
except Exception as err: except Exception as err:
rcode = -1 rcode = -1
logging.warning('Unexpected exit of tunnel: %s', err) self.logger.warning('Unexpected exit of tunnel: %s', err)
self.client.unregister_tunnel(ctun.label) self.client.unregister_tunnel(ctun.label)
ctun.close() ctun.close()
stun.close() stun.close()
return rcode return rcode
@listed
def forward(self, label, login, port, destination='127.0.0.1'):
""" Forward a TCP port to the client.
:param label: label of the tunnel created by the client (cli side)
:param login: login of the remote client on which establish the tunnel
:param port: port on which establish the tunnel on destination
:param destination: tunnel destination (from the remote client side)
"""
self.check('forward', 'id=%s' % login)
# Create the tunnel to the node:
try:
host_client = self.server.get_client(login)
except KeyError:
raise KeyError('Specified client is not connected')
s2n_tun = host_client.forward(port, destination)
# Create tunnel to the CLI
self.client.register_tunnel('forward', host_client, s2n_tun)
@listed
def dbstats(self):
""" Get statistics about tql database.
"""
return self.server.db.stats()
def forward_call(self, login, func, *args, **kwargs): def forward_call(self, login, func, *args, **kwargs):
""" Forward a call to a connected client and return result. """ Forward a call to a connected client and return result.
...@@ -732,6 +791,7 @@ class CliClient(Client): ...@@ -732,6 +791,7 @@ class CliClient(Client):
ROLE = 'cli' ROLE = 'cli'
RPC_HANDLER = CliHandler RPC_HANDLER = CliHandler
KILL_ALREADY_CONNECTED = True
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(CliClient, self).__init__(*args, **kwargs) super(CliClient, self).__init__(*args, **kwargs)
...@@ -757,9 +817,21 @@ class CliClient(Client): ...@@ -757,9 +817,21 @@ class CliClient(Client):
return ctun return ctun
def get_tunnel(self, label): def get_tunnel(self, label):
""" Get the tunnel binded to the provided label.
:return: a tuple (type, remote_client, tunnel, remote_client_tunnel)
where: **type** is a string provided on tunnel creation,
**remote_client** the client object of the remote client on which
the tunnel is established, **tunnel** the cli-to-server tunnel
object from the sjRpc, **remote_client_tunnel** the
server-to-remote-client tunnel object from the sjRpc.
"""
return self._tunnels[label] return self._tunnels[label]
def unregister_tunnel(self, label): def unregister_tunnel(self, label):
del self._tunnels[label] try:
del self._tunnels[label]
except KeyError:
pass
Client.register_client_class(CliClient) Client.register_client_class(CliClient)
from ccserver.clients import Client from cloudcontrol.server.clients import Client
class HostClient(Client): class HostClient(Client):
...@@ -13,6 +13,13 @@ class HostClient(Client): ...@@ -13,6 +13,13 @@ class HostClient(Client):
def execute(self, command): def execute(self, command):
return self.conn.call('execute_command', command) return self.conn.call('execute_command', command)
def console(self, name):
""" Start a remote console on the specified vm.
"""
label = self.proxy.vm_open_console(name)
tun = self.conn.create_tunnel(label=label)
return tun
def rshell(self): def rshell(self):
""" Start a remote shell on the host. """ Start a remote shell on the host.
""" """
...@@ -28,5 +35,12 @@ class HostClient(Client): ...@@ -28,5 +35,12 @@ class HostClient(Client):
""" """
return self.proxy.rshell_wait(label, _timeout=None) return self.proxy.rshell_wait(label, _timeout=None)
def forward(self, port, destination='127.0.0.1'):
""" Create a forwarding tunnel on this client and return it.
"""
tun = self.conn.create_tunnel()
self.proxy.forward(tun.label, port, destination)
return tun
Client.register_client_class(HostClient) Client.register_client_class(HostClient)
import threading import threading
from ccserver.handlers import listed from cloudcontrol.server.handlers import listed
from ccserver.clients import Client, RegisteredCCHandler from cloudcontrol.server.clients import Client, RegisteredCCHandler
from ccserver.clients.host import HostClient from cloudcontrol.server.clients.host import HostClient
from ccserver.db import RemoteTag from cloudcontrol.server.db import RemoteTag
from cloudcontrol.common.tql.db.object import TqlObject from cloudcontrol.common.tql.db.object import TqlObject
from cloudcontrol.common.tql.db.tag import StaticTag, CallbackTag from cloudcontrol.common.tql.db.tag import StaticTag
from functools import partial from functools import partial
...@@ -17,8 +17,7 @@ class HypervisorHandler(RegisteredCCHandler): ...@@ -17,8 +17,7 @@ class HypervisorHandler(RegisteredCCHandler):
@listed @listed
def register(self, obj_id, role): def register(self, obj_id, role):
''' """ Register an object managed by the calling node.
Register an object managed by the calling node.
.. note: .. note:
the obj_id argument passed to this handler is the object id of the the obj_id argument passed to this handler is the object id of the
...@@ -27,13 +26,12 @@ class HypervisorHandler(RegisteredCCHandler): ...@@ -27,13 +26,12 @@ class HypervisorHandler(RegisteredCCHandler):
:param obj_id: the id of the object to register :param obj_id: the id of the object to register
:param role: the role of the object to register :param role: the role of the object to register
''' """
self.client.register(obj_id, role) self.client.register(obj_id, role)
@listed @listed
def unregister(self, obj_id): def unregister(self, obj_id):
''' """ Unregister an object managed by the calling node.
Unregister an object managed by the calling node.
.. note: .. note:
the obj_id argument passed to this handler is the object id of the the obj_id argument passed to this handler is the object id of the
...@@ -41,7 +39,7 @@ class HypervisorHandler(RegisteredCCHandler): ...@@ -41,7 +39,7 @@ class HypervisorHandler(RegisteredCCHandler):
preprend the id by "node_id." itself). preprend the id by "node_id." itself).
:param obj_id: the id of the object to unregister :param obj_id: the id of the object to unregister
''' """
self.client.unregister(obj_id) self.client.unregister(obj_id)
@listed @listed
...@@ -58,8 +56,7 @@ class HypervisorHandler(RegisteredCCHandler): ...@@ -58,8 +56,7 @@ class HypervisorHandler(RegisteredCCHandler):
@listed @listed
def sub_tags_unregister(self, obj_id, name): def sub_tags_unregister(self, obj_id, name):
""" """ Unregister a remote tag for a child of the client.
Unregister a remote tag for a child of the client.
:param obj_id: child name :param obj_id: child name
:param name: name of the tag to unregister :param name: name of the tag to unregister
...@@ -160,8 +157,7 @@ class HvClient(HostClient): ...@@ -160,8 +157,7 @@ class HvClient(HostClient):
self._children[obj_id].register(tag) self._children[obj_id].register(tag)
def sub_tags_unregister(self, obj_id, name): def sub_tags_unregister(self, obj_id, name):
""" """ Unregister a remote tag for a child of the client.
Unregister a remote tag for a child of the client.
:param obj_id: child name :param obj_id: child name
:param name: name of the tag to unregister :param name: name of the tag to unregister
...@@ -198,7 +194,7 @@ class HvClient(HostClient): ...@@ -198,7 +194,7 @@ class HvClient(HostClient):
""" Asynchronously update sub tags from the remote client using """ Asynchronously update sub tags from the remote client using
specified watcher. specified watcher.
""" """
watcher.register(self.conn, 'sub_tags', obj_id, tags, _data=robj) watcher.register(self.conn, 'sub_tags', obj_id, tags, _data=(tags, robj))
Client.register_client_class(HvClient) Client.register_client_class(HvClient)
import logging from cloudcontrol.server.clients import Client, RegisteredCCHandler
from cloudcontrol.server.handlers import listed
from ccserver.clients import Client, RegisteredCCHandler
from ccserver.handlers import listed
class SpvHandler(RegisteredCCHandler): class SpvHandler(RegisteredCCHandler):
...@@ -15,7 +13,7 @@ class SpvHandler(RegisteredCCHandler): ...@@ -15,7 +13,7 @@ class SpvHandler(RegisteredCCHandler):
:param query: the query to select objects to show :param query: the query to select objects to show
""" """
logging.debug('Executed list function with query %s', query) self.logger.debug('Executed list function with query %s', query)
objects = self.server.list(query) objects = self.server.list(query)
return {'objects': objects} return {'objects': objects}
......
#!/usr/bin/env python """ This module provide an abstraction to the clients configuration directory.
#coding=utf8
'''
This module provide an abstraction to the clients configuration directory.
The client configuration directory contains a list of ``.json`` files, each The client configuration directory contains a list of ``.json`` files, each
file contains the configuration for one client. The username of the client is file contains the configuration for one client. The username of the client is
...@@ -33,24 +29,22 @@ u'node' ...@@ -33,24 +29,22 @@ u'node'
'tags': {}, 'tags': {},
'perms': None} 'perms': None}
>>> conf.remove_account('rms') >>> conf.remove_account('rms')
>>> >>>
''' """
import hashlib import hashlib
import base64 import base64
import random import random
import threading import threading
import logging
import json import json
import os import os
import re import re
from functools import wraps from functools import wraps
def writer(func): def writer(func):
''' """ Decorator used to threadsafize methods that made write operations on
Decorator used to threadsafize methods that made write operations on client configuration tree.
client configuration tree. """
'''
@wraps(func) @wraps(func)
def f(self, *args, **kwargs): def f(self, *args, **kwargs):
...@@ -60,11 +54,10 @@ def writer(func): ...@@ -60,11 +54,10 @@ def writer(func):
return f return f
class CCConf(object): class CCConf(object):
''' """ Create a new configuration interface.
Create a new configuration interface.
:param path_directory: the directory to store the configuration files :param path_directory: the directory to store the configuration files
''' """
CONF_TEMPLATE = {'password': None, CONF_TEMPLATE = {'password': None,
'role': None, 'role': None,
...@@ -73,7 +66,8 @@ class CCConf(object): ...@@ -73,7 +66,8 @@ class CCConf(object):
RE_SALTPW = re.compile(r'{(?P<method>[A-Z]+)}(?P<password>.+)') RE_SALTPW = re.compile(r'{(?P<method>[A-Z]+)}(?P<password>.+)')
def __init__(self, path_directory): def __init__(self, logger, path_directory):
self.logger = logger
self._path = path_directory self._path = path_directory
self._lock = threading.RLock() self._lock = threading.RLock()
...@@ -81,65 +75,61 @@ class CCConf(object): ...@@ -81,65 +75,61 @@ class CCConf(object):
return self._lock.__enter__() return self._lock.__enter__()
def __exit__(self, *args, **kwargs): def __exit__(self, *args, **kwargs):
return self._lock.__exit__(*args, **kwargs) return self._lock.__exit__(*args, **kwargs)
def _get_conf(self, login): def _get_conf(self, login):
''' """
Return the configuration of a client by its login. Return the configuration of a client by its login.
:param login: login of the client :param login: login of the client
:return: the configuration of the client :return: the configuration of the client
:raise CCConf.UnknownAccount: if user login is unknown :raise CCConf.UnknownAccount: if user login is unknown
''' """
filename = os.path.join(self._path, '%s.json' % login) filename = os.path.join(self._path, '%s.json' % login)
if os.path.isfile(filename): if os.path.isfile(filename):
conf = json.load(open(filename, 'r')) conf = json.load(open(filename, 'r'))
logging.debug('Getting configuration %s: %s', filename, conf) self.logger.debug('Getting configuration %s: %s', filename, conf)
return conf return conf
else: else:
raise CCConf.UnknownAccount('%s is not a file' % filename) raise CCConf.UnknownAccount('%s is not a file' % filename)
def _set_conf(self, login, conf, create=False): def _set_conf(self, login, conf, create=False):
''' """ Update the configuration of a client by its login.
Update the configuration of a client by its login.
:param login: login of the client :param login: login of the client
:param conf: configuration to set for the client :param conf: configuration to set for the client
:raise CCConf.UnknownAccount: if user login is unknown :raise CCConf.UnknownAccount: if user login is unknown
''' """
filename = os.path.join(self._path, '%s.json' % login) filename = os.path.join(self._path, '%s.json' % login)
logging.debug('Writing configuration %s: %s', filename, conf) self.logger.debug('Writing configuration %s: %s', filename, conf)
if os.path.isfile(filename) ^ create: if os.path.isfile(filename) ^ create:
json.dump(conf, open(filename, 'w')) json.dump(conf, open(filename, 'w'))
else: else:
raise CCConf.UnknownAccount('%s is not a file' % filename) raise CCConf.UnknownAccount('%s is not a file' % filename)
def acquire(self): def acquire(self):
''' """ Acquire the configuration writing lock for non-atomic configuration
Acquire the configuration writing lock for non-atomic configuration changes.
changes.
.. warning:: .. warning::
Don't forget to call the :meth:`release` method after your changes Don't forget to call the :meth:`release` method after your changes
for each :meth:`acquire` you made. for each :meth:`acquire` you made.
''' """
self._lock.acquire() self._lock.acquire()
def release(self): def release(self):
''' """ Release the configuration writing lock.
Release the configuration writing lock. """
'''
self._lock.release() self._lock.release()
def show(self, login): def show(self, login):
''' """ Show the configuration for specified account.
Show the configuration for specified account.
:param login: the login of the client :param login: the login of the client
:return: configuration of user :return: configuration of user
''' """
return self._get_conf(login) return self._get_conf(login)
...@@ -183,14 +173,13 @@ class CCConf(object): ...@@ -183,14 +173,13 @@ class CCConf(object):
return provided_passwd return provided_passwd
def _hash_password(self, password, method='ssha'): def _hash_password(self, password, method='ssha'):
''' """ Hash a password using given method and return it.
Hash a password using given method and return it.
:param password: the password to hash :param password: the password to hash
:param method: the hashing method :param method: the hashing method
:return: hashed password :return: hashed password
''' """
meth = '_auth_%s' % method.lower() meth = '_auth_%s' % method.lower()
if hasattr(self, meth): if hasattr(self, meth):
auth = getattr(self, meth) auth = getattr(self, meth)
...@@ -199,15 +188,14 @@ class CCConf(object): ...@@ -199,15 +188,14 @@ class CCConf(object):
raise CCConf.BadMethodError('Bad hashing method: %s' % repr(method)) raise CCConf.BadMethodError('Bad hashing method: %s' % repr(method))
def authentify(self, login, password): def authentify(self, login, password):
''' """ Authentify the client providing its login and password. The function
Authentify the client providing its login and password. The function return the role of the client on success, or ``None``.
return the role of the client on success, or ``None``.
:param login: the login of the client :param login: the login of the client
:param password: the password of the client :param password: the password of the client
:return: the client's role or None on failed authentication :return: the client's role or None on failed authentication
:raise CCConf.UnknownAccount: if user login is unknown :raise CCConf.UnknownAccount: if user login is unknown
''' """
conf = self._get_conf(login) conf = self._get_conf(login)
passwd_conf = conf['password'] passwd_conf = conf['password']
...@@ -230,8 +218,8 @@ class CCConf(object): ...@@ -230,8 +218,8 @@ class CCConf(object):
auth = getattr(self, meth) auth = getattr(self, meth)
is_valid = auth(password, password_wo_method) == passwd_conf is_valid = auth(password, password_wo_method) == passwd_conf
else: else:
logging.warning('Bad authentication method for %s: ' self.logger.warning('Bad authentication method for %s: '
'%s', login, m.group('method')) '%s', login, m.group('method'))
if is_valid: if is_valid:
return conf['role'] return conf['role']
else: else:
...@@ -239,14 +227,13 @@ class CCConf(object): ...@@ -239,14 +227,13 @@ class CCConf(object):
@writer @writer
def set_password(self, login, password, method='ssha'): def set_password(self, login, password, method='ssha'):
''' """ Update the client's password in the configuration.
Update the client's password in the configuration.
:param login: login of the user :param login: login of the user
:param password: new password :param password: new password
:param method: the hashing method to use :param method: the hashing method to use
:raise CCConf.UnknownAccount: if user login is unknown :raise CCConf.UnknownAccount: if user login is unknown
''' """
conf = self._get_conf(login) conf = self._get_conf(login)
password = self._hash_password(password, method) password = self._hash_password(password, method)
...@@ -255,32 +242,30 @@ class CCConf(object): ...@@ -255,32 +242,30 @@ class CCConf(object):
@writer @writer
def add_tag(self, login, tag_name, tag_value): def add_tag(self, login, tag_name, tag_value):
''' """ Add the tag to the user.
Add the tag to the user.
:param login: login of the user :param login: login of the user
:param tag_name: tag name to add to the user :param tag_name: tag name to add to the user
:param tag_value: the tag value :param tag_value: the tag value
:raise CCConf.UnknownAccount: if user login is unknown :raise CCConf.UnknownAccount: if user login is unknown
''' """
logging.debug('Added tag %s:%s for %s account', self.logger.debug('Added tag %s:%s for %s account',
tag_name, tag_value, login) tag_name, tag_value, login)
conf = self._get_conf(login) conf = self._get_conf(login)
conf['tags'][tag_name] = tag_value conf['tags'][tag_name] = tag_value
self._set_conf(login, conf) self._set_conf(login, conf)
@writer @writer
def remove_tag(self, login, tag): def remove_tag(self, login, tag):
''' """ Remove the tag to the user.
Remove the tag to the user.
:param login: login of the user :param login: login of the user
:param tag: tag to remove to the user :param tag: tag to remove to the user
:raise CCConf.UnknownAccount: if user login is unknown :raise CCConf.UnknownAccount: if user login is unknown
''' """
logging.debug('Removed tag %s for %s account', login, tag) self.logger.debug('Removed tag %s for %s account', login, tag)
conf = self._get_conf(login) conf = self._get_conf(login)
if tag in conf['tags']: if tag in conf['tags']:
del conf['tags'][tag] del conf['tags'][tag]
...@@ -288,14 +273,13 @@ class CCConf(object): ...@@ -288,14 +273,13 @@ class CCConf(object):
@writer @writer
def remove_account(self, login): def remove_account(self, login):
''' """ Remove the configuration of the account.
Remove the configuration of the account.
:param login: login of the account to remove :param login: login of the account to remove
:raise CCConf.UnknownAccount: if user login is unknown :raise CCConf.UnknownAccount: if user login is unknown
''' """
logging.debug('Removed %s account', login) self.logger.debug('Removed %s account', login)
filename = os.path.join(self._path, '%s.json' % login) filename = os.path.join(self._path, '%s.json' % login)
if os.path.exists(filename): if os.path.exists(filename):
os.remove(filename) os.remove(filename)
...@@ -304,16 +288,15 @@ class CCConf(object): ...@@ -304,16 +288,15 @@ class CCConf(object):
@writer @writer
def create_account(self, login, role, password): def create_account(self, login, role, password):
''' """ Create a new account.
Create a new account.
:param login: login of the new user :param login: login of the new user
:param password: password of the new user :param password: password of the new user
:param role: the role of the new user :param role: the role of the new user
:raise CCConf.AlreadyExistingAccount: if the login is already :raise CCConf.AlreadyExistingAccount: if the login is already
''' """
logging.debug('Creating %s account with role %s', login, role) self.logger.debug('Creating %s account with role %s', login, role)
filename = os.path.join(self._path, '%s.json' % login) filename = os.path.join(self._path, '%s.json' % login)
if os.path.exists(filename): if os.path.exists(filename):
raise CCConf.AlreadyExistingAccount('%s found' % filename) raise CCConf.AlreadyExistingAccount('%s found' % filename)
...@@ -327,15 +310,14 @@ class CCConf(object): ...@@ -327,15 +310,14 @@ class CCConf(object):
@writer @writer
def copy_account(self, copy_login, login, password): def copy_account(self, copy_login, login, password):
''' """ Create a new account based on another.
Create a new account based on another.
:param copy_login: the login of the account to copy. :param copy_login: the login of the account to copy.
:param password: password of the new user :param password: password of the new user
:param role: the role of the new user :param role: the role of the new user
:raise CCConf.AlreadyExistingAccount: if the login is already :raise CCConf.AlreadyExistingAccount: if the login is already
:raise CCConf.UnknownAccount: if the copy login doesn't exist :raise CCConf.UnknownAccount: if the copy login doesn't exist
''' """
conf_copy = self._get_conf(copy_login) conf_copy = self._get_conf(copy_login)
self.create_account(login, conf_copy['role'], password) self.create_account(login, conf_copy['role'], password)
...@@ -345,8 +327,7 @@ class CCConf(object): ...@@ -345,8 +327,7 @@ class CCConf(object):
@writer @writer
def add_right(self, login, tql, method=None, target='allow', index=None): def add_right(self, login, tql, method=None, target='allow', index=None):
''' """ Add a right rule to the provided account.
Add a right rule to the provided account.
:param login: the login of the account :param login: the login of the account
:param tql: the TQL request to allow :param tql: the TQL request to allow
...@@ -358,7 +339,7 @@ class CCConf(object): ...@@ -358,7 +339,7 @@ class CCConf(object):
.. note:: .. note::
If the index is out of range, the rule will be added to the end of If the index is out of range, the rule will be added to the end of
the ruleset. the ruleset.
''' """
conf = self._get_conf(login) conf = self._get_conf(login)
rights = conf['rights'] rights = conf['rights']
...@@ -370,12 +351,11 @@ class CCConf(object): ...@@ -370,12 +351,11 @@ class CCConf(object):
@writer @writer
def remove_right(self, login, index): def remove_right(self, login, index):
''' """ Remove a right rule from the provided account.
Remove a right rule from the provided account.
:param login: the login of the account :param login: the login of the account
:param index: the index of the rule to delete or None for all rules :param index: the index of the rule to delete or None for all rules
''' """
conf = self._get_conf(login) conf = self._get_conf(login)
if index is None: if index is None:
...@@ -389,12 +369,11 @@ class CCConf(object): ...@@ -389,12 +369,11 @@ class CCConf(object):
self._set_conf(login, conf) self._set_conf(login, conf)
def list_accounts(self): def list_accounts(self):
''' """ List all registered accounts.
List all registered accounts.
:return: :class:`tuple` of :class:`str`, each item being an :return: :class:`tuple` of :class:`str`, each item being an
account login account login
''' """
logins = [] logins = []
for filename in os.listdir(self._path): for filename in os.listdir(self._path):
......
...@@ -20,18 +20,25 @@ class SObject(TqlObject): ...@@ -20,18 +20,25 @@ class SObject(TqlObject):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(SObject, self).__init__(*args, **kwargs) super(SObject, self).__init__(*args, **kwargs)
self._overridden = defaultdict(lambda: []) self._overridden = defaultdict(lambda: None)
def register(self, tag, override=False): def register(self, tag, override=False):
""" Register a tag on this object (or override). """ Register a tag on this object (or override).
""" """
# If the tag must override another one, move the old one on the
# overridden tags dict:
if override: if override:
# The tag to register must override an eventual existing tag.
# Overridden tag is moved in the overridden tags dict:
if tag.name in self._tags: if tag.name in self._tags:
self._overridden[tag.name].append(self._tags[tag.name]) self._overridden[tag.name] = self._tags[tag.name]
del self._tags[tag.name] del self._tags[tag.name]
elif tag.name in self._tags:
# The tag to register is already overridden, we place it directly
# on the overridden tags dict:
if tag.name in self._overridden:
raise KeyError('A tag with this name is already registered on this object')
self._overridden[tag.name] = tag
return
return super(SObject, self).register(tag) return super(SObject, self).register(tag)
def unregister(self, name, override=False): def unregister(self, name, override=False):
...@@ -40,14 +47,15 @@ class SObject(TqlObject): ...@@ -40,14 +47,15 @@ class SObject(TqlObject):
super(SObject, self).unregister(name) super(SObject, self).unregister(name)
# If a tag is overriden, replace it on the tag list: # If a tag is overriden, replace it on the tag list:
if override and name in self._overridden: if override and name in self._overridden:
self._tags[name] = self._overridden[name].pop() self._tags[name] = self._overridden[name]
del self._overridden[name]
def is_overriding(self, name): def is_overriding(self, name):
""" Return True if a tag is overriding another one for the name. """ Return True if a tag is overriding another one for the name.
If the tag is not found, False is returned. If the tag is not found, False is returned.
""" """
return bool(self._overridden[name]) return self._overridden[name] is not None
class CCSAsyncTagInterface(BaseTagInterface): class CCSAsyncTagInterface(BaseTagInterface):
...@@ -77,7 +85,7 @@ class RemoteTag(BaseTag): ...@@ -77,7 +85,7 @@ class RemoteTag(BaseTag):
def __init__(self, name, callback, ttl=None): def __init__(self, name, callback, ttl=None):
super(RemoteTag, self).__init__(name) super(RemoteTag, self).__init__(name)
self._callback = callback self._callback = callback
self._ttl = ttl if ttl != -1 else None #FIXME: ANAEL !!!! self._ttl = ttl
self._cache_last_update = None self._cache_last_update = None
self._cache_value = u'' self._cache_value = u''
...@@ -151,8 +159,17 @@ class SRequestor(StaticRequestor): ...@@ -151,8 +159,17 @@ class SRequestor(StaticRequestor):
cb = tuple(to_update)[0].callback cb = tuple(to_update)[0].callback
cb(watcher, obj, [t.name for t in to_update]) cb(watcher, obj, [t.name for t in to_update])
# Get and process the results: # Get and process the results:
for update in watcher.wait(timeout=60): #TODO: adaptative timeout for update in watcher.iter(timeout=4, raise_timeout=True): #TODO: adaptative timeout
obj = update['data'] requested_tags, obj = update['data']
for tag_name, tag_value in update['return'].iteritems(): if 'return' not in update:
obj.set(tag_name, tag_value) for tag_name in requested_tags:
obj[tag_name].cached = tag_value # Set the tag cache value obj.set(tag_name, '#ERR#')
else:
tags = update['return']
for tag_name in requested_tags:
tag_value = tags.get(tag_name)
if tag_value is None:
obj.set(tag_name, '#ERR')
else:
obj.set(tag_name, tag_value)
obj[tag_name].cached = tag_value # Set the tag cache value
#!/usr/bin/env python """ This module contains the hypervisor destination election stuff.
#coding=utf8 """
'''
This module contains the hypervisor destination election stuff.
'''
from __future__ import absolute_import
from copy import copy from copy import copy
from ccserver.exceptions import (UnknownElectionAlgo, UnknownElectionType, from cloudcontrol.server.exceptions import (UnknownElectionAlgo, UnknownElectionType,
ElectionError) ElectionError)
def tags(*args): def tags(*args):
''' """ Decorator used to declare tags used by a filter.
Decorator used to declare tags used by a filter. """
'''
def decorator(func): def decorator(func):
func.__tags__ = set(args) func.__tags__ = set(args)
...@@ -64,13 +57,12 @@ class Elector(object): ...@@ -64,13 +57,12 @@ class Elector(object):
self._login = login self._login = login
def election(self, mtype, algo): def election(self, mtype, algo):
''' """ Generate a new migration plan for this election. You must specify
Generate a new migration plan for this election. You must specify the the migration type and the distribution algoritm.
migration type and the distribution algoritm.
:param mtype: the migration type :param mtype: the migration type
:param algo: the distribution algoritm :param algo: the distribution algoritm
''' """
# Check the choosen election method: # Check the choosen election method:
if mtype not in self.ALGO_BY_TYPES: if mtype not in self.ALGO_BY_TYPES:
......