#!/usr/bin/env python
#coding=utf8
'''
Jobs management on the server.
'''
from __future__ import absolute_import
import re
import logging
import time
from datetime import datetime
from threading import Thread, Lock
from ccserver.exceptions import (BadJobTypeError, UnknownJobError, JobError,
UnknownObjectError)
from ccserver.utils import AcquiresAllOrNone
class JobCancelError(Exception):
    '''
    Signal raised from inside a job to abort its execution.

    :meth:`BaseJob.checkpoint` raises it when the job has been flagged as
    cancelled, and job implementations raise it themselves when a
    precondition is not met. :meth:`BaseJob.run` catches it and rolls the
    job back, using the exception message as the cancellation reason.
    '''
class BaseJob(dict, Thread, object):
    '''
    A base class to define a job.

    A job is at the same time a dict, which stores its properties, and a
    thread, whose :meth:`run` method executes :meth:`job`.

    The standards job items are:

    * id: id of the job (read-only once the job is built)
    * status: message explaining the current job status
    * done: True if the job is done
    * cancelled: True if job has been cancelled by user
    * created: job date of creation
    * ended: job date of end (or None if done = False)
    * duration: duration in seconds of the job (processed on export)
    * author: author login of the job

    :param manager: the :class:`JobsManager` instance.
    '''

    def __init__(self, manager, *args, **kwargs):
        # Initialize the inherited classes (the items passed here, including
        # the mandatory 'id', are stored without going through __setitem__):
        dict.__init__(self, *args, **kwargs)
        Thread.__init__(self)

        # The manager of this job:
        self.manager = manager

        # Define default job properties:
        self['status'] = 'pending'
        self['done'] = False
        self['cancelled'] = False
        self['created'] = datetime.now()
        self['ended'] = None
        self['duration'] = 0

        #~ assert self.get('author') is not None, 'author is not defined'

        # List of actions to do by the rollback method:
        self._wayback = []

        # Set the thread name:
        self.name = 'job-%s' % self['id']

    def __hash__(self):
        # A dict is not hashable by default; jobs are identified by their id.
        return hash(self['id'])

    def __setitem__(self, key, value):
        if key == 'id':
            raise KeyError('Key %r in read-only.' % key)
        super(BaseJob, self).__setitem__(key, value)

    def __delitem__(self, key):
        if key == 'id':
            raise KeyError('Key %r in read-only.' % key)
        super(BaseJob, self).__delitem__(key)

    def report(self, status, done=None):
        '''
        Report the status of the job.

        :param status: the status to set to the job
        :param done: is the job done, None to keep current value
        '''
        self['status'] = status
        if done is not None:
            # The end date only makes sense for a finished job: it stays
            # None while done is False, as documented on the class.
            self['ended'] = datetime.now() if done else None
            self['done'] = done

    def run(self):
        '''
        Run the job itself.

        Calls :meth:`job`; any exception triggers :meth:`_rollback`,
        otherwise the job is reported as a success.
        '''
        try:
            self.job()
        except JobCancelError as err:
            self._rollback('%s' % err)
        except Exception as err:
            logging.error('Error while executing job: %s, %r', err, err)
            self._rollback('%s' % err)
        else:
            self.report('success', done=True)

    def job(self):
        '''
        Method to override to define the job's behavior.
        '''
        pass

    def checkpoint(self, func=None):
        '''
        Check if job is not cancelled, else raise the JobCancelError. Also
        add the provided function (optional) to the wayback list.

        :param func: callable to add to the wayback list.
        :raise JobCancelError: when the job has been cancelled
        '''
        # Register the undo action *before* checking for cancellation: the
        # action it reverts has already been performed by the caller, so it
        # must be rolled back even if the cancel arrived in the meantime.
        if func is not None:
            self._wayback.append(func)
        if self['cancelled']:
            raise JobCancelError('Job has been cancelled by user')

    def _rollback(self, error):
        '''
        Rollback the job.

        The registered wayback functions are called in reverse order of
        registration; the first one which fails stops the rollback.

        :param error: reason of the rollback, reported in the final status
        '''
        self.report('rollbacking')
        try:
            for func in reversed(self._wayback):
                func()
        except Exception as err:
            self.report('rollback failed: %s' % err, done=True)
        else:
            self.report('cancelled: %s' % error, done=True)

    def cancel(self):
        '''
        Cancel the job.

        .. note::
            You can override this method to trigger an action when user
            cancel the job.

        :raise JobError: when the job is done or already cancelled
        '''
        if self['done']:
            raise JobError('Job is done')
        if self['cancelled']:
            raise JobError('Job is already cancelled')
        self['cancelled'] = True
        self.report('cancelling')

    def export(self, props=None):
        '''
        Export the job in a simple dict format.

        Items whose key starts with an underscore are skipped, datetimes are
        converted to timestamps, the duration is computed on the fly and
        every value is converted to a string.

        :param props: container of the keys to export, None to export all
        :return: a new dict mapping the exported keys to string values
        '''
        exported = {}
        # Work on a snapshot of the items: the job thread may add or remove
        # keys while another thread is exporting the job.
        for key, val in list(self.items()):
            if key.startswith('_') or (props is not None and key not in props):
                continue
            if isinstance(val, datetime):
                val = int(time.mktime(val.timetuple()))
            if key == 'duration':
                # Running jobs are measured up to now, done jobs up to their
                # end date (falling back on now if it was never recorded).
                end = self.get('ended') if self['done'] else None
                if end is None:
                    end = datetime.now()
                elapsed = end - self['created']
                val = elapsed.seconds + elapsed.days * 86400
            exported[key] = str(val)
        return exported
class KillClientJob(BaseJob):
    '''
    A job used to kill connected accounts.

    Mandatory items:

    * account: the account login to kill

    Optional items:

    * gracetime: time (in seconds) to wait before to kill the user
    '''

    def job(self):
        '''
        Wait for the optional grace time, then ask the server to kill the
        account, unless the job has been cancelled in the meantime.

        :raise ValueError: when the mandatory ``account`` item is missing
        '''
        gracetime = self.get('gracetime')
        account = self.get('account')
        # Explicit check instead of an assert, which would be stripped when
        # Python runs with -O and let a None account reach server.kill().
        if account is None:
            raise ValueError('Account not specified')
        if gracetime is not None:
            time.sleep(int(gracetime))
        # Last chance to honour a cancel request received during the sleep:
        self.checkpoint()
        self.manager.server.kill(account)
class KillOldCliJob(BaseJob):
    '''
    Typically an hidden job used to kill clients who are connected/idle since
    too much time.

    Mandatory items:

    * maxcon: maximum connection time in minutes
    * maxidle: maximum idle time in minutes

    Optional items:

    * delay: delay in seconds between two checks (default 1m)
    '''

    DEFAULT_DELAY = 60

    def job(self):
        '''
        Periodically kill the 'cli' clients connected for more than
        ``maxcon`` minutes, until the job is cancelled.

        :raise ValueError: when ``maxcon`` or ``maxidle`` is missing
        '''
        # Explicit checks instead of asserts, which would be stripped when
        # Python runs with -O.
        maxcon = self.get('maxcon')
        if maxcon is None:
            raise ValueError('maxcon is None')
        # maxidle is validated but not used yet (see the TODO below).
        maxidle = self.get('maxidle')
        if maxidle is None:
            raise ValueError('maxidle is None')
        delay = self.get('delay', self.DEFAULT_DELAY)
        max_uptime = maxcon * 60

        while True:
            # The only exit of this loop is a cancel request:
            self.checkpoint()
            for client in self.manager.server.iter_connected_role('cli'):
                if client.get_uptime() > max_uptime:
                    self.manager.server.kill(client.login)
                #TODO: handle idleing.
            time.sleep(delay)
class BaseMigrationJob(BaseJob):
    '''
    Common ancestor of the jobs working on a VM across two hypervisors.
    '''

    def _check_status(self, vm_id, status):
        '''
        Tell whether the VM is currently in the given status.

        :param vm_id: id of the VM to look for
        :param status: the status the VM is expected to have
        :return: True when the server ``list`` query filtered on this id and
            status gives a non-empty (truthy) answer, else False
        '''
        query = 'id=%s&status=%s' % (vm_id, status)
        matches = self.manager.server.list(query)
        return True if matches else False
class ColdMigrationJob(BaseMigrationJob):
    '''
    A cold vm migration job.

    Mandatory items:

    * vm_name: name of the vm to migrate
    * hv_source: name of the hv which execute the VM
    * hv_dest: the destination hypervisor
    * author: login of the author cli
    '''

    def job(self):
        '''
        Migrate a stopped VM: create the volumes and define the VM on the
        destination, copy every disk, then remove the VM and its volumes
        from the source.

        :raise JobCancelError: when a precondition is not met, a copy fails
            or the job is cancelled
        :raise ValueError: when a disk tag is missing on the VM object
        '''
        vm_id = '%s.%s' % (self['hv_source'], self['vm_name'])

        self['title'] = 'Cold migration %s --> %s' % (vm_id, self['hv_dest'])
        logging.info('Job-%s: Started migration for %s', self['id'], vm_id)

        # Cancel the job if the user has not the right to migrate the vm or to
        # select an hypervisor as destination:
        right_check = self.manager.server.check

        tql = 'id=%s' % vm_id
        if not right_check(self['author'], 'coldmigrate', tql):
            raise JobCancelError('author have no rights to migrate this VM')

        tql = 'id=%s' % self['hv_dest']
        if not right_check(self['author'], 'coldmigrate_dest', tql):
            raise JobCancelError('author have no right to migrate to this hv')

        # Update the VM object:
        try:
            self.manager.server.objects.update(ids=(vm_id,))
            vm = self.manager.server.objects.get_by_id(vm_id)
        except UnknownObjectError:
            raise JobCancelError('Source VM not found')

        # Get the source and destination hv clients:
        try:
            source = self.manager.server.get_connection(self['hv_source'])
        except KeyError:
            raise JobCancelError('source hypervisor is not connected')

        try:
            dest = self.manager.server.get_connection(self['hv_dest'])
        except KeyError:
            raise JobCancelError('destination hypervisor is not connected')

        self.checkpoint()

        self.report('waiting lock for source and dest hypervisors')
        logging.info('Job-%s: Trying to acquire locks', self['id'])

        with AcquiresAllOrNone(source.lock, dest.lock):
            logging.info('Job-%s: Locks acquired', self['id'])
            self.checkpoint()

            if not self._check_status(vm_id, 'stopped'):
                raise JobCancelError('vm is not stopped')

            disks = vm.get('disk', '').split()

            # Create storages on destination:
            self.report('create volumes')
            for disk in disks:
                # Getting informations about the disk:
                pool = vm.get('disk%s_pool' % disk)
                name = vm.get('disk%s_vol' % disk)
                size = vm.get('disk%s_size' % disk)
                # Explicit checks: asserts are stripped when Python runs
                # with -O.
                if pool is None:
                    raise ValueError('pool tag doesn\'t exists')
                if name is None:
                    raise ValueError('name tag doesn\'t exists')
                if size is None:
                    raise ValueError('size tag doesn\'t exists')

                # Create the volume on destination:
                dest.proxy.vol_create(pool, name, int(size))
                logging.info('Job-%s: Created volume %s/%s on destination '
                             'hypervisor', self['id'], pool, name)

                # Rollback stuff for this action. The current pool and name
                # are bound as default arguments: a plain closure would see
                # the values of the *last* iteration when the rollback runs
                # and the other volumes would never be deleted.
                def rb_volcreate(pool=pool, name=name):
                    dest.proxy.vol_delete(pool, name)
                self.checkpoint(rb_volcreate)

            # Define VM:
            self.report('define vm')
            logging.info('Job-%s: XML configuration transfert', self['id'])
            vm_config = source.proxy.vm_export(self['vm_name'])
            dest.proxy.vm_define(vm_config)

            # Rollback stuff for vm definition:
            def rb_define():
                dest.proxy.vm_undefine(self['vm_name'])
            self.checkpoint(rb_define)

            # Copy all source disk on destination disk:
            for disk in disks:
                self._copy_disk(source, dest, vm, disk)

            # At this point, if operation is a success, all we need is just to
            # cleanup source hypervisor from disk and vm. This operation *CAN'T*
            # be cancelled or rollbacked if anything fails (unlikely). The
            # migration must be considered as a success, and the only way to
            # undo this is to start a new migration in the other way.

            # Delete the rollback list.
            # This is mandatory to avoid data loss if the cleanup
            # code below fail.
            self._wayback = []

            # Cleanup the disks:
            for disk in disks:
                pool = vm.get('disk%s_pool' % disk)
                name = vm.get('disk%s_vol' % disk)
                source.proxy.vol_delete(pool, name)

            # Cleanup the VM:
            source.proxy.vm_undefine(self['vm_name'])

        logging.info('Job-%s: Migration completed with success', self['id'])

    def _copy_disk(self, source, dest, vm, disk):
        '''
        Copy the specified disk name of the vm from source to dest.

        :param source: connection to the source hypervisor
        :param dest: connection to the destination hypervisor
        :param vm: the VM object holding the disk tags
        :param disk: identifier of the disk in the VM tags
        :raise JobCancelError: when the transfer fails, the checksums differ
            or the job has been cancelled
        '''
        # Get informations about the disk:
        pool = vm.get('disk%s_pool' % disk)
        name = vm.get('disk%s_vol' % disk)
        logging.info('Job-%s: Started copy for %s/%s', self['id'], pool, name)
        self.report('copy %s/%s' % (pool, name))

        # Make the copy and wait for it end:
        xferprop = dest.proxy.vol_import(pool, name)

        # Register the cancel function:
        def cancel_xfer():
            dest.proxy.vol_import_cancel(xferprop['id'])
        self['func_cancel_xfer'] = cancel_xfer

        try:
            # Wait for the end of transfert:
            cids = set()
            cids.add(source.connection.async_call('vol_export', pool, name,
                                                  dest.get_ip(),
                                                  xferprop['port']))
            cids.add(dest.connection.async_call('vol_import_wait',
                                                xferprop['id']))
            msgs = self.manager.server.manager.wait(frozenset(cids))
        finally:
            # Always unregister the cancel function, even when the wait
            # fails, so that a later cancel() does not act on a dead
            # transfer.
            del self['func_cancel_xfer']

        # Compare checksum of two answers:
        checksums = []
        assert len(msgs) == 2
        for msg in msgs:
            if msg.get('error') is not None:
                reason = 'error while copy: %s' % msg['error']['message']
                raise JobCancelError(reason)
            checksums.append(msg.get('checksum'))
        self.checkpoint()

        if checksums[0] != checksums[1]:
            raise JobCancelError('checksum mismatches')

    def cancel(self):
        '''
        Cancel the job, interrupting the disk transfer in progress if any.
        '''
        # Single lookup: the job thread may unregister the function between
        # a test and a second lookup.
        cancel_xfer = self.get('func_cancel_xfer')
        if cancel_xfer is not None:
            cancel_xfer()
        super(ColdMigrationJob, self).cancel()
class HotMigrationJob(BaseMigrationJob):
    '''
    A hot vm migration job.

    Mandatory items:

    * vm_name: name of the vm to migrate
    * hv_source: name of the hv which execute the VM
    * hv_dest: the destination hypervisor
    * author: login of the author cli
    '''

    def job(self):
        '''
        Migrate a running VM: replicate every disk to the destination with
        drbd, set up a tunnel between the hypervisors, run the live
        migration through it and finally clean the source up.

        :raise JobCancelError: when a precondition is not met or the job is
            cancelled
        :raise ValueError: when a disk tag is missing on the VM object
        '''
        vm_id = '%s.%s' % (self['hv_source'], self['vm_name'])

        self['title'] = 'Hot migration %s --> %s' % (vm_id, self['hv_dest'])
        logging.info('Job-%s: Started hot migration for %s', self['id'], vm_id)

        # Cancel the job if the user has not the right to migrate the vm or to
        # select an hypervisor as destination:
        right_check = self.manager.server.check

        tql = 'id=%s' % vm_id
        if not right_check(self['author'], 'hotmigrate', tql):
            raise JobCancelError('author have no rights to migrate this VM')

        tql = 'id=%s' % self['hv_dest']
        if not right_check(self['author'], 'hotmigrate_dest', tql):
            raise JobCancelError('author have no right to migrate to this hv')

        # Update the VM object:
        try:
            self.manager.server.objects.update(ids=(vm_id,))
            vm = self.manager.server.objects.get_by_id(vm_id)
        except UnknownObjectError:
            raise JobCancelError('Source VM not found')

        # Get the source and destination hv clients:
        try:
            source = self.manager.server.get_connection(self['hv_source'])
        except KeyError:
            raise JobCancelError('source hypervisor is not connected')

        try:
            dest = self.manager.server.get_connection(self['hv_dest'])
        except KeyError:
            raise JobCancelError('destination hypervisor is not connected')

        self.checkpoint()

        self.report('waiting lock for source and dest hypervisors')
        logging.info('Job-%s: Trying to acquire locks', self['id'])

        with AcquiresAllOrNone(source.lock, dest.lock):
            logging.info('Job-%s: Locks acquired', self['id'])
            self.checkpoint()

            if not self._check_status(vm_id, 'running'):
                raise JobCancelError('vm is not started')

            # Actions to run once the migration has succeeded. The list is
            # created once, before the loop, so that it collects the
            # callbacks of *every* disk and exists even without any disk.
            to_cleanup = []

            # Create storages on destination and start synchronization:
            disks = vm.get('disk', '').split()
            for disk in disks:
                # Getting informations about the disk:
                pool = vm.get('disk%s_pool' % disk)
                name = vm.get('disk%s_vol' % disk)
                size = vm.get('disk%s_size' % disk)
                # Explicit checks: asserts are stripped when Python runs
                # with -O.
                if pool is None:
                    raise ValueError('pool tag doesn\'t exists')
                if name is None:
                    raise ValueError('name tag doesn\'t exists')
                if size is None:
                    raise ValueError('size tag doesn\'t exists')

                self.report('sync volume %s/%s (creation)' % (pool, name))

                # Create the volume on destination:
                dest.proxy.vol_create(pool, name, int(size))
                logging.info('Job-%s: Created volume %s/%s on destination '
                             'hypervisor', self['id'], pool, name)

                # Rollback stuff for this action. In this loop, the values
                # used by the callbacks are bound as default arguments: a
                # plain closure would see the values of the *last* iteration
                # when it is finally called.
                def rb_volcreate(pool=pool, name=name):
                    dest.proxy.vol_delete(pool, name)
                self.checkpoint(rb_volcreate)

                # Setup the drbd synchronization with each hypervisors:
                self.report('sync volume %s/%s (setup)' % (pool, name))

                res_src = source.proxy.drbd_setup(pool, name)
                def rb_setupsrc(res_src=res_src):
                    source.proxy.drbd_shutdown(res_src)
                self.checkpoint(rb_setupsrc)
                to_cleanup.append(rb_setupsrc)

                res_dst = dest.proxy.drbd_setup(pool, name)
                def rb_setupdst(res_dst=res_dst):
                    dest.proxy.drbd_shutdown(res_dst)
                self.checkpoint(rb_setupdst)
                to_cleanup.append(rb_setupdst)

                # Start connection of drbd:
                self.report('sync volume %s/%s (connect)' % (pool, name))

                source.proxy.drbd_connect(res_src, res_dst, dest.get_ip())
                dest.proxy.drbd_connect(res_dst, res_src, source.get_ip())

                # Setup topology as Primary/Secondary:
                source.proxy.drbd_role(res_src, True)
                dest.proxy.drbd_role(res_dst, False)

                # Wait for the end of the disk synchronization:
                self.report('sync volume %s/%s (sync)' % (pool, name))
                cids = set()
                cids.add(source.connection.async_call('drbd_sync', res_src))
                cids.add(dest.connection.async_call('drbd_sync', res_dst))
                self.manager.server.manager.wait(frozenset(cids))
                #TODO: check error (the answers of the wait are ignored)

                source.proxy.drbd_takeover(res_src, True)
                def rb_takeover_src(res_src=res_src):
                    source.proxy.drbd_takeover(res_src, False)
                self.checkpoint(rb_takeover_src)
                to_cleanup.append(rb_takeover_src)

                dest.proxy.drbd_takeover(res_dst, True)
                def rb_takeover_dst(res_dst=res_dst):
                    dest.proxy.drbd_takeover(res_dst, False)
                self.checkpoint(rb_takeover_dst)
                to_cleanup.append(rb_takeover_dst)

            #...
            tunres_src = source.proxy.tun_setup()
            def rb_tun_src():
                source.proxy.tun_destroy(tunres_src)
            self.checkpoint(rb_tun_src)

            tunres_dst = dest.proxy.tun_setup(local=False)
            def rb_tun_dst():
                dest.proxy.tun_destroy(tunres_dst)
            self.checkpoint(rb_tun_dst)

            source.proxy.tun_connect(tunres_src, tunres_dst, dest.get_ip())
            dest.proxy.tun_connect_hv(tunres_dst, migration=True)

            # Initiate the live migration:
            self.report('migration in progress')
            source.proxy.vm_migrate_tunneled(self['vm_name'], tunres_src)

            # At this point, if operation is a success, all we need is just to
            # cleanup source hypervisor from disk and vm. This operation *CAN'T*
            # be cancelled or rollbacked if anything fails (unlikely). The
            # migration must be considered as a success, and the only way to
            # undo this is to start a new migration in the other way.

            # Delete the rollback list.
            # This is mandatory to avoid data loss if the cleanup
            # code below fail.
            self.report('cleanup')
            self._wayback = []

            source.proxy.tun_destroy(tunres_src)
            dest.proxy.tun_destroy(tunres_dst)

            for cb_cleanup in reversed(to_cleanup):
                cb_cleanup()

            # Cleanup the disks:
            for disk in disks:
                pool = vm.get('disk%s_pool' % disk)
                name = vm.get('disk%s_vol' % disk)
                source.proxy.vol_delete(pool, name)

        logging.info('Job-%s: Migration completed with success', self['id'])
class CloneJob(BaseMigrationJob):
    '''
    A clone job.

    Mandatory items:

    * vm_name: name of the vm to clone
    * new_vm_name: the name of the cloned vm
    * hv_source: name of the hv which execute the VM
    * hv_dest: the destination hypervisor
    * author: login of the author cli
    '''

    def job(self):
        '''
        Clone a VM: create renamed volumes on the destination, define the
        VM there under its new name and copy every disk of the source.

        :raise JobCancelError: when a precondition is not met, a copy fails
            or the job is cancelled
        :raise ValueError: when a disk tag is missing on the VM object
        '''
        vm_id = '%s.%s' % (self['hv_source'], self['vm_name'])

        self['title'] = 'Clone %s --> %s' % (vm_id, self['hv_dest'])
        logging.info('Job-%s: Started clone for %s', self['id'], vm_id)

        # Cancel the job if the user has not the right to clone the vm or to
        # select an hypervisor as destination:
        right_check = self.manager.server.check

        tql = 'id=%s' % vm_id
        if not right_check(self['author'], 'clone', tql):
            raise JobCancelError('author have no rights to migrate this VM')

        tql = 'id=%s' % self['hv_dest']
        if not right_check(self['author'], 'clone_dest', tql):
            raise JobCancelError('author have no right to migrate to this hv')

        # Update the VM object:
        try:
            self.manager.server.objects.update(ids=(vm_id,))
            vm = self.manager.server.objects.get_by_id(vm_id)
        except UnknownObjectError:
            raise JobCancelError('Cloned VM not found')

        # Get the source and destination hv clients:
        try:
            source = self.manager.server.get_connection(self['hv_source'])
        except KeyError:
            raise JobCancelError('source hypervisor is not connected')

        try:
            dest = self.manager.server.get_connection(self['hv_dest'])
        except KeyError:
            raise JobCancelError('destination hypervisor is not connected')

        self.checkpoint()

        self.report('waiting lock for source and dest hypervisors')
        logging.info('Job-%s: Trying to acquire locks', self['id'])

        with AcquiresAllOrNone(source.lock, dest.lock):
            logging.info('Job-%s: Locks acquired', self['id'])
            self.checkpoint()

            disks = vm.get('disk', '').split()

            # Create storages on destination:
            old_new_disk_mapping = {} # Mapping between old and new disk names
            new_disk_names = {} # New volume name of each disk of the vm
            self.report('create volumes')
            for disk in disks:
                # Getting informations about the disk:
                pool = vm.get('disk%s_pool' % disk)
                name = vm.get('disk%s_vol' % disk)
                size = vm.get('disk%s_size' % disk)
                # Explicit checks: asserts are stripped when Python runs
                # with -O.
                if pool is None:
                    raise ValueError('pool tag doesn\'t exists')
                if name is None:
                    raise ValueError('name tag doesn\'t exists')
                if size is None:
                    raise ValueError('size tag doesn\'t exists')

                # Change the name of the disk:
                old_name = name
                if name.startswith(self['vm_name']):
                    suffix = name[len(self['vm_name']):]
                    name = self['new_vm_name'] + suffix
                else:
                    name = '%s_%s' % (self['new_vm_name'], name)
                new_disk_names[disk] = name

                fulloldname = '/dev/%s/%s' % (pool, old_name)
                fullnewname = '/dev/%s/%s' % (pool, name)
                old_new_disk_mapping[fulloldname] = fullnewname

                # Create the volume on destination:
                dest.proxy.vol_create(pool, name, int(size))
                logging.info('Job-%s: Created volume %s/%s on destination '
                             'hypervisor (was %s)', self['id'], pool, name,
                             old_name)

                # Rollback stuff for this action. The current pool and name
                # are bound as default arguments: a plain closure would see
                # the values of the *last* iteration when the rollback runs.
                def rb_volcreate(pool=pool, name=name):
                    dest.proxy.vol_delete(pool, name)
                self.checkpoint(rb_volcreate)

            # Define VM:
            self.report('define vm')
            logging.info('Job-%s: XML configuration transfert', self['id'])
            vm_config = source.proxy.vm_export(self['vm_name'])

            # Change vm configuration XML to update it with new name:
            new_vm_config = self._update_xml(vm_config, self['vm_name'],
                                             self['new_vm_name'],
                                             old_new_disk_mapping)

            dest.proxy.vm_define(new_vm_config)

            # Rollback stuff for vm definition: the VM defined on the
            # destination carries the *new* vm name (not a disk name).
            def rb_define():
                dest.proxy.vm_undefine(self['new_vm_name'])
            self.checkpoint(rb_define)

            # Copy all source disk on destination disk, each one to its own
            # renamed volume:
            for disk in disks:
                self._copy_disk(source, dest, vm, disk, new_disk_names[disk])

        logging.info('Job-%s: Clonage completed with success', self['id'])

    def _update_xml(self, vm_config, old_name, name, old_new_name_mapping):
        '''
        Update the XML definition of the VM with the new vm name, the new vm
        disk, and remove the uuid tag.

        :param vm_config: XML definition of the source VM (string)
        :param old_name: name of the source VM
        :param name: name of the cloned VM
        :param old_new_name_mapping: dict mapping each old disk path to the
            new one
        :return: the updated XML definition
        '''
        # Only rewrite the <name> element: replacing every occurrence of the
        # old name would corrupt any unrelated text containing it.
        vm_config = vm_config.replace('<name>%s</name>' % old_name,
                                      '<name>%s</name>' % name)
        # Drop the uuid element (with its line) so the clone does not reuse
        # the uuid of the source VM.
        vm_config = re.sub(r'[ \t]*<uuid>.*?</uuid>[ \t]*\n?', '', vm_config)
        # Disk paths are matched as complete single-quoted attribute values:
        for old, new in old_new_name_mapping.items():
            vm_config = vm_config.replace("='%s'" % old,
                                          "='%s'" % new)
        return vm_config

    def _copy_disk(self, source, dest, vm, disk, new_disk):
        '''
        Copy the specified disk name of the vm from source to dest.

        :param source: connection to the source hypervisor
        :param dest: connection to the destination hypervisor
        :param vm: the VM object holding the disk tags
        :param disk: identifier of the disk in the VM tags
        :param new_disk: name of the volume to fill on the destination
        :raise JobCancelError: when the transfer fails, the checksums differ
            or the job has been cancelled
        '''
        # Get informations about the disk:
        pool = vm.get('disk%s_pool' % disk)
        name = vm.get('disk%s_vol' % disk)
        logging.info('Job-%s: Started copy for %s/%s to %s/%s',
                     self['id'], pool, name, pool, new_disk)
        self.report('copy %s/%s' % (pool, name))

        # Make the copy and wait for it end:
        xferprop = dest.proxy.vol_import(pool, new_disk)

        # Register the cancel function:
        def cancel_xfer():
            dest.proxy.vol_import_cancel(xferprop['id'])
        self['func_cancel_xfer'] = cancel_xfer

        try:
            # Wait for the end of transfert:
            cids = set()
            cids.add(source.connection.async_call('vol_export', pool, name,
                                                  dest.get_ip(),
                                                  xferprop['port']))
            cids.add(dest.connection.async_call('vol_import_wait',
                                                xferprop['id']))
            msgs = self.manager.server.manager.wait(frozenset(cids))
        finally:
            # Always unregister the cancel function, even when the wait
            # fails, so that a later cancel() does not act on a dead
            # transfer.
            del self['func_cancel_xfer']

        # Compare checksum of two answers:
        checksums = []
        assert len(msgs) == 2
        for msg in msgs:
            if msg.get('error') is not None:
                reason = 'error while copy: %s' % msg['error']['message']
                raise JobCancelError(reason)
            checksums.append(msg.get('checksum'))
        self.checkpoint()

        if checksums[0] != checksums[1]:
            raise JobCancelError('checksum mismatches')

    def cancel(self):
        '''
        Cancel the job, interrupting the disk transfer in progress if any.
        '''
        # Single lookup: the job thread may unregister the function between
        # a test and a second lookup.
        cancel_xfer = self.get('func_cancel_xfer')
        if cancel_xfer is not None:
            cancel_xfer()
        super(CloneJob, self).cancel()
class JobsManager(object):
    '''
    Manage the current job list.

    :param server: The :class:`CCServer` instance.
    '''

    # Mapping between the job type names accepted by create() and the
    # classes implementing them:
    JOBS_TYPES = {
        'kill': KillClientJob,
        'kill_oldcli': KillOldCliJob,
        'cold_migrate': ColdMigrationJob,
        'hot_migrate': HotMigrationJob,
        'clone': CloneJob,
    }

    def __init__(self, server):
        # The main job dict, the keys are id of jobs.
        self._jobs = {}

        # The server:
        self.server = server

        # The id of the next job and it's lock:
        self._current_id = 1
        self._current_id_lock = Lock()

    def create(self, jtype, **kwargs):
        '''
        Create a new job and start it as a daemon thread.

        :param jtype: the type of the new job
        :param kwargs: arguments to pass to the job
        :return: the started job
        :raise BadJobTypeError: when invalid jtype is passed
        '''
        # Validate the type first, so that an invalid request does not
        # consume a job id.
        jobtype = JobsManager.JOBS_TYPES.get(jtype)
        if jobtype is None:
            raise BadJobTypeError('Invalid job type %r' % jtype)

        jobid = self.get_id()
        job = jobtype(self, id=jobid, **kwargs)
        self._jobs[jobid] = job
        job.daemon = True
        job.start()
        return job

    def get_id(self):
        '''
        Get the current id and increment the counter.
        '''
        with self._current_id_lock:
            jobid = self._current_id
            self._current_id += 1
        return jobid

    def cancel(self, jobid):
        '''
        Cancel the provided job.

        :param jobid: id of the job to cancel
        :raise UnknownJobError: when the id matches no job, or a hidden one
        '''
        job = self._jobs.get(jobid)
        if job is None:
            raise UnknownJobError('Invalid job id: %r' % jobid)
        if job.get('_hidden', False):
            raise UnknownJobError('Invalid job id: %r (hidden)' % jobid)
        job.cancel()

    def purge(self):
        '''
        Purge all done jobs.
        '''
        # Iterate over a snapshot: entries are removed during the loop.
        for jobid, job in list(self._jobs.items()):
            if job['done']:
                self._jobs.pop(jobid, None)

    def iterjobs(self, show_done=True, show_running=True):
        '''
        Iter over the jobs which are not hidden.

        :param show_done: if set, yield the jobs which are done
        :param show_running: if set, yield the jobs which are not done
        '''
        # Iterate over a snapshot: jobs may be created or purged by other
        # threads while the caller consumes this generator.
        for job in list(self._jobs.values()):
            # Hidden jobs are never exposed, whatever their state (the
            # previous condition let done hidden jobs through because of
            # the and/or precedence).
            if job.get('_hidden'):
                continue
            if (show_done and job['done']) or (show_running and not job['done']):
                yield job