Commit 5f233cdd authored by Antoine Millet's avatar Antoine Millet
Browse files

Implemented DiskCopy job

parent 0d9944ca
Loading
Loading
Loading
Loading
+18 −1
Original line number Diff line number Diff line
@@ -29,7 +29,7 @@ from cloudcontrol.server.handlers import listed, Reporter
from cloudcontrol.server.clients import Client, RegisteredCCHandler
from cloudcontrol.server.jobs import (ColdMigrationJob, HotMigrationJob,
                                      CloneJob, KillClientJob, AllocationJob,
                                      MigrationJob)
                                      MigrationJob, DiskCopyJob)
from cloudcontrol.server.db import SObject

from cloudcontrol.common.tql.db.tag import StaticTag
@@ -1134,6 +1134,23 @@ class CliHandler(RegisteredCCHandler):
                errs.success(vm['id'], 'migration launched, id:%s' % job.id)
        return errs.get_dict()

    @listed
    def diskcopy(self, source_id, source_pool, source_vol, dest_id, dest_pool, dest_vol):
        """ Launch a migration.
        """

        self.client.check('diskcopy')
        settings = {'server': self.server,
                    'client': self.client,
                    'source_id': source_id,
                    'source_pool': source_pool,
                    'source_vol': source_vol,
                    'dest_id': dest_id,
                    'dest_pool': dest_pool,
                    'dest_vol': dest_vol}
        job = self.client.spawn_job(DiskCopyJob, settings=settings)
        return job.id

    @listed
    def clone(self, tql_vm, tql_dest, name):
        """ Create and launch a clone job.
+2 −1
Original line number Diff line number Diff line
@@ -24,7 +24,8 @@ from cloudcontrol.server.jobs.killoldcli import KillOldCliJob
from cloudcontrol.server.jobs.killclient import KillClientJob
from cloudcontrol.server.jobs.allocation import AllocationJob
from cloudcontrol.server.jobs.migration import MigrationJob
from cloudcontrol.server.jobs.diskcopy import DiskCopyJob


__all__ = ('ColdMigrationJob', 'HotMigrationJob', 'CloneJob', 'KillOldCliJob',
           'KillClientJob', 'AllocationJob', 'MigrationJob')
           'KillClientJob', 'AllocationJob', 'MigrationJob', 'DiskCopyJob')
+56 −0
Original line number Diff line number Diff line
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl.  If not, see <http://www.gnu.org/licenses/>.

from sjrpc.core import AsyncWatcher

from cloudcontrol.common.jobs import Job, JobCancelError


class DiskCopyJob(Job):

    """ Copy a disk between two nodes.
    """

    def job(self, server, client, source_id, source_pool, source_vol, dest_id, dest_pool, dest_vol):
        self._func_cancel_xfer = None  # Callback to a function used to cancel a disk transfert

        env = (source_id, source_pool, source_vol, dest_id, dest_pool, dest_vol)
        self.title = 'Started disk copy between %s:%s/%s -> %s:%s/%s' % env

        source = server.get_client(source_id)
        dest = server.get_client(dest_id)

        xferprop = dest.proxy.vol_import(dest_pool, dest_vol)

        # Wait for the end of transfert:
        watcher = AsyncWatcher()
        watcher.register(source.conn, 'vol_export', source_pool, source_vol, dest.ip, xferprop['port'])
        watcher.register(dest.conn, 'vol_import_wait', xferprop['id'])

        msgs = watcher.wait()

        # Compare checksum of two answers:
        checksums = []
        assert len(msgs) == 2
        for msg in msgs:
            if msg.get('error') is not None:
                msg = 'error while copy: %s' % msg['error']['message']
                raise JobCancelError(msg)
            else:
                checksums.append(msg['return'].get('checksum'))
                self.checkpoint()

        if checksums[0] != checksums[1]:
            raise JobCancelError('checksum mismatches')