Commit 52cb794d authored by Anael Beutot's avatar Anael Beutot
Browse files

Implemented DRBD synchronisation job.

Needed for disk synchronisation during live migration.
parent d71256c7
Loading
Loading
Loading
Loading
+9 −0
Original line number Diff line number Diff line
@@ -19,3 +19,12 @@ class PoolStorageError(CCNodeError):

class TunnelError(CCNodeError):
    """Error concerning tunnels."""


class DRBDAllocationError(CCNodeError):
    """Raised when a DRBD volume cannot be created."""


class DRBDError(CCNodeError):
    """Generic failure of a DRBD operation."""
+380 −1
Original line number Diff line number Diff line
@@ -3,13 +3,19 @@ import os
import errno
import socket
import logging
from os.path import exists as path_exists
from time import sleep
from hashlib import md5
from collections import deque
from StringIO import StringIO
from subprocess import CalledProcessError
from xml.etree import ElementTree as et

import pyev

from cloudcontrol.node.exc import TunnelError
from cloudcontrol.node.exc import TunnelError, DRBDAllocationError, DRBDError
from cloudcontrol.node.jobs import BaseThreadedJob
from cloudcontrol.node.utils import subproc_call


logger = logging.getLogger(__name__)
@@ -621,3 +627,376 @@ class TCPTunnel(object):
            other_watcher.start()

        # logger.debug('Proccessed write event')


class Meta(type):
    """DRBDAllocator metaclass used for singleton implementation.

    Every class created through this metaclass gets its own ``_instance``
    slot; calling the class builds the instance once and returns that same
    object on every later call (constructor arguments of later calls are
    ignored).
    """
    def __init__(cls, name, bases, dict):
        # forward the real class name (the original passed ``cls`` here)
        super(Meta, cls).__init__(name, bases, dict)
        # no instance yet, it is created lazily by the first call
        cls._instance = None

    def __call__(cls, *args, **kwargs):
        """Return the unique instance of ``cls``, creating it if needed."""
        if cls._instance is None:
            cls._instance = super(Meta, cls).__call__(*args, **kwargs)

        return cls._instance


class DRBDAllocator(object):
    """Keeps a list of allocated DRBD devices.

    The class uses ``Meta`` as metaclass, hence behaves as a singleton: the
    kernel module is reloaded by the first instantiation only.
    """

    __metaclass__ = Meta

    RMMOD = '/sbin/rmmod'
    MODPROBE = '/sbin/modprobe'

    #: maximum number of DRBD devices
    MINOR_MAX = 100

    def __init__(self):
        #: ids currently handed out by :meth:`new_volume`
        self.volumes = set()

        self.reload_kernel_module()

    def new_volume(self):
        """Reserve the lowest free id and return it.

        :raise DRBDAllocationError: when the ``MINOR_MAX`` ids are all in use
        """
        for minor in xrange(self.MINOR_MAX):
            if minor not in self.volumes:
                self.volumes.add(minor)
                return minor

        raise DRBDAllocationError('Cannot allocate DRBD volume')

    def remove_volume(self, id_):
        """Release an id previously returned by :meth:`new_volume`.

        :param id_: id to release
        :raise KeyError: if ``id_`` is not currently allocated
        """
        self.volumes.remove(id_)

    def reload_kernel_module(self):
        """Unload then load the ``drbd`` kernel module with our parameters.

        A failure to load the module is only logged.

        :raise CalledProcessError: if the module is loaded but cannot be
            removed
        """
        # FIXME find another way to set parameters to drbd module
        # try to remove kernel module
        try:
            subproc_call([self.RMMOD, 'drbd'])
        except CalledProcessError:
            # this is not an error if drbd module wasn't loaded
            with open('/proc/modules') as modules:
                still_loaded = 'drbd' in modules.read()
            if still_loaded:
                logger.error('Cannot remove drbd kernel module')
                raise
        # load kernel module with proper parameters
        try:
            # we use a greater minor_count than the default, which seems too
            # small.
            # we set usermode helper to /bin/true because by default the
            # module calls some drbd helpers that return a non 0 value and
            # make the synchronisation halt.
            subproc_call([self.MODPROBE, 'drbd',
                          'minor_count=%d' % self.MINOR_MAX,
                          'usermode_helper=/bin/true'])
        except CalledProcessError:
            logger.error('Cannot load drbd kernel module')


class DRBD(object):
    """Manage DRBD job.

    One instance drives a single DRBD device built on top of a device-mapper
    copy of a VM logical volume, with external metadata stored on a
    dedicated ``<volume name>.drbdmeta`` volume.  The methods are thin
    wrappers around ``dmsetup``, ``drbdsetup`` and ``drbdmeta``; most of
    them record their progress in :attr:`state`.
    """

    DMSETUP = '/sbin/dmsetup'
    DRBDSETUP = '/sbin/drbdsetup'
    DRBDMETA = '/sbin/drbdmeta'
    #: value given to the ``-t``, ``-d`` and ``-o`` options of
    #: ``drbdsetup wait-connect`` (presumably seconds, TODO confirm)
    DRBD_TIMEOUT = '30'
    #: value given to ``drbdsetup syncer -r`` (unit to be confirmed)
    DRBD_RATE = '50000'

    def __init__(self, job_manager, ev_loop, storage_index, lvm_pool, lvm_volume):
        """
        :param job_manager: :class:`JobManager` instance
        :param ev_loop: ev loop instance
        :param storage_index: :class:`StorageIndex` instance
        :param lvm_pool: :class:`Storage` instance
        :param lvm_volume: :class:`Volume` instance
        """
        #: job id
        self.id = job_manager.job_id.next()

        self.allocator = DRBDAllocator()

        # define a set of states
        self.state = 'INIT'

        self.storage = storage_index
        self.pool = lvm_pool
        self.volume = lvm_volume
        #: metadata volume created by :meth:`setup`
        self.meta_volume = None

        #: DRBD id as returned by DRBDAllocator
        self.drbd_id = None
        self.drbd_port = None
        #: DRBD device full path
        self.drbd_path = None
        #: DM table mapping the VM LV onto the DRBD device
        self.drbd_table = None
        #: last result of :meth:`status`
        self.drbd_status = dict(conn=None)

        #: original DM table of the VM LV, saved by :meth:`setup`
        self.dm_table = None
        #: name of DM copy of LV
        # NOTE(review): the 'vg' prefix is hard-coded, presumably the volume
        # group name; dashes are doubled as in DM names -- confirm
        self.dm_copy = '%s-%s.copy' % (
            'vg', self.volume.name.replace('-', '--'))

        # each step is executed in the RPC call thread, thus exceptions are
        # propagated directly to the cc-server

    def stop(self):
        """Do nothing."""
        pass

    def cleanup(self):
        """Undo what :meth:`setup` and :meth:`takeover` did.

        Works on a best-effort basis: a failing command is logged and the
        remaining steps are still attempted.
        """
        self._restore_dm_table()
        self._teardown_drbd()
        self._remove_meta_volume()
        self._remove_dm_copy()

    def _restore_dm_table(self):
        """Reset the DM table of the VM LV to the one saved by setup."""
        if self.dm_table is None:
            # setup() did not save any table, there is nothing to restore
            return
        try:
            # same options as in setup(), else the tables cannot be compared
            table = subproc_call([self.DMSETUP, 'table',
                                  '--showkeys', self.volume.path])
        except CalledProcessError:
            logger.error('Error while getting table of VM LV')
            return
        if table == self.dm_table:
            return
        try:
            subproc_call([self.DMSETUP, 'load', self.volume.path],
                         self.dm_table)
            subproc_call([self.DMSETUP, 'suspend', self.volume.path])
            subproc_call([self.DMSETUP, 'resume', self.volume.path])
        except CalledProcessError:
            logger.error('Error while loading back VM LV table')
            # FIXME this is kind of critical, we should tell the user to
            # call a Gaetant

    def _teardown_drbd(self):
        """Stop the DRBD device, if any, and release its id."""
        if self.drbd_id is None:
            return
        steps = (
            ('disconnect', 'Error while disconnecting DRBD device %s'),
            ('secondary',
             'Error while switching DRBD device to secondary (%s)'),
            ('detach', 'Error while detaching DRBD device %s'),
            ('down', 'Error while bringing down DRBD device %s'),
        )
        for command, error_msg in steps:
            try:
                subproc_call([self.DRBDSETUP, self.drbd_path, command])
            except CalledProcessError:
                logger.error(error_msg, self.drbd_path)

        self.allocator.remove_volume(self.drbd_id)
        self.drbd_id = None
        self.drbd_port = None
        self.drbd_path = None
        self.drbd_table = None
        self.drbd_status = dict(conn=None)

    def _remove_meta_volume(self):
        """Delete the DRBD metadata volume, if it was created."""
        if self.meta_volume is None:
            return
        try:
            self.storage.delete_volume(
                self.pool.name,
                self.volume.name + '.drbdmeta',
            )
        except Exception:  # FIXME catch the storage specific errors only
            logger.exception('Error while removing DRBD metadata LV')
        self.meta_volume = None

    def _remove_dm_copy(self):
        """Remove the DM copy of the VM LV, if it exists."""
        if path_exists('/dev/mapper/' + self.dm_copy):
            try:
                subproc_call([self.DMSETUP, 'remove', self.dm_copy])
            except CalledProcessError:
                logger.error('Error while removing DM copy')
            self.dm_table = None

    def setup(self):
        """Create the metadata volume, the DM copy and the DRBD device.

        On success :attr:`state` is ``'SETUP'``.

        :raise DRBDError: if a ``dmsetup table``, ``drbdmeta create-md`` or
            ``drbdsetup disk`` command fails
        :raise CalledProcessError: if the DM copy cannot be created
        :raise DRBDAllocationError: if no DRBD id is available
        """
        logger.debug('Create DRBD meta device')
        self.meta_volume = self.storage.create_volume(
            self.pool.name,
            self.volume.name + '.drbdmeta',
            # see
            # http://www.drbd.org/users-guide/ch-internals.html#s-meta-data-size
            # for external metadata size calculation
            max(self.volume.capacity // 32768 + 4 * 2 ** 20, 128 * 2 ** 20),
        )

        logger.debug('Create a copy DM of the LV')
        # get LV table
        try:
            self.dm_table = subproc_call([self.DMSETUP, 'table',
                                          '--showkeys', self.volume.path])
        except CalledProcessError:
            logger.error('Cannot get DM table of VM LV')
            raise DRBDError('Cannot get DM table of VM LV')
        # create new DM
        logger.debug('Got table of LV "%s"', self.dm_table)
        try:
            subproc_call([self.DMSETUP, 'create', self.dm_copy], self.dm_table)
        except CalledProcessError:
            logger.error('Cannot create copy DM of LV')
            # NOTE(review): unlike the other steps, the CalledProcessError
            # itself is propagated here, not a DRBDError
            raise
        logger.debug('Setup DRBD device')
        # get drbd path
        self.drbd_id = self.allocator.new_volume()
        self.drbd_port = 7788 + self.drbd_id  # FIXME magic number
        self.drbd_path = '/dev/drbd%d' % self.drbd_id
        # wipe drbd metadata (just in case)
        try:
            subproc_call([self.DRBDMETA, '--force', self.drbd_path,
                          'v08', self.meta_volume.path, '0', 'wipe-md'])
        except CalledProcessError:
            # best effort only, create-md below is the step that matters
            pass
        try:
            subproc_call([self.DRBDMETA, '--force', self.drbd_path,
                          'v08', self.meta_volume.path, '0', 'create-md'])
        except CalledProcessError:
            logger.error('Cannot create DRBD external metadata on device')
            raise DRBDError('Cannot create DRBD metadata')
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'disk',
                          '/dev/mapper/%s' % self.dm_copy,
                          self.meta_volume.path,
                          '0', '--create-device'])
        except CalledProcessError:
            logger.error('Error while creating DRBD device')
            raise DRBDError('Cannot create DRBD device')
        self.drbd_table = '0 %d linear %s 0' % (
            # DM tables are expressed in 512 bytes sectors (assumes that
            # capacity is in bytes -- TODO confirm)
            self.volume.capacity // 512,
            self.drbd_path,
        )

        logger.debug('Setup DRBD done')
        self.state = 'SETUP'

    def connect(self, remote_addr, remote_port):
        """Configure the network side of the DRBD device.

        On success :attr:`state` is ``'CONNECTED'``.

        :param remote_addr: address of the remote DRBD
        :param remote_port: port (integer) of the remote DRBD
        :raise DRBDError: if a ``drbdsetup`` command fails
        """
        logger.debug('Setup networking for DRBD')
        # connect to remote node
        # NOTE(review): 'C', '-m' and '-S' are presumably the replication
        # protocol, allow-two-primaries and the send buffer size -- check
        # against drbdsetup(8)
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'net',
                          '0.0.0.0:%d' % self.drbd_port,
                          '%s:%d' % (remote_addr, remote_port),
                          'C', '-m', '-S', '10000000'])
        except CalledProcessError:
            logger.error('Error while setting up network facility for DRBD')
            raise DRBDError('Cannot set up network for DRBD')

        sleep(.5)  # FIXME
        logger.debug('Set up bandwidth limit')
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'syncer', '-r',
                          self.DRBD_RATE])
        except CalledProcessError:
            logger.error('Cannot set bandwidth rate limit on DRBD')
            raise DRBDError('Error while setting bandwidth limit')

        self.state = 'CONNECTED'

    def wait_connection(self):
        """Block until ``drbdsetup wait-connect`` returns.

        :attr:`state` is ``'WAIT PEER CONNECT'`` while waiting and
        ``'CONNECTED'`` on success.

        :raise DRBDError: if the ``wait-connect`` command fails
        """
        self.state = 'WAIT PEER CONNECT'
        sleep(.5)  # FIXME
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'wait-connect',
                          '-t', self.DRBD_TIMEOUT,
                          '-d', self.DRBD_TIMEOUT,
                          '-o', self.DRBD_TIMEOUT])
        except CalledProcessError:
            logger.error('Error while waiting for remote DRBD to connect,'
                         ' timeout = %s', self.DRBD_TIMEOUT)
            raise DRBDError('Error while waiting DRBD connect')

        sleep(.5)  # FIXME
        self.state = 'CONNECTED'

    def switch_primary(self):
        """Run ``drbdsetup primary -o`` on the DRBD device.

        On success :attr:`state` is ``'CONNECTED PRIMARY'``.

        :raise DRBDError: if the command fails
        """
        logger.debug('Switch DRBD %s in primary mode', self.drbd_path)
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'primary', '-o'])
        except CalledProcessError:
            logger.error('Error while switching to primary role (%s)',
                         self.drbd_path)
            raise DRBDError('Cannot switch to primary role')

        self.state = 'CONNECTED PRIMARY'

    def switch_secondary(self):
        """Run ``drbdsetup secondary`` on the DRBD device.

        On success :attr:`state` is ``'CONNECTED SECONDARY'``.

        :raise DRBDError: if the command fails
        """
        logger.debug('Switch DRBD %s in secondary mode', self.drbd_path)
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'secondary'])
        except CalledProcessError:
            logger.error('Error while switching to secondary role (%s)',
                         self.drbd_path)
            raise DRBDError('Cannot switch to secondary role')

        self.state = 'CONNECTED SECONDARY'

    def wait_sync(self):
        """Block until ``drbdsetup wait-sync`` returns.

        :attr:`state` is ``'WAIT SYNC'`` while waiting and ``'SYNC DONE'``
        on success.

        :raise DRBDError: if the ``wait-sync`` command fails
        """
        self.state = 'WAIT SYNC'
        sleep(.5)  # FIXME

        logger.debug('Wait sync %s', self.drbd_path)
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'wait-sync'])
        except CalledProcessError:
            logger.error('Error while waiting for synchronisation of DRBD'
                         ' device (%s)', self.drbd_path)
            raise DRBDError('Wait sync error')

        self.state = 'SYNC DONE'

    def disconnect(self):
        """Run ``drbdsetup disconnect`` on the DRBD device.

        On success :attr:`state` is ``'DISCONNECTED'``.

        :raise DRBDError: if the command fails
        """
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'disconnect'])
        except CalledProcessError:
            logger.error('Error while disconnecting DRBD device %s',
                         self.drbd_path)
            raise DRBDError('Cannot disconnect device')

        self.state = 'DISCONNECTED'

    def status(self):
        """DRBD status.

        Parses the XML printed by ``drbdsetup status`` and stores the result
        in :attr:`drbd_status`.

        :return: dict with the ``conn``, ``disk``, ``rdisk``, ``role``,
            ``rrole`` and ``percent`` keys; a value is ``None`` when the
            matching attribute is missing from the output
        :raise DRBDError: if the command fails or its output cannot be parsed
        """
        try:
            out = subproc_call([self.DRBDSETUP, self.drbd_path, 'status'])
        except CalledProcessError:
            logger.error('Error while getting DRBD status (%s)', self.drbd_path)
            raise DRBDError('Status: error while executing DRBD status')
        try:
            status = et.fromstring(out)
        except Exception:
            logger.error('Error while parsing status command output for DRBD'
                         ' device %s', self.drbd_path)
            raise DRBDError('Status: cannot parse output')

        self.drbd_status = dict(
            conn=status.get('cs'),
            disk=status.get('ds1'),
            rdisk=status.get('ds2'),
            role=status.get('ro1'),
            rrole=status.get('ro2'),
            percent=status.get('resynced_percent', None),
        )
        return self.drbd_status

    def takeover(self):
        """Set up DRBD device as VM backing device.

        Loads :attr:`drbd_table` into the VM LV, then suspends and resumes
        it.

        :raise DRBDError: if :meth:`setup` did not build the table or if a
            ``dmsetup`` command fails
        """
        logger.debug('DRBD takeover %s', self.drbd_path)
        if self.drbd_table is None:
            # explicit check, an assert would be stripped when running with -O
            raise DRBDError('Takeover: DRBD device is not set up')
        try:
            subproc_call([self.DMSETUP, 'load', self.volume.path],
                         self.drbd_table)
        except CalledProcessError:
            logger.error('Error while loading new table for VM LV')
            raise DRBDError('Takeover: cannot load DM table')
        try:
            subproc_call([self.DMSETUP, 'suspend', self.volume.path])
        except CalledProcessError:
            logger.error('Error while suspending VM LV')
            raise DRBDError('Takeover: cannot suspend DM')
        try:
            subproc_call([self.DMSETUP, 'resume', self.volume.path])
        except CalledProcessError:
            logger.error('Error while resuming VM LV')
            raise DRBDError('Takeover: cannot resume DM')