Skip to content
Commits on Source (59)
*.pyc
doc/_build/*
*.swp
*.swo
*.log
test_*.py
.ropeproject/
......@@ -10,7 +10,7 @@ from getpass import getpass
from pwd import getpwnam
from grp import getgrnam
from optparse import OptionParser
from ccserver.conf import CCConf
from cloudcontrol.server.conf import CCConf
DEFAULT_ACCOUNT_DIRECTORY = '/var/lib/cc-server/'
DEFAULT_ROLE = 'cli'
......@@ -65,7 +65,7 @@ if __name__ == '__main__':
if options.god:
conf.add_right(args[0], '', '*', 'allow', 0)
# Chown the files:
uid = getpwnam(CHOWN_USER).pw_uid
gid = getgrnam(CHOWN_GROUP).gr_gid
......
#!/usr/bin/env python
#coding=utf8
from cloudcontrol.common.helpers.logger import patch_logging; patch_logging()
import os
import sys
import atexit
......@@ -13,10 +15,9 @@ from pwd import getpwnam
from grp import getgrnam
from daemon import DaemonContext
from daemon.pidlockfile import PIDLockFile
from ccserver.ccserver import CCServer
from ccserver import __version__
from cloudcontrol.server.server import CCServer
from cloudcontrol.server import __version__
DEFAULT_CONFIG_FILE = '/etc/cc-server.conf'
......@@ -26,7 +27,7 @@ DEFAULT_CONFIGURATION = {
'user': '',
'group': '',
'pidfile': '',
'umask': '0177',
'umask': '077',
'port': 1984,
'debug': False,
'account_db': None, # None = mandatory option
......@@ -34,7 +35,7 @@ DEFAULT_CONFIGURATION = {
'ssl_cert': None,
'ssl_key': None,
'maxcon': 600,
'maxidle': 30,
'maxidle': 120,
}
......@@ -69,16 +70,21 @@ def run_server(options):
if options['stdout']:
handler = logging.StreamHandler()
fmt = EncodingFormatter('[%(asctime)s] '
'\x1B[30;47m%(name)s\x1B[0m '
'\x1B[30;42m%(levelname)s\x1B[0m: '
'%(message)s')
else:
facility = logging.handlers.SysLogHandler.LOG_DAEMON
handler = logging.handlers.SysLogHandler(address='/dev/log',
facility=facility)
fmt = EncodingFormatter('%(name)s: %(levelname)s %(message)s')
fmt = EncodingFormatter('cc-server (%(name)s): %(levelname)s %(message)s')
handler.setFormatter(fmt)
logger.addHandler(handler)
server = CCServer(conf_dir=options['account_db'],
server = CCServer(logger.getChild('cc-server'),
conf_dir=options['account_db'],
maxcon=int(options['maxcon']),
maxidle=int(options['maxidle']),
port=int(options['port']),
......
#!/usr/bin/env python
#coding=utf8
'''
CloudControl server libraries.
'''

# Package version string; the "~dev" suffix marks a development snapshot
# (presumably Debian-style ordering, where "~" sorts before the release —
# TODO confirm against the packaging).
__version__ = '24~dev'
#!/usr/bin/env python
#coding=utf8
'''
Local client representation classes.
'''
from threading import Lock
from datetime import datetime
from sjrpc.utils import ConnectionProxy
class CCClient(object):
    '''
    Represent a single client connected to the server.
    '''

    def __init__(self, login, role, server, connection):
        # Client identity:
        self.login = login
        self.role = role
        # The server this client is bound to:
        self.server = server
        # The connection of the client (public attribute):
        self.connection = connection
        # Timestamps used for uptime/idle accounting:
        self._connection_date = datetime.now()
        self._last_action = datetime.now()
        # RPC proxy wrapping the connection:
        self.proxy = ConnectionProxy(connection)
        # Jobs lock for this client:
        self.lock = Lock()

    def get_uptime(self):
        '''
        Get the uptime of the client connection in seconds.

        :return: uptime of the client
        '''
        delta = datetime.now() - self._connection_date
        return delta.days * 86400 + delta.seconds

    def get_idle(self):
        '''
        Get the idle time of the client connection in seconds.

        :return: idle of the client
        '''
        delta = datetime.now() - self._last_action
        return delta.days * 86400 + delta.seconds

    def top(self):
        '''
        Reset the last_action date to now.
        '''
        self._last_action = datetime.now()

    def get_ip(self):
        '''
        Return the remote address without its port.
        '''
        # Drop everything after the last colon; with IPv6 the address
        # itself contains colons, so only the final ":port" is stripped.
        peer = self.connection.getpeername()
        return peer.rpartition(':')[0]

    def shutdown(self):
        '''
        Shutdown the connection to the client.
        '''
        self.server.rpc.unregister(self.connection, shutdown=True)

    def get_tags(self):
        '''
        Get all server defined tags.
        '''
        return {'con': self.get_uptime(), 'ip': self.get_ip()}

    def get_remote_tags(self, tag):
        '''
        Fetch a single tag value from the remote client.
        '''
        answer = self.connection.call('get_tags', (tag,))
        return answer[tag]
from ccserver.clients import Client
class BootstrapClient(Client):
    """ A bootstrap client connected to the cc-server.

    Only overrides the role identifier; all behaviour comes from
    :class:`Client`.
    """

    ROLE = 'bootstrap'


# Make the role known to the Client role registry:
Client.register_client_class(BootstrapClient)
#!/usr/bin/env python
#coding=utf8
'''
Jobs management on the server.
'''
from __future__ import absolute_import
import re
import logging
import time
from datetime import datetime
from threading import Thread, Lock
from sjrpc.core import AsyncWatcher
from ccserver.exceptions import (BadJobTypeError, UnknownJobError, JobError,
UnknownObjectError)
from ccserver.utils import AcquiresAllOrNone
class JobCancelError(Exception):
    '''
    Raised from inside a job to abort it when a cancel signal is received.
    '''
class BaseJob(dict, Thread, object):
    '''
    A base class to define a job.

    A job is simultaneously a dict (holding its exportable properties) and
    a thread (the actual work runs in :meth:`run`).

    The standards job items are:

     * id: id of the job
     * status: message explaining the current job status
     * done: True if the job is done
     * cancelled: True if job has been cancelled by user
     * created: job date of creation
     * ended: job date of end (or None if done = False)
     * duration: duration in seconds of the job (processed on export)
     * author: author login of the job

    :param manager: the :class:`JobsManager` instance.
    '''

    def __init__(self, manager, *args, **kwargs):
        # Initialize the inherited classes:
        dict.__init__(self, *args, **kwargs)
        Thread.__init__(self)
        # The manager of this job:
        self.manager = manager
        # Define default job properties:
        self['status'] = 'pending'
        self['done'] = False
        self['cancelled'] = False
        self['created'] = datetime.now()
        self['ended'] = None
        self['duration'] = 0
        #~ assert self.get('author') is not None, 'author is not defined'
        # List of actions to do by the rollback method:
        self._wayback = []
        # Set the thread name ('id' must have been passed by the caller):
        self.name = 'job-%s' % self['id']

    def __hash__(self):
        # Jobs hash by their immutable id so they can be used in sets/dicts:
        return self['id'].__hash__()

    def __setitem__(self, key, value):
        # The 'id' item is read-only once the job is created:
        if key == 'id':
            raise KeyError('Key %r in read-only.' % key)
        else:
            super(BaseJob, self).__setitem__(key, value)

    def __delitem__(self, key):
        # The 'id' item is read-only once the job is created:
        if key == 'id':
            raise KeyError('Key %r in read-only.' % key)
        else:
            super(BaseJob, self).__delitem__(key)

    def report(self, status, done=None):
        '''
        Report the status of the job.

        :param status: the status to set to the job
        :param done: is the job done, None to keep current value
        '''
        self['status'] = status
        if done is not None:
            # The job just ended (successfully or not): timestamp it.
            self['ended'] = datetime.now()
            self['done'] = done

    def run(self):
        '''
        Run the job itself (thread entry point).

        Any exception raised by :meth:`job` triggers a rollback;
        :exc:`JobCancelError` is the expected way to abort.
        '''
        try:
            self.job()
        except JobCancelError as err:
            self._rollback('%s' % err)
        except Exception as err:
            logging.error('Error while executing job: %s, %r', err, err)
            self._rollback('%s' % err)
        else:
            self.report('success', done=True)

    def job(self):
        '''
        Method to override to define the job's behavior.
        '''
        pass

    def checkpoint(self, func=None):
        '''
        Check if job is not cancelled, else raise :exc:`JobCancelError`.
        Also add the provided function (optional) to the wayback list.

        :param func: callable to add to the wayback list.
        '''
        if self['cancelled']:
            raise JobCancelError('Job has been cancelled by user')
        if func is not None:
            self._wayback.append(func)

    def _rollback(self, error):
        '''
        Rollback the job by running the registered wayback callables in
        reverse order (most recently registered first).
        '''
        self.report('rollbacking')
        try:
            for func in reversed(self._wayback):
                func()
        except Exception as err:
            self.report('rollback failed: %s' % err, done=True)
        else:
            self.report('cancelled: %s' % error, done=True)

    def cancel(self):
        '''
        Cancel the job.

        .. note::
           You can override this method to trigger an action when user cancel
           the job.
        '''
        if self['done']:
            raise JobError('Job is done')
        if self['cancelled']:
            raise JobError('Job is already cancelled')
        self['cancelled'] = True
        self.report('cancelling')

    def export(self, props=None):
        '''
        Export the job in a simple dict format.

        Keys starting with '_' are private and never exported; datetime
        values are converted to unix timestamps, 'duration' is recomputed,
        and every value is stringified.

        :param props: optional whitelist of keys to export
        '''
        exported = {}
        # NOTE: iteritems() is Python 2 only.
        for key, val in self.iteritems():
            if key.startswith('_') or (props is not None and key not in props):
                continue
            if isinstance(val, datetime):
                val = int(time.mktime(val.timetuple()))
            if key == 'duration':
                if self['done']:
                    dt = self['ended'] - self['created']
                    val = dt.seconds + dt.days * 86400
                else:
                    now = datetime.now()
                    dt = now - self['created']
                    val = dt.seconds + dt.days * 86400
            exported[key] = str(val)
        return exported
class KillClientJob(BaseJob):
    '''
    Job that disconnects a connected account from the server.

    Mandatory items:
     * account: the account login to kill

    Optional items:
     * gracetime: seconds to wait before killing the account
    '''

    def job(self):
        account = self.get('account')
        assert account is not None, 'Account not specified'
        wait = self.get('gracetime')
        if wait is not None:
            time.sleep(int(wait))
        # Abort here if the job was cancelled during the grace period:
        self.checkpoint()
        self.manager.server.kill(account)
class KillOldCliJob(BaseJob):
    '''
    Typically a hidden job used to kill cli clients that have been
    connected or idle for too long.

    Mandatory items:
     * maxcon: maximum connection time in minutes
     * maxidle: maximum idle time in minutes

    Optional items:
     * delay: delay in seconds between two checks (default 1m)
    '''

    DEFAULT_DELAY = 60

    def job(self):
        maxcon = self.get('maxcon')
        assert maxcon is not None, 'maxcon is None'
        maxidle = self.get('maxidle')
        assert maxidle is not None, 'maxidle is None'
        pause = self.get('delay', self.DEFAULT_DELAY)
        limit = maxcon * 60
        # Loop forever; cancellation is detected by checkpoint():
        while True:
            self.checkpoint()
            for client in self.manager.server.iterclients('cli'):
                if client.uptime > limit:
                    self.manager.server.kill(client.login)
            #TODO: handle idling (maxidle is read but not enforced yet).
            time.sleep(pause)
class BaseMigrationJob(BaseJob):
    '''
    Common helpers shared by the migration job classes.
    '''

    def _check_status(self, vm_id, status):
        '''
        Return True if the VM matches the given status in the server's
        database.
        '''
        query = 'id=%s&status=%s' % (vm_id, status)
        return bool(self.manager.server.list(query))
class ColdMigrationJob(BaseMigrationJob):
    '''
    A cold vm migration job.

    Mandatory items:
     * vm_name: name of the vm to migrate
     * hv_source: name of the hv which execute the VM
     * hv_dest: the destination hypervisor
     * author: login of the author cli
    '''

    def job(self):
        vm_id = '%s.%s' % (self['hv_source'], self['vm_name'])
        self['title'] = 'Cold migration %s --> %s' % (vm_id, self['hv_dest'])
        logging.info('Job-%s: Started migration for %s', self['id'], vm_id)
        # Cancel the job if the user has not the right to migrate the vm or to
        # select an hypervisor as destination:
        right_check = self.manager.server.check
        tql = 'id=%s' % vm_id
        if not right_check(self['author'], 'coldmigrate', tql):
            raise JobCancelError('author have no rights to migrate this VM')
        tql = 'id=%s' % self['hv_dest']
        if not right_check(self['author'], 'coldmigrate_dest', tql):
            raise JobCancelError('author have no right to migrate to this hv')
        # Update the VM object:
        vm = self.manager.server.db.get_by_id(vm_id)
        if vm is None:
            raise JobCancelError('Source VM not found')
        # Get the source and destination hv clients:
        try:
            source = self.manager.server.get_client(self['hv_source'])
        except KeyError:
            raise JobCancelError('source hypervisor is not connected')
        try:
            dest = self.manager.server.get_client(self['hv_dest'])
        except KeyError:
            raise JobCancelError('destination hypervisor is not connected')
        self.checkpoint()
        self.report('waiting lock for source and dest hypervisors')
        logging.info('Job-%s: Trying to acquire locks', self['id'])
        # Take both hypervisor locks atomically to avoid lock-order deadlocks:
        with AcquiresAllOrNone(source.hvlock, dest.hvlock):
            logging.info('Job-%s: Locks acquired', self['id'])
            self.checkpoint()
            if not vm['status'] == 'stopped':
                raise JobCancelError('vm is not stopped')
            # Create storages on destination:
            self.report('create volumes')
            for disk in vm.get('disk', '').split():
                # Getting informations about the disk:
                pool = vm.get('disk%s_pool' % disk)
                name = vm.get('disk%s_vol' % disk)
                size = vm.get('disk%s_size' % disk)
                assert pool is not None, 'pool tag doesn\'t exists'
                assert name is not None, 'name tag doesn\'t exists'
                assert size is not None, 'size tag doesn\'t exists'
                # Create the volume on destination:
                dest.proxy.vol_create(pool, name, int(size))
                logging.info('Job-%s: Created volume %s/%s on destination '
                             'hypervisor', self['id'], pool, name)
                # Rollback stuff for this action (closure captures the
                # current pool/name for this iteration):
                def rb_volcreate():
                    dest.proxy.vol_delete(pool, name)
                self.checkpoint(rb_volcreate)
            # Define VM:
            self.report('define vm')
            logging.info('Job-%s: XML configuration transfert', self['id'])
            vm_config = source.proxy.vm_export(self['vm_name'])
            dest.proxy.vm_define(vm_config)
            # Rollback stuff for vm definition:
            def rb_define():
                dest.proxy.vm_undefine(self['vm_name'])
            self.checkpoint(rb_define)
            # Copy all source disk on destination disk:
            for disk in vm.get('disk', '').split():
                self._copy_disk(source, dest, vm, disk)
            # At this point, if operation is a success, all we need is just to
            # cleanup source hypervisor from disk and vm. This operation *CAN'T*
            # be cancelled or rollbacked if anything fails (unlikely). The
            # migration must be considered as a success, and the only way to
            # undo this is to start a new migration in the other way.
            # Delete the rollback list.
            # This is mandatory to avoid data loss if the cleanup
            # code below fail.
            self._wayback = []
            # Cleanup the disks:
            for disk in vm.get('disk', '').split():
                pool = vm.get('disk%s_pool' % disk)
                name = vm.get('disk%s_vol' % disk)
                source.proxy.vol_delete(pool, name)
            # Cleanup the VM:
            source.proxy.vm_undefine(self['vm_name'])
        logging.info('Job-%s: Migration completed with success', self['id'])

    def _copy_disk(self, source, dest, vm, disk):
        '''
        Copy the specified disk name of the vm from source to dest.
        '''
        # Get informations about the disk:
        pool = vm.get('disk%s_pool' % disk)
        name = vm.get('disk%s_vol' % disk)
        logging.info('Job-%s: Started copy for %s/%s', self['id'], pool, name)
        self.report('copy %s/%s' % (pool, name))
        # Make the copy and wait for it end:
        xferprop = dest.proxy.vol_import(pool, name)
        # Register the cancel function so cancel() can abort the transfer:
        def cancel_xfer():
            dest.proxy.vol_import_cancel(xferprop['id'])
        self['func_cancel_xfer'] = cancel_xfer
        # Wait for the end of transfert (both sides run concurrently):
        watcher = AsyncWatcher()
        watcher.register(source.conn, 'vol_export', pool, name, dest.ip, xferprop['port'])
        watcher.register(dest.conn, 'vol_import_wait', xferprop['id'])
        msgs = watcher.wait()
        del self['func_cancel_xfer']
        # Compare checksum of two answers:
        checksums = []
        assert len(msgs) == 2
        for msg in msgs:
            if msg.get('error') is not None:
                msg = 'error while copy: %s' % msg['error']['message']
                raise JobCancelError(msg)
            else:
                checksums.append(msg['return'].get('checksum'))
        self.checkpoint()
        if checksums[0] != checksums[1]:
            raise JobCancelError('checksum mismatches')

    def cancel(self):
        # If a transfer is in flight, abort it before the regular cancel:
        if self.get('func_cancel_xfer') is not None:
            self.get('func_cancel_xfer')()
        super(ColdMigrationJob, self).cancel()
class HotMigrationJob(BaseMigrationJob):
    '''
    A hot (live) vm migration job.

    Mandatory items:
     * vm_name: name of the vm to migrate
     * hv_source: name of the hv which execute the VM
     * hv_dest: the destination hypervisor
     * author: login of the author cli
    '''

    def job(self):
        vm_id = '%s.%s' % (self['hv_source'], self['vm_name'])
        self['title'] = 'Hot migration %s --> %s' % (vm_id, self['hv_dest'])
        logging.info('Job-%s: Started hot migration for %s', self['id'], vm_id)
        # Cancel the job if the user has not the right to migrate the vm or to
        # select an hypervisor as destination:
        right_check = self.manager.server.check
        tql = 'id=%s' % vm_id
        if not right_check(self['author'], 'hotmigrate', tql):
            raise JobCancelError('author have no rights to migrate this VM')
        tql = 'id=%s' % self['hv_dest']
        if not right_check(self['author'], 'hotmigrate_dest', tql):
            raise JobCancelError('author have no right to migrate to this hv')
        # Update the VM object:
        vm = self.manager.server.db.get_by_id(vm_id)
        if vm is None:
            raise JobCancelError('Source VM not found')
        # Get the source and destination hv clients:
        try:
            source = self.manager.server.get_client(self['hv_source'])
        except KeyError:
            raise JobCancelError('source hypervisor is not connected')
        try:
            dest = self.manager.server.get_client(self['hv_dest'])
        except KeyError:
            raise JobCancelError('destination hypervisor is not connected')
        self.checkpoint()
        self.report('waiting lock for source and dest hypervisors')
        logging.info('Job-%s: Trying to acquire locks', self['id'])
        # Take both hypervisor locks atomically to avoid lock-order deadlocks:
        with AcquiresAllOrNone(source.hvlock, dest.hvlock):
            logging.info('Job-%s: Locks acquired', self['id'])
            self.checkpoint()
            if not self._check_status(vm_id, 'running'):
                raise JobCancelError('vm is not started')
            to_cleanup = []
            # Create storages on destination and start synchronization:
            disks = vm.get('disk', '').split()
            for disk in disks:
                to_cleanup += self._sync_disk(vm, disk, source, dest)
            # Libvirt tunnel setup:
            tunres_src = source.proxy.tun_setup()
            def rb_tun_src():
                source.proxy.tun_destroy(tunres_src)
            self.checkpoint(rb_tun_src)
            tunres_dst = dest.proxy.tun_setup(local=False)
            def rb_tun_dst():
                dest.proxy.tun_destroy(tunres_dst)
            self.checkpoint(rb_tun_dst)
            source.proxy.tun_connect(tunres_src, tunres_dst, dest.ip)
            dest.proxy.tun_connect_hv(tunres_dst, migration=False)
            # Migration tunnel setup (second tunnel, dedicated to the
            # migration stream):
            migtunres_src = source.proxy.tun_setup()
            def rb_migtun_src():
                source.proxy.tun_destroy(migtunres_src)
            self.checkpoint(rb_migtun_src)
            migtunres_dst = dest.proxy.tun_setup(local=False)
            def rb_migtun_dst():
                dest.proxy.tun_destroy(migtunres_dst)
            self.checkpoint(rb_migtun_dst)
            source.proxy.tun_connect(migtunres_src, migtunres_dst, dest.ip)
            dest.proxy.tun_connect_hv(migtunres_dst, migration=True)
            # Initiate the live migration:
            self.report('migration in progress')
            source.proxy.vm_migrate_tunneled(self['vm_name'], tunres_src,
                                             migtunres_src)
            # At this point, if operation is a success, all we need is just to
            # cleanup source hypervisor from disk and vm. This operation *CAN'T*
            # be cancelled or rollbacked if anything fails (unlikely). The
            # migration must be considered as a success, and the only way to
            # undo this is to start a new migration in the other way.
            # Delete the rollback list.
            # This is mandatory to avoid data loss if the cleanup
            # code below fail.
            self.report('cleanup')
            self._wayback = []
            source.proxy.tun_destroy(tunres_src)
            dest.proxy.tun_destroy(tunres_dst)
            source.proxy.tun_destroy(migtunres_src)
            dest.proxy.tun_destroy(migtunres_dst)
            # Undo the drbd plumbing registered by _sync_disk, newest first:
            for cb_cleanup in reversed(to_cleanup):
                cb_cleanup()
            # Cleanup the disks:
            for disk in disks:
                pool = vm.get('disk%s_pool' % disk)
                name = vm.get('disk%s_vol' % disk)
                source.proxy.vol_delete(pool, name)
        logging.info('Job-%s: Migration completed with success', self['id'])

    def _sync_disk(self, vm, disk, source, dest):
        '''
        Create the destination volume and synchronize it with the source
        through drbd; return the list of cleanup callables to run once the
        migration succeeded.
        '''
        to_cleanup = []
        # getting informations about the disk:
        pool = vm.get('disk%s_pool' % disk)
        name = vm.get('disk%s_vol' % disk)
        size = vm.get('disk%s_size' % disk)
        assert pool is not None, 'pool tag doesn\'t exists'
        assert name is not None, 'name tag doesn\'t exists'
        assert size is not None, 'size tag doesn\'t exists'
        status_msg = 'sync volume %s/%s (%%s)' % (pool, name)
        self.report(status_msg % 'creation')
        # create the volume on destination:
        dest.proxy.vol_create(pool, name, int(size))
        logging.info('job-%s: created volume %s/%s on destination '
                     'hypervisor', self['id'], pool, name)
        # rollback stuff for this action:
        def rb_volcreate():
            dest.proxy.vol_delete(pool, name)
        self.checkpoint(rb_volcreate)
        # setup the drbd synchronization with each hypervisors:
        self.report(status_msg % 'setup')
        res_src = source.proxy.drbd_setup(pool, name)
        def rb_setupsrc():
            source.proxy.drbd_shutdown(res_src)
        self.checkpoint(rb_setupsrc)
        to_cleanup.append(rb_setupsrc)
        res_dst = dest.proxy.drbd_setup(pool, name)
        def rb_setupdst():
            dest.proxy.drbd_shutdown(res_dst)
        self.checkpoint(rb_setupdst)
        to_cleanup.append(rb_setupdst)
        # start connection of drbd:
        self.report(status_msg % 'connect')
        source.proxy.drbd_connect(res_src, res_dst, dest.ip)
        dest.proxy.drbd_connect(res_dst, res_src, source.ip)
        # setup topology as primary/secondary:
        source.proxy.drbd_role(res_src, True)
        dest.proxy.drbd_role(res_dst, False)
        # Wait the end of the synchronization, polling every 2 seconds:
        sync_running = True
        while sync_running:
            status = dest.proxy.drbd_sync_status(res_dst)
            if status['done']:
                sync_running = False
            self.report(status_msg % 'sync %s%%' % status['completion'])
            time.sleep(2)
        # Switch to primary/primary so both sides can write during the
        # live migration:
        dest.proxy.drbd_role(res_dst, True)
        source.proxy.drbd_takeover(res_src, True)
        def rb_takeover_src():
            source.proxy.drbd_takeover(res_src, False)
        self.checkpoint(rb_takeover_src)
        to_cleanup.append(rb_takeover_src)
        dest.proxy.drbd_takeover(res_dst, True)
        def rb_takeover_dst():
            dest.proxy.drbd_takeover(res_dst, False)
        self.checkpoint(rb_takeover_dst)
        to_cleanup.append(rb_takeover_dst)
        return to_cleanup
class CloneJob(BaseMigrationJob):
    '''
    A clone job.

    Mandatory items:
     * vm_name: name of the vm to migrate
     * new_vm_name: the name of the cloned vm
     * hv_source: name of the hv which execute the VM
     * hv_dest: the destination hypervisor
     * author: login of the author cli
    '''

    def job(self):
        vm_id = '%s.%s' % (self['hv_source'], self['vm_name'])
        self['title'] = 'Clone %s --> %s' % (vm_id, self['hv_dest'])
        logging.info('Job-%s: Started clone for %s', self['id'], vm_id)
        # Cancel the job if the user has not the right to clone the vm or to
        # select an hypervisor as destination:
        right_check = self.manager.server.check
        tql = 'id=%s' % vm_id
        if not right_check(self['author'], 'clone', tql):
            raise JobCancelError('author have no rights to migrate this VM')
        tql = 'id=%s' % self['hv_dest']
        if not right_check(self['author'], 'clone_dest', tql):
            raise JobCancelError('author have no right to migrate to this hv')
        # Update the VM object:
        try:
            self.manager.server.objects.update(ids=(vm_id,))
            vm = self.manager.server.objects.get_by_id(vm_id)
        except UnknownObjectError:
            raise JobCancelError('Cloned VM not found')
        # Get the source and destination hv clients:
        try:
            source = self.manager.server.get_connection(self['hv_source'])
        except KeyError:
            raise JobCancelError('source hypervisor is not connected')
        try:
            dest = self.manager.server.get_connection(self['hv_dest'])
        except KeyError:
            raise JobCancelError('destination hypervisor is not connected')
        self.checkpoint()
        self.report('waiting lock for source and dest hypervisors')
        logging.info('Job-%s: Trying to acquire locks', self['id'])
        # Take both locks atomically to avoid lock-order deadlocks:
        with AcquiresAllOrNone(source.lock, dest.lock):
            logging.info('Job-%s: Locks acquired', self['id'])
            self.checkpoint()
            # Create storages on destination:
            old_new_disk_mapping = {}  # Mapping between old and new disk names
            self.report('create volumes')
            for disk in vm.get('disk', '').split():
                # Getting informations about the disk:
                pool = vm.get('disk%s_pool' % disk)
                name = vm.get('disk%s_vol' % disk)
                size = vm.get('disk%s_size' % disk)
                assert pool is not None, 'pool tag doesn\'t exists'
                assert name is not None, 'name tag doesn\'t exists'
                assert size is not None, 'size tag doesn\'t exists'
                # Change the name of the disk: keep the vm-name prefix
                # convention when possible, otherwise prefix with the new
                # vm name:
                old_name = name
                if name.startswith(self['vm_name']):
                    suffix = name[len(self['vm_name']):]
                    name = self['new_vm_name'] + suffix
                else:
                    name = '%s_%s' % (self['new_vm_name'], name)
                fulloldname = '/dev/%s/%s' % (pool, old_name)
                fullnewname = '/dev/%s/%s' % (pool, name)
                old_new_disk_mapping[fulloldname] = fullnewname
                # Create the volume on destination:
                dest.proxy.vol_create(pool, name, int(size))
                logging.info('Job-%s: Created volume %s/%s on destination '
                             'hypervisor (was %s)', self['id'], pool, name,
                             old_name)
                # Rollback stuff for this action:
                def rb_volcreate():
                    dest.proxy.vol_delete(pool, name)
                self.checkpoint(rb_volcreate)
            # Define VM:
            self.report('define vm')
            logging.info('Job-%s: XML configuration transfert', self['id'])
            vm_config = source.proxy.vm_export(self['vm_name'])
            # Change vm configuration XML to update it with new name:
            new_vm_config = self._update_xml(vm_config, self['vm_name'],
                                             self['new_vm_name'],
                                             old_new_disk_mapping)
            dest.proxy.vm_define(new_vm_config)
            # Rollback stuff for vm definition:
            # NOTE(review): this undefines `name`, which at this point is
            # the new name of the *last* disk handled above, not the new
            # vm name — looks wrong, confirm before relying on this
            # rollback.
            def rb_define():
                dest.proxy.vm_undefine(name)
            self.checkpoint(rb_define)
            # Copy all source disk on destination disk:
            # NOTE(review): every copy targets `name` (last disk's new
            # name); with several disks all copies would write the same
            # destination volume — verify multi-disk clones.
            for disk in vm.get('disk', '').split():
                self._copy_disk(source, dest, vm, disk, name)
        logging.info('Job-%s: Clonage completed with success', self['id'])

    def _update_xml(self, vm_config, old_name, name, old_new_name_mapping):
        '''
        Update the XML definition of the VM with the new vm name, the new vm
        disk, and remove the uuid tag.
        '''
        vm_config = vm_config.replace('<name>%s</name>' % old_name,
                                      '<name>%s</name>' % name)
        vm_config = re.sub('<uuid>.*?</uuid>\n', '', vm_config)
        # delete MAC address, then it will be regenerated by libvirt
        vm_config = re.sub('<mac .*?/>\n', '', vm_config)
        # NOTE: iteritems() is Python 2 only.
        for old, new in old_new_name_mapping.iteritems():
            vm_config = vm_config.replace("='%s'" % old,
                                          "='%s'" % new)
        return vm_config

    def _copy_disk(self, source, dest, vm, disk, new_disk):
        '''
        Copy the specified disk name of the vm from source to dest.
        '''
        # Get informations about the disk:
        pool = vm.get('disk%s_pool' % disk)
        name = vm.get('disk%s_vol' % disk)
        logging.info('Job-%s: Started copy for %s/%s to %s/%s',
                     self['id'], pool, name, pool, new_disk)
        self.report('copy %s/%s' % (pool, name))
        # Make the copy and wait for it end:
        xferprop = dest.proxy.vol_import(pool, new_disk)
        # Register the cancel function so cancel() can abort the transfer:
        def cancel_xfer():
            dest.proxy.vol_import_cancel(xferprop['id'])
        self['func_cancel_xfer'] = cancel_xfer
        # Wait for the end of transfert:
        cids = set()
        cids.add(source.connection.async_call('vol_export', pool, name,
                                              dest.get_ip(), xferprop['port']))
        cids.add(dest.connection.async_call('vol_import_wait', xferprop['id']))
        msgs = self.manager.server.manager.wait(frozenset(cids))
        del self['func_cancel_xfer']
        # Compare checksum of two answers:
        checksums = []
        assert len(msgs) == 2
        for msg in msgs:
            if msg.get('error') is not None:
                msg = 'error while copy: %s' % msg['error']['message']
                raise JobCancelError(msg)
            else:
                checksums.append(msg.get('checksum'))
        self.checkpoint()
        if checksums[0] != checksums[1]:
            raise JobCancelError('checksum mismatches')

    def cancel(self):
        # If a transfer is in flight, abort it before the regular cancel:
        if self.get('func_cancel_xfer') is not None:
            self.get('func_cancel_xfer')()
        super(CloneJob, self).cancel()
class JobsManager(object):
    '''
    Manage the current job list.

    :param server: The :class:`CCServer` instance.
    '''

    # Mapping of public job type names to their implementation class:
    JOBS_TYPES = {
        'kill': KillClientJob,
        'kill_oldcli': KillOldCliJob,
        'cold_migrate': ColdMigrationJob,
        'hot_migrate': HotMigrationJob,
        'clone': CloneJob,
    }

    def __init__(self, server):
        # The main job dict, the keys are id of jobs:
        self._jobs = {}
        # The server:
        self.server = server
        # The id of the next job and its lock:
        self._current_id = 1
        self._current_id_lock = Lock()

    def create(self, jtype, **kwargs):
        '''
        Create and start a new job.

        :param jtype: the type of the new job
        :param \*\*kwargs: arguments to pass to the job
        :return: the newly created job
        :raise BadJobTypeError: when invalid jtype is passed
        '''
        jobid = self.get_id()
        jobtype = JobsManager.JOBS_TYPES.get(jtype)
        if jobtype is None:
            raise BadJobTypeError('Invalid job type %r' % jtype)
        job = jobtype(self, id=jobid, **kwargs)
        self._jobs[jobid] = job
        # Daemonize the thread so a pending job never blocks shutdown:
        job.daemon = True
        job.start()
        return job

    def get_id(self):
        '''
        Get the current id and increment the counter.
        '''
        with self._current_id_lock:
            jobid = self._current_id
            self._current_id += 1
        return jobid

    def cancel(self, jobid):
        '''
        Cancel the provided job.

        :raise UnknownJobError: when the id is unknown or the job is hidden
        '''
        job = self._jobs.get(jobid)
        if job is None:
            raise UnknownJobError('Invalid job id: %r' % jobid)
        elif job.get('_hidden', False):
            raise UnknownJobError('Invalid job id: %r (hidden)' % jobid)
        else:
            job.cancel()

    def purge(self):
        '''
        Purge all done jobs.
        '''
        # Iterate over a snapshot: deleting from the dict while iterating
        # a live view is unsafe (and raises RuntimeError on Python 3).
        for job in list(self._jobs.values()):
            if job['done']:
                del self._jobs[job['id']]

    def iterjobs(self, show_done=True, show_running=True):
        '''
        Iterate over jobs.

        Hidden jobs are never yielded, consistently with :meth:`cancel`
        (the previous precedence let *done* hidden jobs leak through).

        :param show_done: if set, yield jobs that are done
        :param show_running: if set, yield jobs that are still running
        '''
        for job in self._jobs.itervalues():
            if job.get('_hidden'):
                continue
            if (show_done and job['done']) or (show_running and not job['done']):
                yield job
#!/usr/bin/env python
#coding=utf8
'''
OrderedSet Python implementation.
This snippet of code is taken from http://code.activestate.com/recipes/576694/
Written by Raymond Hettinger and licensed under the MIT Licence.
Comments:
Runs on Py2.6 or later (and runs on 3.0 or later without any modifications).
Implementation based on a doubly linked link and an internal dictionary.
This design gives OrderedSet the same big-Oh running times as regular sets
including O(1) adds, removes, and lookups as well as O(n) iteration.
'''
import collections
try:
    # Python 3.3+: the abstract base classes live in collections.abc
    # (collections.MutableSet was removed in Python 3.10).
    from collections.abc import MutableSet
except ImportError:  # Python 2
    from collections import MutableSet
# Indices into the doubly-linked-list cells: [key, prev, next].
KEY, PREV, NEXT = range(3)


class OrderedSet(MutableSet):
    '''
    A set that remembers insertion order.

    Backed by ``self.map`` (key --> cell) and a circular doubly linked
    list of cells whose sentinel is ``self.end``, giving O(1) add,
    discard and membership tests and O(n) iteration.
    '''

    def __init__(self, iterable=None):
        self.end = end = []
        end += [None, end, end]  # sentinel node for doubly linked list
        self.map = {}  # key --> [key, prev, next]
        if iterable is not None:
            self |= iterable

    def __len__(self):
        return len(self.map)

    def __contains__(self, key):
        return key in self.map

    def add(self, key):
        '''Append *key* at the end, unless it is already present.'''
        if key not in self.map:
            end = self.end
            curr = end[PREV]
            curr[NEXT] = end[PREV] = self.map[key] = [key, curr, end]

    def discard(self, key):
        '''Remove *key* if present; do nothing otherwise.'''
        if key in self.map:
            # 'nxt' instead of 'next' to avoid shadowing the builtin
            # (pop() below relies on the builtin next()):
            key, prev, nxt = self.map.pop(key)
            prev[NEXT] = nxt
            nxt[PREV] = prev

    def __iter__(self):
        end = self.end
        curr = end[NEXT]
        while curr is not end:
            yield curr[KEY]
            curr = curr[NEXT]

    def __reversed__(self):
        end = self.end
        curr = end[PREV]
        while curr is not end:
            yield curr[KEY]
            curr = curr[PREV]

    def pop(self, last=True):
        '''Remove and return the last (or first) element.

        :raise KeyError: when the set is empty
        '''
        if not self:
            raise KeyError('set is empty')
        key = next(reversed(self)) if last else next(iter(self))
        self.discard(key)
        return key

    def __repr__(self):
        if not self:
            return '%s()' % (self.__class__.__name__,)
        return '%s(%r)' % (self.__class__.__name__, list(self))

    def __eq__(self, other):
        # Order matters only when comparing two OrderedSets:
        if isinstance(other, OrderedSet):
            return len(self) == len(other) and list(self) == list(other)
        return set(self) == set(other)

    def __del__(self):
        self.clear()  # remove circular references
# Declare a pkg_resources-style namespace package so that several
# distributions can install modules under this top-level package name.
__import__('pkg_resources').declare_namespace(__name__)
""" CloudControl server libraries.
"""
__version__ = '24~rc1'
......@@ -4,15 +4,13 @@ This package store classes representing each client's role and the associated
sjRPC handler.
"""
import logging
from datetime import datetime
from sjrpc.utils import ConnectionProxy
from ccserver.handlers import CCHandler, listed
from ccserver.exceptions import RightError
from ccserver.db import RemoteTag
from cloudcontrol.server.handlers import CCHandler, listed
from cloudcontrol.server.exceptions import RightError
from cloudcontrol.server.db import RemoteTag
from cloudcontrol.common.tql.db.tag import CallbackTag
......@@ -26,7 +24,7 @@ class RegisteredCCHandler(CCHandler):
return super(RegisteredCCHandler, self).__getitem__(name)
def on_disconnect(self, conn):
logging.info('Client %s disconnected', self.client.login)
self.logger.info('Client %s disconnected', self.client.login)
self.client.shutdown()
def check(self, method, tql=None):
......@@ -88,29 +86,28 @@ class Client(object):
ROLE = None
RPC_HANDLER = RegisteredCCHandler
KILL_ALREADY_CONNECTED = False
roles = {}
def __init__(self, login, server, connection, tql_object):
def __init__(self, logger, login, server, connection):
self.logger = logger
self._login = login
self._server = server
self._connection = connection
self._tql_object = tql_object
self._handler = self.RPC_HANDLER(self)
self._proxy = ConnectionProxy(self._connection.rpc)
self._last_action = datetime.now()
self._connection_date = datetime.now()
# Set the role's handler for the client:
self._connection.rpc.set_handler(self._handler)
self._tql_object = None
# Remote tags registered:
self._remote_tags = set()
# Register the server defined client tags:
self._tql_object.register(CallbackTag('con', lambda: self.uptime, ttl=0))
self._tql_object.register(CallbackTag('idle', lambda: self.idle, ttl=0))
self._tql_object.register(CallbackTag('ip', lambda: self.ip))
def _get_tql_object(self):
""" Get the TQL object of the client from the cc-server tql database.
"""
return self._server.db.get(self.login)
@classmethod
def register_client_class(cls, class_):
......@@ -119,8 +116,8 @@ class Client(object):
cls.roles[class_.ROLE] = class_
@classmethod
def from_role(cls, role, login, server, connection, tql_object):
return cls.roles[role](login, server, connection, tql_object)
def from_role(cls, role, logger, login, server, connection):
return cls.roles[role](logger, login, server, connection)
#
# Properties
......@@ -144,6 +141,12 @@ class Client(object):
"""
return self._login
@property
def role(self):
""" Return the role of this client.
"""
return self.ROLE
@property
def server(self):
""" Return the cc-server binded to this client.
......@@ -182,16 +185,27 @@ class Client(object):
peer = self.conn.getpeername()
return ':'.join(peer.split(':')[:-1])
def get_tags(self, tags): # DEPRECATED
""" Get tags on the remote node.
:param tags: tags is the list of tags to fetch
def attach(self):
    """ Attach the client to the server.

    Installs the role's rpc handler, registers the client object in the
    tql database, then exposes the server-side tags (con, idle, ip).
    """
    # Set the role's handler for the client:
    self.conn.rpc.set_handler(self._handler)
    # Register the client in tql database:
    self._tql_object = self._get_tql_object()
    # Register the server defined client tags:
    self._tql_object.register(CallbackTag('con', lambda: self.uptime, ttl=0))
    self._tql_object.register(CallbackTag('idle', lambda: self.idle, ttl=0))
    self._tql_object.register(CallbackTag('ip', lambda: self.ip))
def shutdown(self):
""" Shutdown the connection to the client.
"""
# Disable the client handler:
self.conn.rpc.set_handler(None)
# Unregister all remote tags:
for tag in self._remote_tags.copy():
self.tags_unregister(tag)
......@@ -209,14 +223,11 @@ class Client(object):
"""
self._last_action = datetime.now()
def get_remote_tags(self, tag): # DEPRECATED
return self.conn.call('get_tags', (tag,))[tag]
def async_remote_tags(self, watcher, robj, tags):
""" Asynchronously update tags from the remote client using
specified watcher.
"""
watcher.register(self.conn, 'get_tags', tags, _data=robj)
watcher.register(self.conn, 'get_tags', tags, _data=(tags, robj))
def tags_register(self, name, ttl=None, value=None):
""" Register a new remote tag for the client.
......
from cloudcontrol.server.clients import Client
from cloudcontrol.server.clients.host import HostClient
from cloudcontrol.server.db import SObject
from cloudcontrol.common.tql.db.tag import StaticTag
class BootstrapClient(HostClient):
    """A transient client connected to the cc-server with the 'bootstrap' role.

    Bootstrap clients are registered in the tql database under a
    per-connection id (login suffixed by the connection fd) and advertise
    themselves with the 'host' role.
    """

    ROLE = 'bootstrap'

    def _get_tql_object(self):
        # No pre-existing db entry for a bootstrap client: build a fresh
        # object, tag it with our role and register it in the server db.
        obj = SObject(self.login)
        obj.register(StaticTag('r', self.role))
        self._server.db.register(obj)
        return obj

    @property
    def login(self):
        # Suffix the account login with the connection fd so several
        # bootstrap connections sharing one account get unique ids.
        return '%s.%s' % (self._login, self.conn.get_fd())

    @property
    def role(self):
        # Bootstrap clients masquerade as hosts.
        return 'host'

    def shutdown(self):
        super(BootstrapClient, self).shutdown()
        # Also remove the object from the db:
        self._server.db.unregister(self.login)


Client.register_client_class(BootstrapClient)
import logging
from collections import defaultdict
from sjrpc.core import RpcError
from sjrpc.core.protocols import TunnelProtocol
from ccserver.orderedset import OrderedSet
from ccserver.conf import CCConf
from ccserver.exceptions import (AlreadyRegistered, AuthenticationError,
RightError, ReservedTagError, BadObjectError,
BadRoleError, NotConnectedAccountError,
CloneError)
from ccserver.election import Elector
from ccserver.handlers import listed, Reporter
from ccserver.clients import Client, RegisteredCCHandler
from cloudcontrol.common.datastructures.orderedset import OrderedSet
from cloudcontrol.server.conf import CCConf
from cloudcontrol.server.exceptions import (ReservedTagError, BadObjectError,
BadRoleError, NotConnectedAccountError,
CloneError)
from cloudcontrol.server.election import Elector
from cloudcontrol.server.handlers import listed, Reporter
from cloudcontrol.server.clients import Client, RegisteredCCHandler
from cloudcontrol.common.tql.db.tag import StaticTag
MIGRATION_TYPES = {'cold': 'cold_migrate',
......@@ -21,38 +16,44 @@ MIGRATION_TYPES = {'cold': 'cold_migrate',
class CliHandler(RegisteredCCHandler):
""" Handler binded to 'cli' role.
""" Handler binded to the 'cli' role.
Summary of methods:
================ ================================ =============
Method name Description Right(s)
================ ================================ =============
list list objects list
start start a vm start
stop stop a vm stop
destroy destroy a vm destroy
pause suspend a vm pause
resume resume a paused vm resume
passwd change password of accounts passwd
addaccount add a new account addaccount
copyaccount copy an account addaccount
addtag add a tag to accounts addtag
deltag remove a tag from accounts deltag
tags show tags of accounts tags
delaccount delete an account delaccount
close close an account close
declose declose an account declose
kill kill a connected account kill
rights show rights of accounts rights
addright add right rules to accounts addright
delright remove right rules from accounts delright
execute execute remote command on hosts execute
shutdown shutdown a connected client shutdown
jobs show jobs jobs
cancel cancel a running job cancel
jobspurge remove done jobs from jobs list jobspurge
================ ================================ =============
.. currentmodule:: cloudcontrol.server.clients.cli
.. autosummary::
CliHandler.list
CliHandler.start
CliHandler.stop
CliHandler.destroy
CliHandler.pause
CliHandler.resume
CliHandler.passwd
CliHandler.addaccount
CliHandler.copyaccount
CliHandler.addtag
CliHandler.deltag
CliHandler.tags
CliHandler.delaccount
CliHandler.close
CliHandler.declose
CliHandler.kill
CliHandler.rights
CliHandler.addright
CliHandler.delright
CliHandler.execute
CliHandler.shutdown
CliHandler.jobs
CliHandler.cancel
CliHandler.jobspurge
CliHandler.console
CliHandler.rshell
CliHandler.rshell_resize
CliHandler.rshell_wait
CliHandler.forward
CliHandler.dbstats
"""
@listed
......@@ -63,7 +64,7 @@ class CliHandler(RegisteredCCHandler):
"""
self.check('list', query)
logging.debug('Executed list function with query %s', query)
self.logger.debug('Executed list function with query %s', query)
objects = self.server.list(query)
order = OrderedSet(['id'])
#if tags is not None:
......@@ -140,10 +141,7 @@ class CliHandler(RegisteredCCHandler):
self.check('undefine', query)
#FIXME: When tag globbing will be implemented, the list of tags to
# show will be: r, p, h, disk*
# I ask "all tags" pending implementation.
objects = self.server.list(query, show=('*',))
objects = self.server.list(query, show=('r', 'p', 'h', 'disk*',))
errs = Reporter()
for obj in objects:
if obj['r'] != 'vm':
......@@ -234,7 +232,6 @@ class CliHandler(RegisteredCCHandler):
errs = Reporter()
with self.conf:
for obj in objects:
print obj
if 'a' not in obj:
errs.error(obj['id'], 'not an account')
continue
......@@ -246,7 +243,7 @@ class CliHandler(RegisteredCCHandler):
' to %s' % (tags[tag_name], tag_value))
# Update the object db (update the tag value):
dbobj = self.server.db.get(obj['id'])
dbobj[tag_name].set_value(tag_value)
dbobj[tag_name].value = tag_value
else:
errs.success(obj['id'], 'tag created')
# Update the object db (create the tag):
......@@ -355,6 +352,8 @@ class CliHandler(RegisteredCCHandler):
errs.success(obj['id'], 'closed')
self.server.conf.add_tag(obj['a'], 'close', 'yes')
dbobj = self.server.db.get(obj['id'])
dbobj.register(StaticTag('close', 'yes'), override=True)
self.server.jobs.create('kill', author=self.client.login,
account=obj['a'], gracetime=1)
......@@ -379,9 +378,11 @@ class CliHandler(RegisteredCCHandler):
tags = self.conf.show(obj['a'])['tags']
if 'close' in tags:
errs.success(obj['id'], 'account declosed')
self.conf.remove_tag(obj['a'], 'close')
dbobj = self.server.db.get(obj['id'])
dbobj.unregister('close', override=True)
else:
errs.warn(obj['id'], 'account not closed')
self.conf.remove_tag(obj['a'], 'close')
return errs.get_dict()
......@@ -667,10 +668,36 @@ class CliHandler(RegisteredCCHandler):
'hv_dest': dest['id'],
'author': self.client.login})
@listed
def console(self, tql):
    """Open a remote console on the single object matched by *tql*.

    :param tql: tql matching only one object on which start the console
    :return: the label of the created tunnel
    """
    self.check('console', tql)
    matched = self.server.list(tql, show=('r', 'p', 'h'))
    if len(matched) != 1:
        raise NotImplementedError('Console only support one tunnel at time for now')
    report = Reporter()
    for obj in matched:
        if obj['r'] not in ('vm',):
            report.error(obj['id'], 'bad role')
            continue
        # Ask the hypervisor hosting the vm to open the console, then
        # bridge it to the cli through a second tunnel:
        hv_client = self.server.get_client(obj['p'])
        srv_to_host_tun = hv_client.console(obj['h'])
        cli_tun = self.client.register_tunnel('console', hv_client, srv_to_host_tun)
        report.success(obj['id'], 'tunnel started.', output=cli_tun.label)
    return report.get_dict()
@listed
def rshell(self, tql):
""" Start a remote shell on object matching the provided tql.
:param tql: tql matching only one object on which start the rshell
:return: the label of the created tunnel
"""
self.check('rshell', tql)
objects = self.server.list(tql, show=('r', 'p'))
if len(objects) != 1:
raise NotImplementedError('Rshell only support one tunnel at time for now')
......@@ -681,16 +708,21 @@ class CliHandler(RegisteredCCHandler):
srv_to_host_tun = client.rshell()
cli_tun = self.client.register_tunnel('rshell', client, srv_to_host_tun)
errs.success(obj['id'], 'tunnel started.', output=cli_tun.label)
elif obj['r'] in ('vm', ):
raise NotImplementedError('rshell on vm not implemented')
else:
errs.error(obj['id'], 'bad role')
return errs.get_dict()
@listed
def rshell_resize(self, label, row, col, xpixel, ypixel):
""" Resize the shell.
""" Send a resize event to the remote shell's tty.
:param label: label of the rshell tunnel to resize
:param row: number of rows
:param col: number of columns
:param xpixel: unused
:param ypixel: unused
"""
self.check('rshell')
ttype, client, ctun, stun = self.client.get_tunnel(label)
if ttype != 'rshell':
raise ValueError('Label does not refers on a rshell')
......@@ -700,6 +732,7 @@ class CliHandler(RegisteredCCHandler):
def rshell_wait(self, label):
""" Wait for a remote shell termination.
"""
self.check('rshell')
ttype, client, ctun, stun = self.client.get_tunnel(label)
if ttype != 'rshell':
raise ValueError('Label does not refers on a rshell')
......@@ -707,12 +740,38 @@ class CliHandler(RegisteredCCHandler):
rcode = client.rshell_wait(stun.label)
except Exception as err:
rcode = -1
logging.warning('Unexpected exit of tunnel: %s', err)
self.logger.warning('Unexpected exit of tunnel: %s', err)
self.client.unregister_tunnel(ctun.label)
ctun.close()
stun.close()
return rcode
@listed
def forward(self, label, login, port, destination='127.0.0.1'):
    """Forward a TCP port of a connected client back to the cli.

    :param label: label of the tunnel created by the client (cli side)
    :param login: login of the remote client on which establish the tunnel
    :param port: port on which establish the tunnel on destination
    :param destination: tunnel destination (from the remote client side)
    :raise KeyError: if the target client is not connected
    """
    self.check('forward', 'id=%s' % login)
    # The target client must currently be connected:
    try:
        target = self.server.get_client(login)
    except KeyError:
        raise KeyError('Specified client is not connected')
    # Ask the client to open the forwarding tunnel, then register it so
    # the cli side can attach to it:
    tun = target.forward(port, destination)
    self.client.register_tunnel('forward', target, tun)
@listed
def dbstats(self):
    """Return usage statistics of the server's tql database."""
    stats = self.server.db.stats()
    return stats
def forward_call(self, login, func, *args, **kwargs):
""" Forward a call to a connected client and return result.
......@@ -732,6 +791,7 @@ class CliClient(Client):
ROLE = 'cli'
RPC_HANDLER = CliHandler
KILL_ALREADY_CONNECTED = True
def __init__(self, *args, **kwargs):
super(CliClient, self).__init__(*args, **kwargs)
......@@ -757,9 +817,21 @@ class CliClient(Client):
return ctun
def get_tunnel(self, label):
    """Return the tunnel entry registered under *label*.

    :return: a tuple ``(type, remote_client, tunnel, remote_client_tunnel)``
        where **type** is the string given at registration time,
        **remote_client** is the client object the tunnel is established
        with, **tunnel** is the cli-to-server sjRpc tunnel object and
        **remote_client_tunnel** the server-to-remote-client one.
    :raise KeyError: if no tunnel is registered under *label*
    """
    return self._tunnels[label]
def unregister_tunnel(self, label):
    """Forget the tunnel registered under *label*.

    Unknown labels are ignored: the tunnel may already have been dropped
    by the peer before the cli asks to close it, so double
    unregistration must not raise.

    :param label: label of the tunnel to forget
    """
    # The stray unconditional delete that preceded the try/except made
    # the KeyError guard unreachable; a tolerant pop() is equivalent to
    # the guarded delete and keeps the operation idempotent.
    self._tunnels.pop(label, None)
Client.register_client_class(CliClient)
from ccserver.clients import Client
from cloudcontrol.server.clients import Client
class HostClient(Client):
......@@ -13,6 +13,13 @@ class HostClient(Client):
def execute(self, command):
return self.conn.call('execute_command', command)
def console(self, name):
    """Open a console tunnel to the vm *name* on this host.

    :param name: name of the vm
    :return: the server-to-host tunnel object
    """
    # The host side opens the console and hands back the tunnel label we
    # must bind to on our side:
    label = self.proxy.vm_open_console(name)
    return self.conn.create_tunnel(label=label)
def rshell(self):
""" Start a remote shell on the host.
"""
......@@ -28,5 +35,12 @@ class HostClient(Client):
"""
return self.proxy.rshell_wait(label, _timeout=None)
def forward(self, port, destination='127.0.0.1'):
    """Create a TCP forwarding tunnel on this client and return it.

    :param port: port to connect to on *destination*
    :param destination: address the remote client connects to (its side)
    :return: the created tunnel object
    """
    tunnel = self.conn.create_tunnel()
    # Tell the remote side to bind the other end of the tunnel:
    self.proxy.forward(tunnel.label, port, destination)
    return tunnel
Client.register_client_class(HostClient)
import threading
from ccserver.handlers import listed
from ccserver.clients import Client, RegisteredCCHandler
from ccserver.clients.host import HostClient
from ccserver.db import RemoteTag
from cloudcontrol.server.handlers import listed
from cloudcontrol.server.clients import Client, RegisteredCCHandler
from cloudcontrol.server.clients.host import HostClient
from cloudcontrol.server.db import RemoteTag
from cloudcontrol.common.tql.db.object import TqlObject
from cloudcontrol.common.tql.db.tag import StaticTag, CallbackTag
from cloudcontrol.common.tql.db.tag import StaticTag
from functools import partial
......@@ -17,8 +17,7 @@ class HypervisorHandler(RegisteredCCHandler):
@listed
def register(self, obj_id, role):
'''
Register an object managed by the calling node.
""" Register an object managed by the calling node.
.. note:
the obj_id argument passed to this handler is the object id of the
......@@ -27,13 +26,12 @@ class HypervisorHandler(RegisteredCCHandler):
:param obj_id: the id of the object to register
:param role: the role of the object to register
'''
"""
self.client.register(obj_id, role)
@listed
def unregister(self, obj_id):
'''
Unregister an object managed by the calling node.
""" Unregister an object managed by the calling node.
.. note:
the obj_id argument passed to this handler is the object id of the
......@@ -41,7 +39,7 @@ class HypervisorHandler(RegisteredCCHandler):
preprend the id by "node_id." itself).
:param obj_id: the id of the object to unregister
'''
"""
self.client.unregister(obj_id)
@listed
......@@ -58,8 +56,7 @@ class HypervisorHandler(RegisteredCCHandler):
@listed
def sub_tags_unregister(self, obj_id, name):
"""
Unregister a remote tag for a child of the client.
""" Unregister a remote tag for a child of the client.
:param obj_id: child name
:param name: name of the tag to unregister
......@@ -160,8 +157,7 @@ class HvClient(HostClient):
self._children[obj_id].register(tag)
def sub_tags_unregister(self, obj_id, name):
"""
Unregister a remote tag for a child of the client.
""" Unregister a remote tag for a child of the client.
:param obj_id: child name
:param name: name of the tag to unregister
......@@ -198,7 +194,7 @@ class HvClient(HostClient):
""" Asynchronously update sub tags from the remote client using
specified watcher.
"""
watcher.register(self.conn, 'sub_tags', obj_id, tags, _data=robj)
watcher.register(self.conn, 'sub_tags', obj_id, tags, _data=(tags, robj))
Client.register_client_class(HvClient)
import logging
from ccserver.clients import Client, RegisteredCCHandler
from ccserver.handlers import listed
from cloudcontrol.server.clients import Client, RegisteredCCHandler
from cloudcontrol.server.handlers import listed
class SpvHandler(RegisteredCCHandler):
......@@ -15,7 +13,7 @@ class SpvHandler(RegisteredCCHandler):
:param query: the query to select objects to show
"""
logging.debug('Executed list function with query %s', query)
self.logger.debug('Executed list function with query %s', query)
objects = self.server.list(query)
return {'objects': objects}
......
#!/usr/bin/env python
#coding=utf8
'''
This module provide an abstraction to the clients configuration directory.
""" This module provide an abstraction to the clients configuration directory.
The client configuration directory contains a list of ``.json`` files, each
file contains the configuration for one client. The username of the client is
......@@ -33,24 +29,22 @@ u'node'
'tags': {},
'perms': None}
>>> conf.remove_account('rms')
>>>
'''
>>>
"""
import hashlib
import base64
import random
import threading
import logging
import json
import os
import re
from functools import wraps
def writer(func):
'''
Decorator used to threadsafize methods that made write operations on
client configuration tree.
'''
""" Decorator used to threadsafize methods that made write operations on
client configuration tree.
"""
@wraps(func)
def f(self, *args, **kwargs):
......@@ -60,11 +54,10 @@ def writer(func):
return f
class CCConf(object):
'''
Create a new configuration interface.
""" Create a new configuration interface.
:param path_directory: the directory to store the configuration files
'''
"""
CONF_TEMPLATE = {'password': None,
'role': None,
......@@ -73,7 +66,8 @@ class CCConf(object):
RE_SALTPW = re.compile(r'{(?P<method>[A-Z]+)}(?P<password>.+)')
def __init__(self, path_directory):
def __init__(self, logger, path_directory):
self.logger = logger
self._path = path_directory
self._lock = threading.RLock()
......@@ -81,65 +75,61 @@ class CCConf(object):
return self._lock.__enter__()
def __exit__(self, *args, **kwargs):
return self._lock.__exit__(*args, **kwargs)
return self._lock.__exit__(*args, **kwargs)
def _get_conf(self, login):
'''
"""
Return the configuration of a client by its login.
:param login: login of the client
:return: the configuration of the client
:raise CCConf.UnknownAccount: if user login is unknown
'''
"""
filename = os.path.join(self._path, '%s.json' % login)
if os.path.isfile(filename):
conf = json.load(open(filename, 'r'))
logging.debug('Getting configuration %s: %s', filename, conf)
self.logger.debug('Getting configuration %s: %s', filename, conf)
return conf
else:
raise CCConf.UnknownAccount('%s is not a file' % filename)
def _set_conf(self, login, conf, create=False):
'''
Update the configuration of a client by its login.
""" Update the configuration of a client by its login.
:param login: login of the client
:param conf: configuration to set for the client
:raise CCConf.UnknownAccount: if user login is unknown
'''
"""
filename = os.path.join(self._path, '%s.json' % login)
logging.debug('Writing configuration %s: %s', filename, conf)
self.logger.debug('Writing configuration %s: %s', filename, conf)
if os.path.isfile(filename) ^ create:
json.dump(conf, open(filename, 'w'))
else:
raise CCConf.UnknownAccount('%s is not a file' % filename)
def acquire(self):
'''
Acquire the configuration writing lock for non-atomic configuration
changes.
""" Acquire the configuration writing lock for non-atomic configuration
changes.
.. warning::
Don't forget to call the :meth:`release` method after your changes
for each :meth:`acquire` you made.
'''
"""
self._lock.acquire()
def release(self):
'''
Release the configuration writing lock.
'''
""" Release the configuration writing lock.
"""
self._lock.release()
def show(self, login):
'''
Show the configuration for specified account.
""" Show the configuration for specified account.
:param login: the login of the client
:return: configuration of user
'''
"""
return self._get_conf(login)
......@@ -183,14 +173,13 @@ class CCConf(object):
return provided_passwd
def _hash_password(self, password, method='ssha'):
'''
Hash a password using given method and return it.
""" Hash a password using given method and return it.
:param password: the password to hash
:param method: the hashing method
:return: hashed password
'''
"""
meth = '_auth_%s' % method.lower()
if hasattr(self, meth):
auth = getattr(self, meth)
......@@ -199,15 +188,14 @@ class CCConf(object):
raise CCConf.BadMethodError('Bad hashing method: %s' % repr(method))
def authentify(self, login, password):
'''
Authentify the client providing its login and password. The function
return the role of the client on success, or ``None``.
""" Authentify the client providing its login and password. The function
return the role of the client on success, or ``None``.
:param login: the login of the client
:param password: the password of the client
:return: the client's role or None on failed authentication
:raise CCConf.UnknownAccount: if user login is unknown
'''
"""
conf = self._get_conf(login)
passwd_conf = conf['password']
......@@ -230,8 +218,8 @@ class CCConf(object):
auth = getattr(self, meth)
is_valid = auth(password, password_wo_method) == passwd_conf
else:
logging.warning('Bad authentication method for %s: '
'%s', login, m.group('method'))
self.logger.warning('Bad authentication method for %s: '
'%s', login, m.group('method'))
if is_valid:
return conf['role']
else:
......@@ -239,14 +227,13 @@ class CCConf(object):
@writer
def set_password(self, login, password, method='ssha'):
'''
Update the client's password in the configuration.
""" Update the client's password in the configuration.
:param login: login of the user
:param password: new password
:param method: the hashing method to use
:raise CCConf.UnknownAccount: if user login is unknown
'''
"""
conf = self._get_conf(login)
password = self._hash_password(password, method)
......@@ -255,32 +242,30 @@ class CCConf(object):
@writer
def add_tag(self, login, tag_name, tag_value):
'''
Add the tag to the user.
""" Add the tag to the user.
:param login: login of the user
:param tag_name: tag name to add to the user
:param tag_value: the tag value
:raise CCConf.UnknownAccount: if user login is unknown
'''
"""
logging.debug('Added tag %s:%s for %s account',
tag_name, tag_value, login)
self.logger.debug('Added tag %s:%s for %s account',
tag_name, tag_value, login)
conf = self._get_conf(login)
conf['tags'][tag_name] = tag_value
self._set_conf(login, conf)
@writer
def remove_tag(self, login, tag):
'''
Remove the tag to the user.
""" Remove the tag to the user.
:param login: login of the user
:param tag: tag to remove to the user
:raise CCConf.UnknownAccount: if user login is unknown
'''
"""
logging.debug('Removed tag %s for %s account', login, tag)
self.logger.debug('Removed tag %s for %s account', login, tag)
conf = self._get_conf(login)
if tag in conf['tags']:
del conf['tags'][tag]
......@@ -288,14 +273,13 @@ class CCConf(object):
@writer
def remove_account(self, login):
'''
Remove the configuration of the account.
""" Remove the configuration of the account.
:param login: login of the account to remove
:raise CCConf.UnknownAccount: if user login is unknown
'''
"""
logging.debug('Removed %s account', login)
self.logger.debug('Removed %s account', login)
filename = os.path.join(self._path, '%s.json' % login)
if os.path.exists(filename):
os.remove(filename)
......@@ -304,16 +288,15 @@ class CCConf(object):
@writer
def create_account(self, login, role, password):
'''
Create a new account.
""" Create a new account.
:param login: login of the new user
:param password: password of the new user
:param role: the role of the new user
:raise CCConf.AlreadyExistingAccount: if the login is already
'''
"""
logging.debug('Creating %s account with role %s', login, role)
self.logger.debug('Creating %s account with role %s', login, role)
filename = os.path.join(self._path, '%s.json' % login)
if os.path.exists(filename):
raise CCConf.AlreadyExistingAccount('%s found' % filename)
......@@ -327,15 +310,14 @@ class CCConf(object):
@writer
def copy_account(self, copy_login, login, password):
'''
Create a new account based on another.
""" Create a new account based on another.
:param copy_login: the login of the account to copy.
:param password: password of the new user
:param role: the role of the new user
:raise CCConf.AlreadyExistingAccount: if the login is already
:raise CCConf.UnknownAccount: if the copy login doesn't exist
'''
"""
conf_copy = self._get_conf(copy_login)
self.create_account(login, conf_copy['role'], password)
......@@ -345,8 +327,7 @@ class CCConf(object):
@writer
def add_right(self, login, tql, method=None, target='allow', index=None):
'''
Add a right rule to the provided account.
""" Add a right rule to the provided account.
:param login: the login of the account
:param tql: the TQL request to allow
......@@ -358,7 +339,7 @@ class CCConf(object):
.. note::
If the index is out of range, the rule will be added to the end of
the ruleset.
'''
"""
conf = self._get_conf(login)
rights = conf['rights']
......@@ -370,12 +351,11 @@ class CCConf(object):
@writer
def remove_right(self, login, index):
'''
Remove a right rule from the provided account.
""" Remove a right rule from the provided account.
:param login: the login of the account
:param index: the index of the rule to delete or None for all rules
'''
"""
conf = self._get_conf(login)
if index is None:
......@@ -389,12 +369,11 @@ class CCConf(object):
self._set_conf(login, conf)
def list_accounts(self):
'''
List all registered accounts.
""" List all registered accounts.
:return: :class:`tuple` of :class:`str`, each item being an
account login
'''
"""
logins = []
for filename in os.listdir(self._path):
......
......@@ -20,18 +20,25 @@ class SObject(TqlObject):
def __init__(self, *args, **kwargs):
super(SObject, self).__init__(*args, **kwargs)
self._overridden = defaultdict(lambda: [])
self._overridden = defaultdict(lambda: None)
def register(self, tag, override=False):
""" Register a tag on this object (or override).
"""
# If the tag must override another one, move the old one on the
# overridden tags dict:
if override:
# The tag to register must override an eventual existing tag.
# Overridden tag is moved in the overridden tags dict:
if tag.name in self._tags:
self._overridden[tag.name].append(self._tags[tag.name])
self._overridden[tag.name] = self._tags[tag.name]
del self._tags[tag.name]
elif tag.name in self._tags:
# The tag to register is already overridden, we place it directly
# on the overridden tags dict:
if tag.name in self._overridden:
raise KeyError('A tag with this name is already registered on this object')
self._overridden[tag.name] = tag
return
return super(SObject, self).register(tag)
def unregister(self, name, override=False):
......@@ -40,14 +47,15 @@ class SObject(TqlObject):
super(SObject, self).unregister(name)
# If a tag is overriden, replace it on the tag list:
if override and name in self._overridden:
self._tags[name] = self._overridden[name].pop()
self._tags[name] = self._overridden[name]
del self._overridden[name]
def is_overriding(self, name):
    """ Return True if a tag is overriding another one for the name.

    If the tag is not found, False is returned.

    :param name: name of the tag to look up
    """
    # Use .get() instead of indexing: self._overridden is a defaultdict,
    # so a plain [name] lookup would insert a None placeholder for every
    # queried name, growing the dict on read-only membership tests.
    return self._overridden.get(name) is not None
class CCSAsyncTagInterface(BaseTagInterface):
......@@ -77,7 +85,7 @@ class RemoteTag(BaseTag):
def __init__(self, name, callback, ttl=None):
    """Initialise a remote tag.

    :param name: name of the tag
    :param callback: callable used to refresh the tag value remotely
    :param ttl: cache time-to-live, or None for no expiry
    """
    super(RemoteTag, self).__init__(name)
    self._callback = callback
    self._ttl = ttl
    # Cache bookkeeping: last refresh timestamp and last fetched value.
    self._cache_last_update = None
    self._cache_value = u''
......@@ -151,8 +159,17 @@ class SRequestor(StaticRequestor):
cb = tuple(to_update)[0].callback
cb(watcher, obj, [t.name for t in to_update])
# Get and process the results:
for update in watcher.wait(timeout=60): #TODO: adaptative timeout
obj = update['data']
for tag_name, tag_value in update['return'].iteritems():
obj.set(tag_name, tag_value)
obj[tag_name].cached = tag_value # Set the tag cache value
for update in watcher.iter(timeout=4, raise_timeout=True): #TODO: adaptative timeout
requested_tags, obj = update['data']
if 'return' not in update:
for tag_name in requested_tags:
obj.set(tag_name, '#ERR#')
else:
tags = update['return']
for tag_name in requested_tags:
tag_value = tags.get(tag_name)
if tag_value is None:
obj.set(tag_name, '#ERR')
else:
obj.set(tag_name, tag_value)
obj[tag_name].cached = tag_value # Set the tag cache value
#!/usr/bin/env python
#coding=utf8
'''
This module contains the hypervisor destination election stuff.
'''
from __future__ import absolute_import
""" This module contains the hypervisor destination election stuff.
"""
from copy import copy
from ccserver.exceptions import (UnknownElectionAlgo, UnknownElectionType,
from cloudcontrol.server.exceptions import (UnknownElectionAlgo, UnknownElectionType,
ElectionError)
def tags(*args):
'''
Decorator used to declare tags used by a filter.
'''
""" Decorator used to declare tags used by a filter.
"""
def decorator(func):
func.__tags__ = set(args)
......@@ -64,13 +57,12 @@ class Elector(object):
self._login = login
def election(self, mtype, algo):
'''
Generate a new migration plan for this election. You must specify the
migration type and the distribution algoritm.
""" Generate a new migration plan for this election. You must specify
the migration type and the distribution algoritm.
:param mtype: the migration type
:param algo: the distribution algoritm
'''
"""
# Check the choosen election method:
if mtype not in self.ALGO_BY_TYPES:
......