# jobs.py -- volume import/export, TCP tunnel and DRBD jobs.
import errno
import io
import logging
import os
import socket
import sys
from hashlib import md5
from os.path import exists as path_exists
from StringIO import StringIO
from subprocess import CalledProcessError
from time import sleep
from xml.etree import ElementTree as et

import pyev

from cloudcontrol.node.exc import TunnelError, DRBDAllocationError, DRBDError
from cloudcontrol.node.jobs import BaseIOJob, ForkedJob
from cloudcontrol.node.utils import SocketBuffer, subproc_call, Singleton


logger = logging.getLogger(__name__)


class ImportVolume(BaseIOJob):
    """Import volume job.

    ``pre_job`` opens a listening TCP socket on a kernel-assigned port and
    the destination volume. ``run_job`` accepts a single client, writes
    everything it sends to the volume and stores the hex digest of the
    received data in ``checksum``.
    """
    BUFFER_LEN = 8192 * 16
    HASH = md5

    def __init__(self, job_manager, volume):
        """
        :param job_manager: :class:`JobManager` instance
        :param volume: :class:`Volume` instance (its ``path`` is opened for
            writing)
        """
        BaseIOJob.__init__(self, job_manager)

        #: hex digest of the received data, set once run_job completed
        self.checksum = None
        self.volume = volume
        # where the other node will connect
        self.port = None

        # fds
        self.sock = None
        self.client_sock = None
        self.disk = None

    @property
    def open_fds(self):
        """File descriptors currently held by this job."""
        return [fo.fileno() for fo in (self.sock, self.client_sock, self.disk)
                if fo is not None]

    def clean_fds(self):
        """Close every socket/file still open; safe to call several times."""
        if self.sock is not None:
            self.sock.close()
            self.sock = None
        if self.client_sock is not None:
            self.client_sock.close()
            self.client_sock = None
        if self.disk is not None:
            self.disk.close()
            self.disk = None

    def pre_job(self):
        """Open the listening socket and the destination volume.

        :returns: port number the socket is listening on
        :raises socket.error: if the listening socket cannot be set up
        :raises IOError: if the local volume cannot be opened
        """
        # create socket
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # accept() in run_job gives up after this delay
            self.sock.settimeout(10.)
            # port 0: the kernel picks a free port
            self.sock.bind(('0.0.0.0', 0))
            self.sock.listen(1)
        except socket.error:
            logger.exception('Error while setting up listening socket for'
                             ' volume import')
            self.clean_fds()
            raise

        # open local disk (unbuffered)
        try:
            self.disk = io.open(self.volume.path, 'wb', 0)
        except IOError:
            logger.exception('Error while trying to open local disk')
            self.clean_fds()
            raise

        self.port = self.sock.getsockname()[1]
        return self.port

    def run_job(self):
        """Receive the disk image from the client and write it to the volume.

        All file descriptors are closed when this method returns or raises.

        :raises socket.timeout: if no client connected in time
        :raises socket.error: on accept/receive failure
        :raises IOError: on disk write failure
        """
        try:
            self._accept_client()
            checksum = self.HASH()

            # start downloading disk image
            while self.running:
                chunk = self._receive_chunk()
                if not chunk:
                    sys.stderr.write('Received EOF import job\n')
                    break
                checksum.update(chunk)
                self._write_chunk(chunk)
        finally:
            # clean the fds whatever happened
            self.clean_fds()

        # here we could not have received the full disk but we don't consider
        # this as an error in the import part
        self.checksum = checksum.hexdigest()
        sys.stderr.write('Volume import done\n')

    def _accept_client(self):
        """Accept the single expected client, then close the listening socket."""
        try:
            self.client_sock, _ = self.sock.accept()
        except socket.timeout:
            sys.stderr.write('Error for importing job: client did not connect\n')
            raise
        except socket.error:
            sys.stderr.write('Error while accepting socket\n')
            raise

        # close the listening socket
        self.sock.close()
        self.sock = None

    def _receive_chunk(self):
        """Read up to BUFFER_LEN bytes; return an empty string on EOF."""
        # keep a list of received buffers in order to do only one
        # concatenation in the end
        received = []
        total_received = 0
        try:
            while total_received < self.BUFFER_LEN:
                recv_buf = self.client_sock.recv(
                    self.BUFFER_LEN - total_received)
                if not recv_buf:  # EOF
                    # if something was already received, the caller will call
                    # us again and recv will report EOF one more time
                    break
                total_received += len(recv_buf)
                received.append(recv_buf)
        except socket.error:
            sys.stderr.write('Error while receiving disk image\n')
            raise
        return b''.join(received)

    def _write_chunk(self, chunk):
        """Write the whole chunk to the disk, looping over partial writes."""
        written = 0
        try:
            # FIXME never write small chuncks
            while written < len(chunk):
                # buffer() gives a view on the remaining bytes without copy
                written += self.disk.write(buffer(chunk, written))
        except IOError:
            sys.stderr.write('Error while writing image to disk\n')
            raise
class ExportVolume(BaseIOJob):
    """Export volume job.

    ``pre_job`` connects to the remote importing node and opens the local
    volume. ``run_job`` streams the volume through the socket and stores the
    hex digest of the sent data in ``checksum``.
    """
    BUFFER_LEN = 8192 * 16
    HASH = md5

    def __init__(self, job_manager, volume, raddr, rport):
        """
        :param job_manager: :class:`JobManager` instance
        :param volume: :class:`Volume` instance
        :param raddr: remote IP address
        :param rport: remote TCP port
        """
        BaseIOJob.__init__(self, job_manager)

        # where to connect to send the volume
        self.raddr = raddr
        self.rport = rport

        self.volume = volume
        #: hex digest of the exported data, only set when run_job did not fail
        self.checksum = None

        # fds
        self.sock = None
        self.disk = None

    @property
    def open_fds(self):
        """File descriptors currently held by this job."""
        return [fo.fileno() for fo in (self.sock, self.disk)
                if fo is not None]

    def clean_fds(self):
        """Close the socket and the disk if still open."""
        if self.sock is not None:
            self.sock.close()
            self.sock = None
        if self.disk is not None:
            self.disk.close()
            self.disk = None

    def pre_job(self):
        """Connect to the remote node and open the local volume.

        :raises socket.error: if the connection cannot be established
        :raises IOError: if the local volume cannot be opened
        """
        # connect to the remote host
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.connect((self.raddr, self.rport))
        except socket.error:
            # logger.exception already records the error itself; formatting
            # exc.errno through os.strerror would fail when errno is None
            logger.exception('Error while trying to connect to remote host'
                             ' %s:%s', self.raddr, self.rport)
            self.clean_fds()
            raise

        # open local volume (unbuffered)
        try:
            self.disk = io.open(self.volume.path, 'rb', 0)
        except IOError:
            logger.exception('Error while opening disk for export job')
            self.clean_fds()
            raise

    def run_job(self):
        """Send the whole volume through the socket.

        File descriptors are closed when this method returns or raises.
        ``checksum`` is left untouched when an error occurs, so that a
        truncated export cannot be mistaken for a successful one.

        :raises IOError: on disk read failure
        :raises socket.error: on send failure
        """
        checksum = self.HASH()

        try:
            # do copy
            while self.running:
                try:
                    read = self.disk.read(self.BUFFER_LEN)
                except IOError:
                    sys.stderr.write('Error while reading from disk\n')
                    raise
                # read length may be less than BUFFER_LEN but we don't care
                # as it will go over TCP
                if not read:  # end of file
                    break
                checksum.update(read)
                try:
                    self.sock.sendall(read)
                except socket.error:
                    sys.stderr.write('Error while sending through socket\n')
                    raise
        finally:
            self.clean_fds()

        self.checksum = checksum.hexdigest()
class TCPTunnel(ForkedJob):
    """Handles a TCP tunnel.

    One end (``source_sock``) listens for an incoming connection, the other
    end (``dest_sock``) connects to a given endpoint. Once both ends are
    connected, data read on one socket is queued in a buffer and written to
    the other one, using watchers of a dedicated libev loop.
    """

    BUFFER_LEN = 8096

    def __init__(self, job_manager, connect=None, listen='0.0.0.0'):
        """
        :param job_manager: :class:`JobManager` instance
        :param connect: where to connect one end of the tunnel (a tuple, as
            given to socket.connect)
        :param listen: which interface to listen to for the other end of the
            tunnel
        """
        ForkedJob.__init__(self, job_manager)
        # create a new libev loop that will run inside our child
        self.ev_loop = pyev.Loop()
        self.connect = connect
        self.listen = listen
        #: port is assigned by the kernel
        self.port = None

        # keep state information for both ends
        self.listen_state = 'CLOSED'
        self.connect_state = 'CLOSED'
        #: very basic error report
        self.error = None

        # these are the watchers
        self.source_reader = None
        self.source_writer = None
        self.dest_reader = None
        self.dest_writer = None

        #: source_sock is the socket that will listen for remote|local to happen
        self.source_sock = None
        #: dest sock connects to another tunnel that was set up
        self.dest_sock = None

        # input buffer is used for data that is coming from source_sock and goes
        # to dest_sock
        self.input_buffer = SocketBuffer()
        # output_buffer is used for data that is coming from dest_sock and goes
        # to source_sock
        self.output_buffer = SocketBuffer()

    @property
    def open_fds(self):
        """File descriptors currently held by this job."""
        return [fo.fileno() for fo in (self.source_sock, self.dest_sock)
                if fo is not None]

    def after_fork(self):
        """Reset the ev loop (hook presumably run in the child after fork)."""
        self.ev_loop.reset()

    def close(self):
        """Stop all watchers, close both sockets and reset the states."""
        # as this could be called from child, don't use logger (this is for
        # debug anyway)
        sys.stderr.write('Closing job %d\n' % self.id)
        # stop watchers
        if self.source_reader is not None:
            self.source_reader.stop()
            self.source_reader = None
        if self.source_writer is not None:
            self.source_writer.stop()
            self.source_writer = None
        if self.dest_reader is not None:
            self.dest_reader.stop()
            self.dest_reader = None
        if self.dest_writer is not None:
            self.dest_writer.stop()
            self.dest_writer = None
        # close sockets
        if self.source_sock is not None:
            self.source_sock.close()
            self.source_sock = None
        if self.dest_sock is not None:
            self.dest_sock.close()
            self.dest_sock = None
        # clear buffers (this memory won't be needed anyway)
        self.input_buffer = None
        self.output_buffer = None
        # reset states
        self.listen_state = 'CLOSED'
        self.connect_state = 'CLOSED'

    def stop(self):
        """Stop the job by closing the tunnel."""
        self.close()

    def setup_listen(self, interface=None):
        """Setup source socket.

        :param interface: specify which interface to listen onto (overrides
            the ``listen`` argument given to the constructor)
        :raises socket.error: if the socket cannot be set up
        """
        if interface is not None:
            self.listen = interface
        logger.debug('Setup listening %s %d', self.listen, self.id)
        try:
            self.source_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except socket.error:
            logger.exception('Error while creating source_sock for tunnel job'
                             ' %d', self.id)
            self.close()
            raise
        try:
            self.source_sock.setblocking(0)
        except socket.error:
            logger.exception('Cannot set source_sock in blocking mode for'
                             ' tunnel job %d', self.id)
            self.close()
            raise
        try:
            # port 0: the kernel picks a free port
            self.source_sock.bind((self.listen, 0))
        except socket.error:
            logger.exception('Error while binding source_sock for tunnel job'
                             ' %d', self.id)
            self.close()
            raise
        self.port = self.source_sock.getsockname()[1]
        try:
            self.source_sock.listen(1)
        except socket.error:
            logger.exception('Error while listening on source_sock for tunnel'
                             ' job %d', self.id)
            self.close()
            raise

        self.listen_state = 'LISTENING'
        # ready to accept
        self.source_reader = self.ev_loop.io(self.source_sock,
                                             pyev.EV_READ, self.accept_cb)
        self.source_reader.start()

    def setup_connect(self, endpoint=None):
        """Start connection to remote end.

        :param endpoint: specify where to connect (same as connect argument in
            constructor), can be specified in both places
        :raises TunnelError: if no endpoint was specified at all
        :raises socket.error: if the connection cannot be started
        """
        if endpoint is not None:
            self.connect = endpoint
        if self.connect is None:
            raise TunnelError('Remote endpoint to connect to was not specified')
        logger.debug('Connect to endpoint %s %d', self.connect, self.id)
        try:
            # a tuple is an (address, port) pair, anything else is handled as
            # a UNIX socket address
            if isinstance(self.connect, tuple):
                addr_family = socket.AF_INET
            else:
                addr_family = socket.AF_UNIX
            self.dest_sock = socket.socket(addr_family, socket.SOCK_STREAM)
        except socket.error:
            logger.exception('Error while creating dest_sock for tunnel job'
                             ' %d', self.id)
            self.close()
            raise
        try:
            self.dest_sock.setblocking(0)
        except socket.error:
            logger.exception('Error while sitting non block mode on dest_sock'
                             ' for tunnel job %d', self.id)
            self.close()
            raise

        # non blocking connect: EINPROGRESS means the result will be
        # available once the socket becomes writable (see connect_cb)
        error = self.dest_sock.connect_ex(self.connect)
        if error and error != errno.EINPROGRESS:
            self.close()
            raise socket.error('Error during connect for tunnel job, %s' %
                               os.strerror(error))
        self.dest_writer = self.ev_loop.io(self.dest_sock,
                                           pyev.EV_WRITE, self.connect_cb)
        self.dest_writer.start()

        self.connect_state = 'CONNECTING'

    def run_job(self):
        """Run the ev loop (returns when the loop stops)."""
        sys.stderr.write('Will start ev loop in child\n')
        self.ev_loop.start()

    def accept_cb(self, watcher, revents):
        """Read event on the listening socket: accept the client.

        The listening socket is replaced by the accepted one, as only a
        single client is expected.
        """
        try:
            new_source, remote = self.source_sock.accept()
        except socket.error as exc:
            if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                # we will come back
                return

            # else
            self.fatal_exc('Error while accepting new connection on'
                           ' sock_source for tunnel job')
            # nothing was accepted, in case fatal_exc returns
            return

        # everything went fine
        self.source_sock.close()  # we won't accept connections
        self.source_sock = new_source
        # set new socket non blocking
        try:
            self.source_sock.setblocking(0)
        except socket.error as exc:
            self.fatal_exc('Cannot set source socket in non blocking for'
                           ' tunnel job: %s', os.strerror(exc.errno))
            return
        self.source_reader.stop()
        self.source_reader = self.ev_loop.io(new_source, pyev.EV_READ,
                                             self.read_cb)
        self.source_writer = self.ev_loop.io(new_source, pyev.EV_WRITE,
                                             self.write_cb)
        sys.stderr.write('Successfully accepted remote client %s for tunnel'
                         ' job %d\n' % (remote, self.id))
        self.listen_state = 'CONNECTED'
        if self.connect_state == 'CONNECTED':
            # start the watchers only if both ends are ready to accept data
            self.source_reader.start()
            self.dest_reader.start()

    def connect_cb(self, watcher, revents):
        """Write event on the connecting socket: check the connect result."""
        # check that connection was a success
        error = self.dest_sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if error:
            self.fatal('Error during connect for tunnel job, %s\n' %
                       os.strerror(error))
            # connection failed, in case fatal returns
            return

        # else we setup watcher with proper events
        self.dest_reader = self.ev_loop.io(self.dest_sock, pyev.EV_READ,
                                           self.read_cb)
        self.dest_writer.stop()
        self.dest_writer = self.ev_loop.io(self.dest_sock, pyev.EV_WRITE,
                                           self.write_cb)
        sys.stderr.write('Successfully connected to remote endpoint %s %d\n' %
                         (self.connect, self.id))
        self.connect_state = 'CONNECTED'
        if self.listen_state == 'CONNECTED':
            # start the watchers only if both ends are ready to accept data
            self.source_reader.start()
            self.dest_reader.start()

    def read_cb(self, watcher, revents):
        """Read event on one end: queue incoming data for the other end."""
        if watcher == self.dest_reader:
            sock = self.dest_sock
            buffer_ = self.output_buffer
            other_watcher = self.source_writer
        else:
            sock = self.source_sock
            buffer_ = self.input_buffer
            other_watcher = self.dest_writer

        while True:
            try:
                incoming = sock.recv(self.BUFFER_LEN)
            except socket.error as exc:
                if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                    # nothing more to read for now
                    break
                # else: unexpected error
                self.fatal_exc('Unexpected error while reading on socket'
                               ' for tunnel job, %s\n', os.strerror(exc.errno))
                # nothing was read, in case fatal_exc returns
                return

            if not incoming:
                # EOF
                self.close()
                return
            buffer_.append(incoming)
            if buffer_.is_full():
                # stop reading until the other end wrote some data
                watcher.stop()
                break

        # we did read some bytes that we could write to the other end
        if not buffer_.is_empty():
            other_watcher.start()

    def write_cb(self, watcher, revents):
        """Write event on one end: send data queued by the other end."""
        if watcher == self.dest_writer:
            sock = self.dest_sock
            buffer_ = self.input_buffer
            other_watcher = self.source_reader
        else:
            sock = self.source_sock
            buffer_ = self.output_buffer
            other_watcher = self.dest_reader

        would_block = False
        while not would_block:
            try:
                to_send = buffer_.popleft()
            except IndexError:
                # buffer is empty, we should stop write event
                watcher.stop()
                break
            total_sent = 0
            while total_sent < len(to_send):
                try:
                    # buffer() gives a view on the unsent bytes without copy
                    total_sent += sock.send(buffer(to_send, total_sent))
                except socket.error as exc:
                    if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                        # socket is not writable anymore: put back what is
                        # left and wait for the next write event instead of
                        # spinning on the socket
                        buffer_.appendleft(to_send[total_sent:])
                        would_block = True
                        break
                    # else: unexpected error
                    self.fatal_exc('Unexpected error while writting on socket'
                                   ' for tunnel job, %s',
                                   os.strerror(exc.errno))
                    return

        # if we can read on the other end
        if not buffer_.is_full():
            other_watcher.start()


class DRBDAllocator(object):
    """Keeps a list of allocated DRBD devices."""

    __metaclass__ = Singleton

    RMMOD = '/sbin/rmmod'
    MODPROBE = '/sbin/modprobe'

    #: maximum number of DRBD devices
    MINOR_MAX = 100

    def __init__(self):
        #: set of minor numbers currently allocated
        self.volumes = set()

        self.reload_kernel_module()

    def new_volume(self):
        """Allocate the lowest free DRBD minor number.

        :returns: allocated minor number
        :raises DRBDAllocationError: if all MINOR_MAX minors are in use
        """
        for i in xrange(self.MINOR_MAX):
            if i not in self.volumes:
                self.volumes.add(i)
                break
        else:
            raise DRBDAllocationError('Cannot allocate DRBD volume')
        return i

    def remove_volume(self, id_):
        """Release a minor number previously returned by new_volume.

        :raises KeyError: if ``id_`` is not currently allocated
        """
        self.volumes.remove(id_)

    def reload_kernel_module(self):
        """Unload then load the drbd kernel module with our parameters.

        :raises CalledProcessError: if the module is loaded but cannot be
            removed
        """
        # FIXME find an other way to set parameters to drbd module
        # try to remove kernel module
        try:
            subproc_call([self.RMMOD, 'drbd'])
        except CalledProcessError:
            # this is not an error if drbd module wasn't loaded
            with open('/proc/modules') as modules:
                still_loaded = 'drbd' in modules.read()
            if still_loaded:
                logger.error('Cannot remove drbd kernel module')
                raise
        # load kernel module with proper parameters
        try:
            # we use greater minor_count than the default which seems to small.
            # we set usermode helper to bin true because by default, the module
            # is calling some drbd helpers that returns non 0 value and make the
            # synchronisation halt.
            subproc_call([self.MODPROBE, 'drbd',
                          'minor_count=%d' % self.MINOR_MAX,
                          'usermode_helper=/bin/true'])
        except CalledProcessError:
            # NOTE(review): the failure is only logged, not propagated
            logger.error('Cannot load drbd kernel module')


class DRBD(object):
    """Manage DRBD job.

    Each step (``setup``, ``connect``, ``wait_connection``, ...) runs the
    matching ``drbdsetup``/``drbdmeta``/``dmsetup`` command and updates
    ``state``. ``cleanup`` undoes what ``setup`` and ``takeover`` did.
    """

    DMSETUP = '/sbin/dmsetup'
    DRBDSETUP = '/sbin/drbdsetup'
    DRBDMETA = '/sbin/drbdmeta'
    #: timeout given to ``drbdsetup wait-connect`` (-t, -d and -o options)
    DRBD_TIMEOUT = '30'
    #: rate given to ``drbdsetup syncer -r``
    DRBD_RATE = '50000'

    def __init__(self, job_manager, ev_loop, storage_index, lvm_pool, lvm_volume):
        """
        :param job_manager: :class:`JobManager` instance
        :param ev_loop: ev loop instance
        :param storage_index: :class:`StorageIndex` instance
        :param lvm_pool: :class:`Storage` instance
        :param lvm_volume: :class:`Volume` instance
        """
        #: job id
        self.id = job_manager.job_id.next()

        self.allocator = DRBDAllocator()

        # define a set of states
        self.state = 'INIT'

        self.storage = storage_index
        self.pool = lvm_pool
        self.volume = lvm_volume
        #: LV holding the DRBD external metadata, created by setup
        self.meta_volume = None

        #: DRBD id as returned by DRBDAllocator
        self.drbd_id = None
        self.drbd_port = None
        #: DRBD device full path
        self.drbd_path = None
        #: DM table mapping the VM LV onto the DRBD device (see takeover)
        self.drbd_table = None
        self.drbd_status = dict(conn=None)

        #: original DM table of the VM LV, saved by setup
        self.dm_table = None
        #: name of DM copy of LV
        self.dm_copy = '%s-%s.copy' % (
            'vg', self.volume.name.replace('-', '--'))

        # each step is executed in the RPC call thread, thus exception are
        # propagated directly to the cc-server

    def stop(self):
        """Nothing to stop: every step runs synchronously."""
        pass

    def cleanup(self):
        """Undo setup/takeover on a best effort basis.

        Errors are logged and the cleanup goes on with the next step.
        """
        # reset DM to initial state; without a saved table (setup did not go
        # that far) there is nothing to restore and nothing valid to load
        if self.dm_table is not None:
            try:
                table = subproc_call([self.DMSETUP, 'table', self.volume.path])
            except CalledProcessError:
                logger.error('Error while getting table of VM LV')
            else:
                if table != self.dm_table:
                    try:
                        subproc_call([self.DMSETUP, 'load', self.volume.path],
                                     self.dm_table)
                        subproc_call([self.DMSETUP, 'suspend',
                                      self.volume.path])
                        subproc_call([self.DMSETUP, 'resume',
                                      self.volume.path])
                    except CalledProcessError:
                        logger.error('Error while loading back VM LV table')
                        # FIXME this is kind of critical, we should tell the
                        # user to call a Gaetant

        # stop drbd volume
        if self.drbd_id is not None:
            try:
                subproc_call([self.DRBDSETUP, self.drbd_path, 'disconnect'])
            except CalledProcessError:
                logger.error('Error while disconnecting DRBD device %s',
                             self.drbd_path)
            try:
                subproc_call([self.DRBDSETUP, self.drbd_path, 'secondary'])
            except CalledProcessError:
                logger.error('Error while switching DRBD device to secondary'
                             ' (%s)', self.drbd_path)
            try:
                subproc_call([self.DRBDSETUP, self.drbd_path, 'detach'])
            except CalledProcessError:
                logger.error('Error while detaching DRBD device %s',
                             self.drbd_path)
            try:
                subproc_call([self.DRBDSETUP, self.drbd_path, 'down'])
            except CalledProcessError:
                logger.error('Error while bringing down DRBD device %s',
                             self.drbd_path)

            self.allocator.remove_volume(self.drbd_id)
            self.drbd_id = None
            self.drbd_port = None
            self.drbd_path = None
            self.drbd_table = None
            self.drbd_status = dict(conn=None)

        # remove drbd meta volume
        if self.meta_volume is not None:
            try:
                self.storage.delete_volume(
                    self.pool.name,
                    self.volume.name + '.drbdmeta',
                )
            except Exception:  # FIXME catch the storage specific errors
                logger.exception('Error while removing DRBD metadata LV')
            self.meta_volume = None

        # remove copy DM
        if path_exists('/dev/mapper/' + self.dm_copy):
            try:
                subproc_call([self.DMSETUP, 'remove', self.dm_copy])
            except CalledProcessError:
                logger.error('Error while removing DM copy')
            self.dm_table = None

    def setup(self):
        """Create the metadata LV, the DM copy of the LV and the DRBD device.

        :raises DRBDError: if a DRBD related command fails
        :raises CalledProcessError: if the DM copy cannot be created
        """
        logger.debug('Create DRBD meta device')
        self.meta_volume = self.storage.create_volume(
            self.pool.name,
            self.volume.name + '.drbdmeta',
            # see
            # http://www.drbd.org/users-guide/ch-internals.html#s-meta-data-size
            # for external metadata size calculation
            max(self.volume.capacity // 32768 + 4 * 2 ** 20, 128 * 2 ** 20),
        )

        logger.debug('Create a copy DM of the LV')
        # get LV table
        try:
            self.dm_table = subproc_call([self.DMSETUP, 'table',
                                          '--showkeys', self.volume.path])
        except CalledProcessError:
            logger.error('Cannot get DM table of VM LV')
            raise DRBDError('Cannot get DM table of VM LV')
        # create new DM
        logger.debug('Got table of LV "%s"', self.dm_table)
        try:
            # second argument is presumably fed to the command's standard
            # input -- TODO confirm against subproc_call
            subproc_call([self.DMSETUP, 'create', self.dm_copy], self.dm_table)
        except CalledProcessError:
            logger.error('Cannot create copy DM of LV with table "%s"',
                         self.dm_table)
            # NOTE(review): unlike the other steps, this propagates the
            # CalledProcessError instead of a DRBDError
            raise
        logger.debug('Setup DRBD device')
        # get drbd path
        self.drbd_id = self.allocator.new_volume()
        self.drbd_port = 7788 + self.drbd_id  # FIXME magic number
        self.drbd_path = '/dev/drbd%d' % self.drbd_id
        # wipe drbd metadata (just in case), failure is not an issue
        try:
            subproc_call([self.DRBDMETA, '--force', self.drbd_path,
                          'v08', self.meta_volume.path, '0', 'wipe-md'])
        except CalledProcessError:
            pass
        try:
            subproc_call([self.DRBDMETA, '--force', self.drbd_path,
                          'v08', self.meta_volume.path, '0', 'create-md'])
        except CalledProcessError:
            logger.error('Cannot create DRBD external metadata on device')
            raise DRBDError('Cannot create DRBD metadata')
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'disk',
                          '/dev/mapper/%s' % self.dm_copy,
                          self.meta_volume.path,
                          '0', '--create-device'])
        except CalledProcessError:
            logger.error('Error while creating DRBD device')
            raise DRBDError('Cannot create DRBD device')
        self.drbd_table = '0 %d linear %s 0' % (
            # DM table length; assumes capacity is in bytes and converts it
            # to 512 bytes sectors -- TODO confirm
            self.volume.capacity // 512,
            self.drbd_path,
        )

        logger.debug('Setup DRBD done')
        self.state = 'SETUP'

    def connect(self, remote_addr, remote_port):
        """Set up DRBD networking toward the peer and the sync rate limit.

        :param remote_addr: address of the remote DRBD
        :param remote_port: port of the remote DRBD
        :raises DRBDError: if a drbdsetup command fails
        """
        logger.debug('Setup networking for DRBD')
        # connect to remote node
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'net',
                          '0.0.0.0:%d' % self.drbd_port,
                          '%s:%d' % (remote_addr, remote_port),
                          'C', '-m', '-S', '10000000'])
        except CalledProcessError:
            logger.error('Error while setting up network facility for DRBD')
            raise DRBDError('Cannot set up network for DRBD')

        sleep(.5)  # FIXME
        logger.debug('Set up bandwidth limit')
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'syncer', '-r',
                          self.DRBD_RATE])
        except CalledProcessError:
            logger.error('Cannot set bandwidth rate limit on DRBD')
            raise DRBDError('Error while setting bandwidth limit')

        self.state = 'CONNECTED'

    def wait_connection(self):
        """Wait for the peer to connect (``drbdsetup wait-connect``).

        :raises DRBDError: if the command fails
        """
        self.state = 'WAIT PEER CONNECT'
        sleep(.5)  # FIXME
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'wait-connect',
                          '-t', self.DRBD_TIMEOUT,
                          '-d', self.DRBD_TIMEOUT,
                          '-o', self.DRBD_TIMEOUT])
        except CalledProcessError:
            logger.error('Error while waiting for remote DRBD to connect,'
                         ' timeout = %s', self.DRBD_TIMEOUT)
            raise DRBDError('Error while waiting DRBD connect')

        sleep(.5)  # FIXME
        self.state = 'CONNECTED'

    def switch_primary(self):
        """Switch the DRBD device to primary role.

        :raises DRBDError: if the command fails
        """
        logger.debug('Switch DRBD %s in primary mode', self.drbd_path)
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'primary', '-o'])
        except CalledProcessError:
            logger.error('Error while switching to primary role (%s)',
                         self.drbd_path)
            raise DRBDError('Cannot switch to primary role')

        self.state = 'CONNECTED PRIMARY'

    def switch_secondary(self):
        """Switch the DRBD device to secondary role.

        :raises DRBDError: if the command fails
        """
        logger.debug('Switch DRBD %s in secondary mode', self.drbd_path)
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'secondary'])
        except CalledProcessError:
            logger.error('Error while switching to secondary role (%s)',
                         self.drbd_path)
            raise DRBDError('Cannot switch to secondary role')

        self.state = 'CONNECTED SECONDARY'

    def wait_sync(self):
        """Wait for the synchronisation to end (``drbdsetup wait-sync``).

        :raises DRBDError: if the command fails
        """
        self.state = 'WAIT SYNC'
        sleep(.5)  # FIXME

        logger.debug('Wait sync %s', self.drbd_path)
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'wait-sync'])
        except CalledProcessError:
            logger.error('Error while waiting for synchronisation of DRBD'
                         ' device (%s)', self.drbd_path)
            raise DRBDError('Wait sync error')

        self.state = 'SYNC DONE'

    def disconnect(self):
        """Disconnect the DRBD device from its peer.

        :raises DRBDError: if the command fails
        """
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'disconnect'])
        except CalledProcessError:
            logger.error('Error while disconnecting DRBD device %s',
                         self.drbd_path)
            raise DRBDError('Cannot disconnect device')

        self.state = 'DISCONNECTED'

    def status(self):
        """DRBD status.

        Parses the XML printed by ``drbdsetup status`` and stores the result
        in ``drbd_status``.

        :returns: dict with keys ``conn``, ``disk``, ``rdisk``, ``role``,
            ``rrole`` and ``percent`` (``None`` for missing attributes)
        :raises DRBDError: if the command fails or its output cannot be
            parsed
        """
        try:
            out = subproc_call([self.DRBDSETUP, self.drbd_path, 'status'])
        except CalledProcessError:
            logger.error('Error while getting DRBD status (%s)', self.drbd_path)
            raise DRBDError('Status: error while executing DRBD status')
        try:
            # root element of the XML document
            status = et.fromstring(out)
        except Exception:
            logger.error('Error while parsing status command output for DRBD'
                         ' device %s', self.drbd_path)
            raise DRBDError('Status: cannot parse output')

        self.drbd_status = dict(
            conn=status.get('cs'),
            disk=status.get('ds1'),
            rdisk=status.get('ds2'),
            role=status.get('ro1'),
            rrole=status.get('ro2'),
            percent=status.get('resynced_percent', None),
        )
        return self.drbd_status

    def takeover(self):
        """Set up DRBD device as VM backing device.

        Loads the table pointing to the DRBD device in the VM LV, then
        suspends and resumes the LV.

        :raises DRBDError: if a dmsetup command fails
        """
        logger.debug('DRBD takeover %s', self.drbd_path)
        # setup must have been done before
        assert self.drbd_table is not None
        try:
            subproc_call([self.DMSETUP, 'load', self.volume.path],
                         self.drbd_table)
        except CalledProcessError:
            logger.error('Error while loading new table for VM LV')
            raise DRBDError('Takeover: cannot load DM table')
        try:
            subproc_call([self.DMSETUP, 'suspend', self.volume.path])
        except CalledProcessError:
            logger.error('Error while suspending VM LV')
            raise DRBDError('Takeover: cannot suspend DM')
        try:
            subproc_call([self.DMSETUP, 'resume', self.volume.path])
        except CalledProcessError:
            logger.error('Error while resuming VM LV')
            raise DRBDError('Takeover: cannot resume DM')