Commit 8a30ff53 authored by Thibault VINCENT's avatar Thibault VINCENT
Browse files

volume migration + fixes

parent c6d817d4
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -7,6 +7,7 @@ from subprocess import Popen, PIPE
from multiprocessing import cpu_count
from platform import platform, machine
from socket import gethostbyaddr, gethostname
from jobs import JobManager


class Host(object):
@@ -34,6 +35,8 @@ class LocalHost(Host):
        '''
        '''
        super(LocalHost, self).__init__()
        self.jobmgr = JobManager(self)
    
    def scheduler_run(self):
        '''
        '''
+30 −0
Original line number Diff line number Diff line
@@ -56,3 +56,33 @@ class StorageVolumeError(StorageError):
    '''
    '''
    pass


# job errors

class JobManagerError(CCException):
    '''
    '''
    pass


class JobError(CCException):
    '''
    '''
    pass


class XferJobError(JobError):
    '''
    '''
    pass
    
class ReceiveFileJobError(XferJobError):
    '''
    '''
    pass

class SendFileJobError(XferJobError):
    '''
    '''
    pass
+104 −10
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-

from logging import debug, warning, info
from fnmatch import fnmatchcase
from time import sleep
from sjrpc.core import RpcError
from __init__ import __version__
from sjrpc.utils import RpcHandler
from sjrpc.utils import pure
from logging import debug, warning, info
from exceptions import HypervisorError
from errors import HypervisorError
from common import LocalHost
from jobs import ReceiveFileJob, SendFileJob
from __init__ import __version__


_MOD_KVM = True
@@ -86,6 +88,7 @@ class NodeHandler(RpcHandler):
            'h'         : self._tag_map_direct('get_name', 24*3600),
            # one hour
            # one minute
            #'cpuremain' : self._tag_map_direct('get_cpu_remain', -1),
            'memfree'   : self._tag_map_direct('get_mem_free', 60),
            'memused'   : self._tag_map_direct('get_mem_used', 60),
            'sto'       : ( lambda o: hasattr(o, 'storage'),
@@ -232,6 +235,12 @@ class NodeHandler(RpcHandler):
        if len(volumes):
            result['disk'] = ' '.join([str(i) for i in range(0, len(volumes))])
        for vol_id, vol in enumerate(volumes):
            name = vol.get_name()
            if name:
                result['disk%i_vol' % vol_id] = str(name)
            pool = vol.get_pool()
            if pool:
                result['disk%i_pool' % vol_id] = str(pool.name())
            path = vol.get_path()
            if path:
                result['disk%i_path' % vol_id] = str(path)
@@ -661,11 +670,96 @@ class NodeHandler(RpcHandler):
            except:
                pass
    
    ##################################
    #   Storage control
    ##################################
    
    @pure
    def execute_command(self, command):
        '''
        '''
        warning('execute_command: starting execution of `%s`' % command)
        output = self._host_handle.execute(command)
        warning('execute_command: finished execution of `%s`' % command)
        return output
    def vol_create(self, pool, name, size):
        size = int(size)
        if hasattr(self._host_handle, 'storage'):
            pool = self._host_handle.storage().pool_get(pool_name)
            pool.volume_create(name, size)
        else:
            raise NotImplementedError('host handler has no storage support')
    
    @pure
    def vol_delete(self, pool, name, wipe=False):
        if hasattr(self._host_handle, 'storage'):
            pool = self._host_handle.storage().pool_get(pool_name)
            vol = pool.volume_get(name)
            if wipe:
                vol.wipe()
            vol.delete()
        else:
            raise NotImplementedError('host handler has no storage support')
    
    @pure
    def vol_export(self, pool, name, raddr, rport):
        rport = int(rport)
        if hasattr(self._host_handle, 'storage'):
            # get device path info
            sto = self._host_handle.storage()
            vol_path = sto.pool_get(pool).volume_get(name).get_path()
            # create job
            job = SendFileJob(self._host_handle.jobmgr, vol_path, raddr, rport)
            job.start_now()
            jid = job.get_id()
            # wait for job completion
            while self._host_handle.jobmgr.is_job_running(jid):
                sleep(2)
            # return job report
            res = {}
            res['id'] = jid
            res['log'] = job.get_log()
            res['checksum'] = job.get_checksum()
            return res
        else:
            raise NotImplementedError('host handler has no storage support')
    
    @pure
    def vol_import(self, pool, name, timeout=30):
        timeout = int(timeout)
        if hasattr(self._host_handle, 'storage'):
            # get device path info
            sto = self._host_handle.storage()
            vol_path = sto.pool_get(pool).volume_get(name).get_path()
            # create job
            # FIXME timeout
            job = ReceiveFileJob(self._host_handle.jobmgr, vol_path)
            job.prepare()
            job.start_now()
            # return job info
            res = {}
            res['id'] = job.get_id()
            res['port'] = job.get_port()
            return res
        else:
            raise NotImplementedError('host handler has no storage support')
    
    @pure
    def vol_import_wait(self, jid):
        '''
        '''
        jid = int(jid)
        jmgr = self._host_handle.jobmgr
        # wait for the job to be finished
        while jmgr.is_job_running(jid):
            sleep(2)
        # get the job
        job = jmgr.get_job(jid)
        # return job report
        res = {}
        res['id'] = job.get_id()
        res['log'] = job.get_log()
        res['checksum'] = job.get_checksum()
        return res
    
    @pure
    def vol_import_list(self):
        raise NotImplementedError()
    
    @pure
    def vol_import_cancel(self, jid):
        jid = int(jid)
        raise NotImplementedError()
 No newline at end of file

ccnode/jobs.py

0 → 100644
+449 −0
Original line number Diff line number Diff line
# -*- coding: utf-8 -*-

import os, time, socket
from datetime import datetime
from threading import Thread, RLock
from utils import RWLock
from errors import (JobManagerError, JobError, XferJobError, ReceiveFileJobError,
            SendFileJobError)

#fixme
import traceback, sys

class JobManager(Thread, object):
    '''
    '''
    MAX_JID = 65535
    
    def __init__(self, hypervisor):
        '''
        '''
        super(JobManager, self).__init__()
        self._mutex = RLock()
        self._next_jid = 1
        self._jobs = {}
        self._jobs_pending = []
        self._jobs_running = []
        self._jobs_crashed = []
        self._jobs_canceled = []
        self._jobs_finished = []
    
    def _job_crashed(self, jid):
        '''
        '''
        with self._mutex:
            # job should exist
            if jid not in self._jobs:
                raise JobManagerError('unknown job ID `%i`' % jid)
            # move job to the crashed queue
            self._jobs_crashed.append(jid)
            # clean any queue that may contain the job
            for queue in [self._jobs_pending, self._jobs_running,
                                    self._jobs_canceled, self._jobs_finished]:
                if jid in queue:
                    queue.remove(jid)
    
    def _job_finished(self, jid):
        with self._mutex:
            # job should exist
            if jid not in self._jobs:
                raise JobManagerError('unknown job ID `%i`' % jid)
            # move job to the finished queue
            self._jobs_finished.append(jid)
            # remove from running queue
            self._jobs_running.remove(jid)
    
    def append(self, job):
        '''
        '''
        with self._mutex:
            jid = self._next_jid
            self._next_jid += 1
            self._jobs[jid] = job
            self._jobs_pending.append(jid)
        return jid
    
    def run(self):
        '''
        '''
        while True:
            pass
    
    def schedule_immediate(self, jid):
        '''
        '''
        with self._mutex:
            # job should exist
            if jid not in self._jobs:
                raise JobManagerError('unknown job ID `%i`' % jid)
            # job should be pending execution
            if jid not in self._jobs_pending:
                raise JobManagerError('job `%i` not prepared for execution' %
                                                                            jid)
            # execute job
            self._jobs_running.append(jid)
            self._jobs_pending.remove(jid)
            self._jobs[jid].start()
    
    def get_job(self, jid):
        '''
        '''
        with self._mutex:
            if self.is_job(jid):
                return self._jobs[jid]
            else:
                raise JobManagerError('unknown job ID `%i`' % jid)
    
    def is_job(self, jid):
        '''
        '''
        with self._mutex:
            return jid in self._jobs
    
    def is_job_running(self, jid):
        '''
        '''
        with self._mutex:
            if not self.is_job(jid):
                raise JobManagerError('unknown job ID `%i`' % jid)
            return jid in self._jobs_running
    
    def is_job_finished(self, jid):
        '''
        '''
        with self._mutex:
            if not self.is_job(jid):
                raise JobManagerError('unknown job ID `%i`' % jid)
            return jid in self._jobs_finished


class JobLog():
    '''
    '''
    def __init__(self):
        '''
        '''
        self._items = []
        self._mutex = RLock()
    
    def __str__(self):
        '''
        '''
        res = ""
        for tup in self._items:
            res += "[%s] %s\n" % (datetime.fromtimestamp(tup[0]).strftime(
                                                "%Y-%m-%d %H:%M:%S"), tup[1])
        return res
    
    def append(self, message):
        '''
        '''
        print "JobLog : %s" % message
        with self._mutex:
            self._items.append((time.time(), message))


class BaseJob(Thread, object):
    '''
    '''
    def __init__(self, manager):
        '''
        '''
        super(BaseJob, self).__init__()
        self._manager = manager
        self._log = JobLog()
        # job state
        self._type = self.__class__.__name__
        self._validated = None
        self._ready_run = False
        # store job in manager
        self._jid = self._manager.append(self)
        self._log.append('stored with id `%i` and type `%s`' % (self._jid,
                                                                    self._type))
    
    def _validate(self):
        '''
        To be implemented in derivate class
        '''
        return False
    
    def _prepare(self):
        '''
        To be implemented in derivate class
        '''
        return False
    
    def _job(self):
        '''
        To be implemented in derivate class
        '''
        self._log.append('nothing to do')
        raise JobError('empty job')
    
    def is_ready():
        '''
        '''
        return self._ready_run is True
    
    def get_id(self):
        '''
        '''
        return self._jid
    
    def get_log(self):
        '''
        '''
        return str(self._log)
    
    def start_now(self):
        '''
        '''
        self._manager.schedule_immediate(self._jid)
    
    def run(self):
        '''
        '''
        try:
            if not self._ready_run:
                self.prepare()
            self._log.append("running")
            self._job()
            self._log.append("finished")
            self._manager._job_finished(self._jid)
        except Exception as err:
            self._log.append("crashed: `%r`: `%s`" % (err, err))
            self._manager._job_crashed(self._jid)
    
    def prepare(self):
        '''
        '''
        if not self._ready_run:
            # run validation if not done yet
            if self._validated is None:
                try:
                    self._validated = self._validate()
                except:
                    self._validated = False
                finally:
                    # do not continue if validation fails
                    if not self._validate:
                        self._log.append('validation failed')
                        raise JobError('validation failed')
            # run validation in derivated Job class
            try:
                ready = self._prepare()
            except:
                ready = False
            finally:
                self._ready_run = ready
                # do not continue if preparation fails
                if not ready:
                    self._log.append('preparation failed')
                    raise JobError('preparation failed')
                else:
                    self._log.append('ready for execution')


class BlowJob(BaseJob):
    '''
    '''
    def __init__(self, manager):
        raise Exception('I LOVE MY KITTY')


class XferJob(BaseJob):
    '''
    '''
    TYPES = [
        'SendFileJob',
        'ReceiveFileJob',
    ]
    
    # tcp port range used for transferts
    TCP_PORT_MIN = 42000
    TCP_PORT_MAX = 42999
    
    def __init__(self, manager, path):
        '''
        '''
        self._path = path
        self._csum = None
        super(XferJob, self).__init__(manager)
    
    def _validate(self):
        '''
        '''
        return self._type in self.TYPES
    
    def get_checksum(self):
        # FIXME lol
        return '625f4986bccba2dbd70cdd9ef80af787'


class SendFileJob(XferJob):
    '''
    '''
    def __init__(self, manager, path, remote_host, remote_port):
        '''
        '''
        self._host = remote_host
        self._port = int(remote_port)
        super(SendFileJob, self).__init__(manager, path)
    
    def _validate(self):
        '''
        '''
        return ( super(SendFileJob, self)._validate() 
                    and os.path.isfile(self._path)
                    and os.access(self._path, os.F_OK | os.R_OK )
                    and self._port >= self.TCP_PORT_MIN
                    and self._port <= self.TCP_PORT_MAX
                    and isinstance(self._host, str))
    
    def _prepare(self):
        '''
        '''
        # create socket
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        return bool(self._sock)
    
    def _job(self):
        '''
        '''
        fd = None
        try:
            # try to connect
            self._log.append('connecting to `%s`:`%i`' % (self._host,
                                                                    self._port))
            self._sock.connect((self._host, self._port))
            self._log.append('connected')
            # open file
            self._log.append('opening source file `%s`' % self._path)
            fd = open(self._path, 'rb')
            # read and send file
            self._log.append('sending data')
            while True:
                buf = fd.read(4096)
                if not buf:
                    break
                self._sock.send(buf)
        except:
            traceback.print_exc(file=sys.stdout)
            raise SendFileJobError('FIXME error reporting')
        finally:
            # close socket and files anyway
            try:
                self._sock.close()
            except:
                pass
            if fd:
                try:
                    fd.close()
                except:
                    pass


class ReceiveFileJob(XferJob):
    '''
    '''
    _ports_used = []
    _ports_used_mutex = RWLock()
    
    def __init__(self, manager, path, create=False):
        '''
        '''
        self._create = create
        self._port = None
        super(ReceiveFileJob, self).__init__(manager, path)
    
    def _validate(self):
        '''
        '''
        return ( super(ReceiveFileJob, self)._validate() and
                (
                    (   not self._create and
                        os.path.isfile(self._path) and
                        os.access(self._path, os.F_OK | os.W_OK ))
                    or
                    (   self._create and
                        os.path.isdir(os.path.dirname(self._path)) and
                        os.access(os.path.dirname(self._path), os.F_OK |os.W_OK)
                    )
                ))
    
    def _prepare(self):
        '''
        '''
        # create socket
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # bind socket to a port
        for port in range(self.TCP_PORT_MIN, self.TCP_PORT_MAX):
            # find a port to bind to
            try:
                with self._ports_used_mutex.write:
                    if port not in self._ports_used:
                        self._sock.bind(('', port))
                        self._port = port
                        self._ports_used.append(port)
                        break
            except socket.error:
                pass
        if not self._port:
            self._log.append('no port to listen to')
            raise ReceiveFileJobError('no port to listen to')
        return True
    
    def _job(self):
        '''
        '''
        fd = None
        try:            
            self._sock.listen(1)
            # wait for connection
            self._log.append('waiting for connection on port `%i`' % self._port)
            conn, addr = self._sock.accept()
            if conn:
                self._log.append('client connected from `%s`')
                # open file
                self._log.append('opening destination `%s`' % self._path)
                fd = open(self._path, 'wb')
                # write to the file
                self._log.append('receiving data')
                while True:
                    buf = conn.recv(4096)
                    if not buf:
                        break
                    fd.write(buf)
                self._log.append('remote host closed connection')
            else:
                self._log.append('no connection received, aborting reception')
        except:
            traceback.print_exc(file=sys.stdout)
            raise ReceiveFileJobError('FIXME error reporting')
        finally:
            # close socket and files anyway
            try:
                self._sock.close()
            except:
                pass
            if fd:
                try:
                    fd.close()
                except:
                    pass

    def get_port(self):
        '''
        '''
        return int(self._port)


if __name__ == '__main__':
    jm = JobManager(None)
    jm.start()
    
    #r = ReceiveFileJob(jm, '/tmp/receive')
    #r.start_now()
    
    s = SendFileJob(jm, '/etc/hosts', '127.0.0.1', '42000')
    s.start_now()
+122 −54
Original line number Diff line number Diff line
@@ -7,7 +7,7 @@ from logging import error, warning, info, debug
from time import sleep
from common import Hypervisor, VM, Storage, StoragePool, StorageVolume
from utils import RWLock
from exceptions import HypervisorError, VMError, StoragePoolError
from errors import HypervisorError, VMError, StoragePoolError


KVM_LIBVIRT_SESSION = 'qemu:///system'
@@ -173,11 +173,14 @@ class LibvirtHypervisor(Hypervisor):
    def vm_get(self, name):
        '''
        '''
        with self._vm_cache_lock.read:
        if name in self.vm_list():
            try:
                with self._vm_cache_lock.read:
                    return self._vm_cache[name]
            except:
                raise HypervisorError('VM `%s` has vanished' % name)
        else:
                raise Exception()
            raise HypervisorError('host has no VM named `%s`' % name)


#### storage
@@ -198,10 +201,12 @@ class LibvirtStorage(Storage):
        self._pool_cache_running = {}
        self._pool_cache_defined = {}
        self._pool_cache = {}
        self._pool_cache_lock = RWLock()
    
    def _pool_cache_rebuild(self):
        '''
        '''
        with self._pool_cache_lock.write:
            self._pool_cache_running = {}
            self._pool_cache_defined = {}
            self._pool_cache = {}
@@ -210,6 +215,7 @@ class LibvirtStorage(Storage):
                pool = LibvirtStoragePool(self,
                        self._hv_handle._lvcon_handle.storagePoolLookupByName(name))
                self._pool_cache_running[pool.get_name()] = pool
            
            for name in self._hv_handle._lvcon_handle.listDefinedStoragePools():
                pool = LibvirtStoragePool(self,
                        self._hv_handle._lvcon_handle.storagePoolLookupByName(name))
@@ -223,15 +229,20 @@ class LibvirtStorage(Storage):
        '''
        if not self._pool_cache:
            self._pool_cache_rebuild()
        with self._pool_cache_lock.read:
            return self._pool_cache.keys()
    
    def pool_get(self, name):
        '''
        '''
        if name in self.pool_list():
            try:
                with self._pool_cache_lock.read:
                    return self._pool_cache[name]
            except:
                raise StorageError('storage pool `%s` vanished' % name)
        else:
            raise Exception()
            raise StorageError('no storage pool with name `%s`' % name)
    
    def capacity(self):
        '''
@@ -276,10 +287,12 @@ class LibvirtStoragePool(StoragePool):
                                                        , libvirt_pool))
        
        self._vol_cache = {}
        self._vol_cache_lock = RWLock()
    
    def _vol_cache_rebuild(self):
        '''
        '''
        with self._vol_cache_lock.write:
            self._vol_cache = {}
            if self._lvpool_handle.isActive():
                for name in self._lvpool_handle.listVolumes():
@@ -287,6 +300,51 @@ class LibvirtStoragePool(StoragePool):
                            self._lvpool_handle.storageVolLookupByName(name))
                    self._vol_cache[vol.get_name()] = vol
    
    def volume_list(self):
        '''
        '''
        if not self._vol_cache:
            self._vol_cache_rebuild()
        with self._vol_cache_lock.read:
            return self._vol_cache.keys()
    
    def volume_get(self, name):
        '''
        '''
        if name in self.volume_list():
            try:
                with self._vol_cache_lock.read:
                    return self._vol_cache[name]
            except:
                raise StoragePoolError('volume `%s` has vanished from pool `%s`'
                                                    %(name, self.get_name()))
        else:
            raise StoragePoolError('pool `%s` has no volume `%s`' % (
                                                        self.get_name(), name))
    
    def volume_create(self, name, size):
        '''
        '''
        xml = '''
            <volume>
                <name>%(name)s</name>
                <capacity>%(capacity)i</capacity>
            </volume>
        ''' % {
                'name' : name,
                'capacity' : size
            }
        try:
            vol = self._lvpool_handle.createXML(xml, 0)
            if isinstance(vol, libvirt.virStorageVol):
                self._vol_cache_rebuild()
                return vol
            else:
                raise StoragePoolError('volume creation failed for an unknown reason')
        except libvirt.libvirtError as err:
            raise StoragePoolError('volume creation failed : `%r` : `%s`' %
                                                                    (err, err))
    
    def get_name(self):
        '''
        '''
@@ -323,21 +381,6 @@ class LibvirtStoragePool(StoragePool):
        except libvirt.libvirtError as e:
            raise StoragePoolError("can't get pool information (%s)" % e)

    def volume_list(self):
        '''
        '''
        if not self._vol_cache:
            self._vol_cache_rebuild()
        return self._vol_cache.keys()
    
    def volume_get(self, name):
        '''
        '''
        if name in self.volume_list():
            return self._vol_cache[name]
        else:
            raise Exception()


class LibvirtStorageVolume(StorageVolume):
    '''
@@ -357,6 +400,40 @@ class LibvirtStorageVolume(StorageVolume):
            raise TypeError('Expected `%s` given `%s`' % (libvirt.virStorageVol,
                                                                libvirt_vol))
    
    def wipe(self):
        '''
        '''
        try:
            if self._lvvol_handle.wipe(0):
                raise StorageVolumeError('volume wipe failed for an unknown reason')
        except libvirt.libvirtError as err:
            raise StorageVolumeError('volume wipe failed : `%r` : `%s`' % (err,
                                                                        err))
    
    def delete(self):
        '''
        '''
        try:
            if self._lvvol_handle.delete(0):
                raise StorageVolumeError('volume deletion failed for an unknown reason')
            else:
                self._pool_handle._vol_cache_rebuild()
        except libvirt.libvirtError as err:
            raise StorageVolumeError('volume deletion failed : `%r` : `%s`' %
                                                                    (err, err))
    
    def get_pool(self):
        '''
        '''
        pool = None
        try:
            data = self._lvvol_handle.storagePoolLookupByVolume()
            if data:
                pool = data
        except libvirt.libvirtError:
            pass
        return pool
    
    def get_name(self):
        '''
        '''
@@ -399,15 +476,6 @@ class LibvirtStorageVolume(StorageVolume):
            pass
        return path

    def wipe(self):
        '''
        '''
        try:
            self._lvvol_handle.wipe(0)
        except libvirt.libvirtError:
            pass


#### vm

class LibvirtVm(VM):