Newer
Older
import socket
import logging
from os.path import exists as path_exists
from time import sleep
from StringIO import StringIO
from subprocess import CalledProcessError
from xml.etree import ElementTree as et
from cloudcontrol.node.exc import TunnelError, DRBDAllocationError, DRBDError
from cloudcontrol.node.jobs import BaseIOJob, ForkedJob
from cloudcontrol.node.utils import SocketBuffer, subproc_call, Singleton
logger = logging.getLogger(__name__)
class ImportVolume(BaseIOJob):
    """Import volume job.

    Listens on a kernel-assigned TCP port, accepts a single client and writes
    everything that client sends to the local volume, computing a checksum of
    the received data on the fly.
    """

    BUFFER_LEN = 8192 * 16
    HASH = md5

    def __init__(self, job_manager, volume):
        """
        :param job_manager: job manager handed over to :class:`BaseIOJob`
        :param volume: volume to write to (its ``path`` attribute is opened)
        """
        BaseIOJob.__init__(self, job_manager)
        #: hex digest of the received data, set at the end of :meth:`run_job`
        self.checksum = None
        self.volume = volume
        # where the other node will connect
        self.port = None
        # fds
        self.sock = None
        self.client_sock = None
        self.disk = None

    @property
    def open_fds(self):
        """File descriptors of every socket/file currently held by the job."""
        return [fo.fileno() for fo in (self.sock, self.client_sock, self.disk)
                if fo is not None]

    def clean_fds(self):
        """Close and forget every socket/file held by the job."""
        if self.sock is not None:
            self.sock.close()
            self.sock = None
        if self.client_sock is not None:
            self.client_sock.close()
            self.client_sock = None
        if self.disk is not None:
            self.disk.close()
            self.disk = None

    def pre_job(self):
        """Open the listening socket and the local disk.

        :returns: port number the socket is listening on
        :raises socket.error: if the socket cannot be created or configured
        :raises IOError: if the local disk cannot be opened
        """
        # create socket
        try:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except socket.error:
            logger.exception('Error while creating socket for volume import')
            self.clean_fds()
            raise

        # every configuration step fails the same way: log, release, re-raise
        sock_steps = (
            (self.sock.settimeout, (10.,),
             'Cannot set timeout on socket for volume import'),
            (self.sock.bind, (('0.0.0.0', 0),),
             'Error while binding socket for volume import'),
            (self.sock.listen, (1,),
             'Error while listening on socket'),
        )
        for step, args, failure_msg in sock_steps:
            try:
                step(*args)
            except socket.error:
                logger.exception(failure_msg)
                self.clean_fds()
                raise

        # open local disk (unbuffered, so write() may be partial)
        try:
            self.disk = io.open(self.volume.path, 'wb', 0)
        except IOError:
            logger.exception('Error while trying to open local disk')
            self.clean_fds()
            raise

        self.port = self.sock.getsockname()[1]
        return self.port

    def run_job(self):
        """Accept the client and copy what it sends to the local disk.

        Data is gathered in chunks of up to ``BUFFER_LEN`` bytes before being
        written. The loop ends on EOF or when ``self.running`` becomes false;
        ``self.checksum`` is then set from whatever was received.

        :raises socket.error: on accept (including timeout) or receive failure
        :raises IOError: on disk write failure
        """
        try:
            self.client_sock, _ = self.sock.accept()
        except socket.timeout:
            sys.stderr.write('Error for importing job: client did not connect\n')
            self.clean_fds()
            raise
        except socket.error:
            sys.stderr.write('Error while accepting socket\n')
            self.clean_fds()
            raise

        # close the listening socket
        self.sock.close()
        self.sock = None

        checksum = self.HASH()

        # start downloading disk image
        while self.running:
            try:
                # keep a list of received buffers in order to do only one
                # concatenation in the end
                received = []
                total_received = 0
                while True:
                    recv_buf = self.client_sock.recv(
                        self.BUFFER_LEN - total_received)
                    if not recv_buf:  # EOF
                        # in case received is not empty, we will come back
                        # here once again and it returns EOF one more time
                        break
                    total_received += len(recv_buf)
                    received.append(recv_buf)
                    if total_received == self.BUFFER_LEN:
                        break
            except socket.error:
                sys.stderr.write('Error while receiving disk image\n')
                self.clean_fds()
                raise

            buffer_ = b''.join(received)
            if not buffer_:
                sys.stderr.write('Received EOF import job\n')
                break

            checksum.update(buffer_)

            try:
                written = 0
                # FIXME never write small chunks
                # the disk is opened unbuffered, so loop until the whole
                # chunk has been written
                to_send = buffer_
                while True:
                    written += self.disk.write(to_send)
                    to_send = buffer(buffer_, written)
                    if not to_send:
                        break
            except IOError:
                sys.stderr.write('Error while writing image to disk\n')
                self.clean_fds()
                raise

        # here we could not have received the full disk but we don't consider
        # this as an error in the import part
        self.checksum = checksum.hexdigest()

        # clean the fds
        self.clean_fds()
        sys.stderr.write('Volume import done\n')
class ExportVolume(BaseIOJob):
    """Export volume job.

    Connects to a remote ``(raddr, rport)`` endpoint and streams the content
    of the local volume to it, computing a checksum of the data read.
    """

    BUFFER_LEN = 8192 * 16
    HASH = md5

    def __init__(self, job_manager, volume, raddr, rport):
        """
        :param job_manager: job manager handed over to :class:`BaseIOJob`
        :param volume: :class:`Volume` instance
        :param raddr: remote IP address
        :param rport: remote TCP port
        """
        BaseIOJob.__init__(self, job_manager)
        # where to connect to send the volume
        self.raddr = raddr
        self.rport = rport
        self.volume = volume
        #: hex digest of the data read, set at the end of :meth:`run_job`
        self.checksum = None
        # fds
        self.sock = None
        self.disk = None

    @property
    def open_fds(self):
        """File descriptors of every socket/file currently held by the job."""
        return [fo.fileno() for fo in (self.sock, self.disk)
                if fo is not None]

    def clean_fds(self):
        """Close and forget every socket/file held by the job."""
        if self.sock is not None:
            self.sock.close()
            self.sock = None
        if self.disk is not None:
            self.disk.close()
            self.disk = None

    def pre_job(self):
        """Connect to the remote host and open the local volume.

        :raises socket.error: if the connection cannot be established
        :raises IOError: if the local volume cannot be opened
        """
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # connect to the remote host
        try:
            self.sock.connect((self.raddr, self.rport))
        except socket.error as exc:
            # log the exception itself: exc.errno may be None (e.g. timeouts)
            # and os.strerror(None) would raise TypeError, hiding the real
            # error
            logger.exception('Error while trying to connect to remote host %s',
                             exc)
            self.clean_fds()
            raise

        # open local volume
        try:
            self.disk = io.open(self.volume.path, 'rb', 0)
        except IOError:
            logger.exception('Error while opening disk for export job')
            self.clean_fds()
            raise

    def run_job(self):
        """Stream the local volume through the socket.

        Read and send errors are reported on stderr and end the copy without
        raising; ``self.checksum`` then covers only the data read so far.
        """
        checksum = self.HASH()

        # do copy
        while self.running:
            try:
                read = self.disk.read(self.BUFFER_LEN)
            except IOError:
                sys.stderr.write('Error while reading from disk\n')
                self.clean_fds()
                break

            # read length may be less than BUFFER_LEN but we don't care as it
            # will go over TCP
            if not read:  # end of file
                break

            checksum.update(read)

            try:
                self.sock.sendall(read)
            except socket.error:
                sys.stderr.write('Error while sending through socket\n')
                self.clean_fds()
                break

        self.checksum = checksum.hexdigest()
        self.clean_fds()
"""Handles a TCP tunnel."""
# NOTE(review): the ``class`` statement owning the docstring above and the
# methods below is not visible in this view; ForkedJob looks like the base
# (see ForkedJob.__init__ call in __init__) -- confirm against the full file.
# Maximum number of bytes requested per recv() call in read_cb.
BUFFER_LEN = 8096
def __init__(self, job_manager, connect=None, listen='0.0.0.0'):
    """Build an idle tunnel: both ends closed, no socket opened yet.

    :param job_manager: :class:`JobManager` instance
    :param connect: where to connect one end of the tunnel (a tuple, as
        given to socket.connect)
    :param listen: which interface to listen to for the other end of the
        tunnel
    """
    ForkedJob.__init__(self, job_manager)

    # dedicated libev loop, meant to run inside our child
    self.ev_loop = pyev.Loop()

    # endpoints of the tunnel
    self.connect, self.listen = connect, listen
    #: port is assigned by the kernel
    self.port = None

    # state information for both ends
    self.listen_state = self.connect_state = 'CLOSED'
    #: very basic error report
    self.error = None

    # watchers, created once the matching socket exists
    self.source_reader = self.source_writer = None
    self.dest_reader = self.dest_writer = None

    #: source_sock is the socket that will listen for remote|local to happen
    #: dest_sock connects to another tunnel already set up
    self.source_sock = self.dest_sock = None

    # input_buffer: data coming from source_sock, going to dest_sock
    self.input_buffer = SocketBuffer()
    # output_buffer: data coming from dest_sock, going to source_sock
    self.output_buffer = SocketBuffer()
@property
def open_fds(self):
    """File descriptors of the tunnel sockets that are currently set."""
    fds = []
    for sock in (self.source_sock, self.dest_sock):
        if sock is not None:
            fds.append(sock.fileno())
    return fds
def after_fork(self):
# Resets the job's private libev loop (presumably invoked once the job has
# been forked -- confirm against ForkedJob).
self.ev_loop.reset()
# NOTE(review): the comment and stderr write below read like the beginning of
# a separate ``close`` method (stop() calls self.close()) whose ``def`` line
# is missing from this view -- confirm against the complete file.
# as this could be called from child, don't use logger (this is for
# debug anyway)
sys.stderr.write('Closing job %d' % self.id)
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
# NOTE(review): these statements have no visible ``def`` line; they look like
# the remainder of the ``close`` method called by stop() and by the error
# paths of setup_listen/setup_connect -- confirm against the complete file.
# stop watchers
# (each watcher is stopped only if it was created, then forgotten)
if self.source_reader is not None:
self.source_reader.stop()
self.source_reader = None
if self.source_writer is not None:
self.source_writer.stop()
self.source_writer = None
if self.dest_reader is not None:
self.dest_reader.stop()
self.dest_reader = None
if self.dest_writer is not None:
self.dest_writer.stop()
self.dest_writer = None
# close sockets
if self.source_sock is not None:
self.source_sock.close()
self.source_sock = None
if self.dest_sock is not None:
self.dest_sock.close()
self.dest_sock = None
# clear buffers (this memory won't be needed anyway)
# after this point read_cb/write_cb can no longer use the buffers
self.input_buffer = None
self.output_buffer = None
# reset states
self.listen_state = 'CLOSED'
self.connect_state = 'CLOSED'
def stop(self):
    """Stop the tunnel; this simply delegates to ``close()``."""
    self.close()
def setup_listen(self, interface=None):
    """Setup source socket.

    Creates a non-blocking TCP socket bound to ``self.listen`` on a
    kernel-assigned port (stored in ``self.port``), starts listening and
    registers ``accept_cb`` as read callback on the job's event loop.

    :param interface: specify which interface to listen onto (overrides the
        ``listen`` value given to the constructor)
    :raises socket.error: if any socket operation fails; the tunnel is
        closed before the error is re-raised
    """
    if interface is not None:
        # bind() below reads self.listen; storing the override anywhere else
        # would silently ignore the argument
        self.listen = interface
    logger.debug('Setup listening %s %d', self.listen, self.id)

    try:
        self.source_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    except socket.error:
        logger.exception('Error while creating source_sock for tunnel job'
                         ' %d', self.id)
        self.close()
        raise

    try:
        self.source_sock.setblocking(0)
    except socket.error:
        logger.exception('Cannot set source_sock in blocking mode for'
                         ' tunnel job %d', self.id)
        self.close()
        raise

    try:
        # port 0: let the kernel pick a free port
        self.source_sock.bind((self.listen, 0))
    except socket.error:
        logger.exception('Error while binding source_sock for tunnel job'
                         ' %d', self.id)
        self.close()
        raise
    self.port = self.source_sock.getsockname()[1]

    try:
        self.source_sock.listen(1)
    except socket.error:
        logger.exception('Error while listening on source_sock for tunnel'
                         ' job %d', self.id)
        self.close()
        raise

    self.listen_state = 'LISTENING'
    # ready to accept
    self.source_reader = self.ev_loop.io(self.source_sock,
                                         pyev.EV_READ, self.accept_cb)
    self.source_reader.start()
def setup_connect(self, endpoint=None):
    """Start connection to remote end.

    Creates a non-blocking socket (``AF_INET`` when the endpoint is a tuple,
    ``AF_UNIX`` otherwise), starts connecting and registers ``connect_cb``
    as write callback to be notified when the connection attempt finishes.

    :param endpoint: specify where to connect (same as connect argument in
        constructor), can be specified in both places
    :raises TunnelError: if no endpoint was given at all
    :raises socket.error: if a socket operation fails; the tunnel is closed
        before the error is raised
    """
    if endpoint is not None:
        self.connect = endpoint
    if self.connect is None:
        raise TunnelError('Remote endpoint to connect to was not specified')
    logger.debug('Connect to endpoint %s %d', self.connect, self.id)

    if isinstance(self.connect, tuple):
        addr_family = socket.AF_INET
    else:
        addr_family = socket.AF_UNIX
    try:
        self.dest_sock = socket.socket(addr_family, socket.SOCK_STREAM)
    except socket.error:
        logger.exception('Error while creating dest_sock for tunnel job'
                         ' %d', self.id)
        self.close()
        raise

    try:
        self.dest_sock.setblocking(0)
    except socket.error:
        logger.exception('Error while sitting non block mode on dest_sock'
                         ' for tunnel job %d', self.id)
        # release dest_sock like every other error path does
        self.close()
        raise

    # non-blocking connect: EINPROGRESS is the expected answer
    error = self.dest_sock.connect_ex(self.connect)
    if error and error != errno.EINPROGRESS:
        # do not leave a half-configured socket behind
        self.close()
        raise socket.error('Error during connect for tunnel job, %s' %
                           os.strerror(error))

    self.dest_writer = self.ev_loop.io(self.dest_sock,
                                       pyev.EV_WRITE, self.connect_cb)
    self.dest_writer.start()
    self.connect_state = 'CONNECTING'
def run_job(self):
    """Write a notice on stderr, then start the job's event loop."""
    sys.stderr.write('Will start ev loop in child\n')
    self.ev_loop.start()
def accept_cb(self, watcher, revents):
    """Accept the client on the listening socket.

    Registered by ``setup_listen`` as the read callback of ``source_sock``.
    The accepted connection replaces the listening socket as
    ``source_sock``; read/write watchers are created for it and the readers
    are started only once the other end is connected as well.

    :param watcher: watcher that fired (unused)
    :param revents: event mask (unused)
    """
    try:
        new_source, remote = self.source_sock.accept()
    except socket.error as exc:
        # compare errno against both values explicitly: a bare
        # ``or errno.EWOULDBLOCK`` is always true and hides real errors
        if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
            # we will come back
            return
        # else
        self.fatal_exc('Error while accepting new connection on'
                       ' sock_source for tunnel job')
        # nothing was accepted: new_source is unbound past this point
        return

    # everything went fine
    self.source_sock.close()  # we won't accept connections
    self.source_sock = new_source
    # set new socket non blocking
    try:
        self.source_sock.setblocking(0)
    except socket.error as exc:
        self.fatal_exc('Cannot set source socket in non blocking for'
                       ' tunnel job: %s', os.strerror(exc.errno))

    # replace the accept watcher by data watchers on the new socket
    self.source_reader.stop()
    self.source_reader = self.ev_loop.io(new_source, pyev.EV_READ,
                                         self.read_cb)
    self.source_writer = self.ev_loop.io(new_source, pyev.EV_WRITE,
                                         self.write_cb)
    sys.stderr.write('Successfully accepted remote client %s for tunnel'
                     ' job %d\n' % (remote, self.id))
    self.listen_state = 'CONNECTED'

    if self.connect_state == 'CONNECTED':
        # start the watchers only if both ends are ready to accept data
        self.source_reader.start()
        self.dest_reader.start()
def connect_cb(self, watcher, revents):
    """Finish the non-blocking connect started by ``setup_connect``.

    Reads the pending socket error, reports a failure through
    ``self.fatal``, then swaps the connect watcher for read/write watchers
    on ``dest_sock``. Readers are started only once the listening end is
    connected as well.

    :param watcher: watcher that fired (unused)
    :param revents: event mask (unused)
    """
    # a non-zero SO_ERROR means the connection attempt failed
    so_error = self.dest_sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if so_error:
        self.fatal('Error during connect for tunnel job, %s\n' %
                   os.strerror(so_error))

    # from now on dest_sock carries data: set up watchers with proper events
    self.dest_reader = self.ev_loop.io(self.dest_sock, pyev.EV_READ,
                                       self.read_cb)
    self.dest_writer.stop()
    self.dest_writer = self.ev_loop.io(self.dest_sock, pyev.EV_WRITE,
                                       self.write_cb)

    sys.stderr.write('Successfully connected to remote endpoint %s %d\n' %
                     (self.connect, self.id))
    self.connect_state = 'CONNECTED'

    if self.listen_state != 'CONNECTED':
        # the other end is not ready yet; accept_cb starts the readers
        return
    # start the watchers only if both ends are ready to accept data
    self.source_reader.start()
    self.dest_reader.start()
def read_cb(self, watcher, revents):
# Read callback shared by both ends of the tunnel: data received on the
# socket whose reader fired is appended to the buffer feeding the other end.
# NOTE(review): this method looks truncated in this view (the final ``if``
# has no body, ``other_watcher`` is never used, and nothing handles EOF or
# leaves the loop on EAGAIN) -- confirm against the complete file before
# changing any logic here.
if watcher == self.dest_reader:
# sys.stderr.write('Read event on dest %s\n' % self.id)
# data from dest_sock goes to output_buffer, drained by source_writer
sock = self.dest_sock
buffer_ = self.output_buffer
other_watcher = self.source_writer
else:
# sys.stderr.write('Read event on source %s\n' % self.id)
# data from source_sock goes to input_buffer, drained by dest_writer
sock = self.source_sock
buffer_ = self.input_buffer
other_watcher = self.dest_writer
# sys.stderr.write('Will loop into event\n')
while True:
try:
incoming = sock.recv(self.BUFFER_LEN)
except socket.error as exc:
# NOTE(review): as shown, EAGAIN/EWOULDBLOCK is what triggers fatal_exc;
# statements between the test and the call seem to be missing
if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
self.fatal_exc('Unexpected error while reading on socket'
' for tunnel job, %s\n', os.strerror(exc.errno))
# sys.stderr.write('Read %d bytes\n' % len(incoming))
buffer_.append(incoming)
if buffer_.is_full():
# sys.stderr.write('Buffer is full\n')
# stop reading from this end until the buffer has room again
watcher.stop()
break
# we did read some bytes that we could write to the other end
if not buffer_.is_empty():
# sys.stderr.write('Starting other watcher\n')
# sys.stderr.write('Read event done\n')
def write_cb(self, watcher, revents):
# Write callback shared by both ends of the tunnel: chunks are popped from
# the buffer attached to the writer that fired and sent on its socket.
# NOTE(review): this method looks truncated in this view (the final ``if``
# has no body, ``other_watcher`` is never used, and nothing leaves the send
# loop on EAGAIN) -- confirm against the complete file before changing any
# logic here.
if watcher == self.dest_writer:
# sys.stderr.write('Write event on dest %s', self.id)
# dest_sock is fed from input_buffer, filled by source_reader
sock = self.dest_sock
buffer_ = self.input_buffer
other_watcher = self.source_reader
else:
# sys.stderr.write('Write event on source %s\n' % self.id)
# source_sock is fed from output_buffer, filled by dest_reader
sock = self.source_sock
buffer_ = self.output_buffer
other_watcher = self.dest_reader
while True:
try:
to_send = buffer_.popleft()
except IndexError:
# buffer is empty, we should stop write event
# sys.stderr.write('Buffer is empty\n')
watcher.stop()
break
send_buffer = to_send
total_sent = 0
# send() may write only part of the chunk; loop until it is all sent
while True:
try:
written = sock.send(send_buffer)
except socket.error as exc:
if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
# put back what has not been sent yet
buffer_.appendleft(to_send[total_sent:])
# NOTE(review): as shown, fatal_exc is called for every socket error,
# EAGAIN included; statements seem to be missing around here
self.fatal_exc('Unexpected error while writting on socket'
' for tunnel job, %s',
os.strerror(exc.errno))
# sys.stderr.write('Written %d bytes\n' % written)
if written == len(send_buffer):
break
# else
total_sent += written
# view on the unsent tail of the chunk
send_buffer = buffer(to_send, total_sent)
# if we can read on the other end
if not buffer_.is_full():
# sys.stderr.write('Starting other watcher\n')
# sys.stderr.write('Proccessed write event\n')
class DRBDAllocator(object):
    """Keeps a list of allocated DRBD devices."""

    RMMOD = '/sbin/rmmod'
    MODPROBE = '/sbin/modprobe'
    #: maximum number of DRBD devices
    MINOR_MAX = 100

    def __init__(self):
        """Start with no allocated device and reload the DRBD kernel module."""
        #: set of minor numbers currently handed out
        self.volumes = set()
        self.reload_kernel_module()

    def new_volume(self):
        """Allocate the lowest free DRBD minor number.

        :returns: the allocated minor number, in ``[0, MINOR_MAX)``
        :raises DRBDAllocationError: if every minor number is in use
        """
        # range() works on Python 2 and 3; MINOR_MAX is small enough for the
        # list built on Python 2 not to matter
        for minor in range(self.MINOR_MAX):
            if minor not in self.volumes:
                self.volumes.add(minor)
                return minor
        raise DRBDAllocationError('Cannot allocate DRBD volume')

    def remove_volume(self, id_):
        """Release a minor number previously returned by :meth:`new_volume`.

        :param id_: minor number to release
        :raises KeyError: if ``id_`` is not currently allocated
        """
        self.volumes.remove(id_)

    def reload_kernel_module(self):
        """Unload, then load the drbd kernel module with our parameters.

        :raises CalledProcessError: if the module is loaded and cannot be
            removed
        """
        # FIXME find an other way to set parameters to drbd module
        # try to remove kernel module
        try:
            subproc_call([self.RMMOD, 'drbd'])
        except CalledProcessError:
            # this is not an error if drbd module wasn't loaded
            with open('/proc/modules') as modules:
                still_loaded = 'drbd' in modules.read()
            if still_loaded:
                logger.error('Cannot remove drbd kernel module')
                raise

        # load kernel module with proper parameters
        try:
            # we use greater minor_count than the default which seems to small.
            # we set usermode helper to bin true because by default, the module
            # is calling some drbd helpers that returns non 0 value and make the
            # synchronisation halt.
            subproc_call([self.MODPROBE, 'drbd',
                          'minor_count=%d' % self.MINOR_MAX,
                          'usermode_helper=/bin/true'])
        except CalledProcessError:
            # failure is only logged here, it is not propagated to the caller
            logger.error('Cannot load drbd kernel module')
class DRBD(object):
    """Manage DRBD job."""

    DMSETUP = '/sbin/dmsetup'
    DRBDSETUP = '/sbin/drbdsetup'
    DRBDMETA = '/sbin/drbdmeta'
    #: timeout handed to ``drbdsetup wait-connect`` (-t, -d and -o options)
    DRBD_TIMEOUT = '30'
    #: rate handed to ``drbdsetup syncer -r``
    DRBD_RATE = '50000'

    def __init__(self, job_manager, ev_loop, storage_index, lvm_pool, lvm_volume):
        """
        :param job_manager: :class:`JobManager` instance
        :param ev_loop: ev loop instance
        :param storage_index: :class:`StorageIndex` instance
        :param lvm_pool: :class:`Storage` instance
        :param lvm_volume: :class:`Volume` instance
        """
        #: job id
        self.id = job_manager.job_id.next()

        self.allocator = DRBDAllocator()

        # define a set of states
        self.state = 'INIT'

        self.storage = storage_index
        self.pool = lvm_pool
        self.volume = lvm_volume
        self.meta_volume = None

        #: DRBD id as returned by DRBDAllocator
        self.drbd_id = None
        self.drbd_port = None
        #: DRBD device full path
        self.drbd_path = None
        self.drbd_table = None
        self.drbd_status = dict(conn=None)

        #: DM table of the LV as captured by :meth:`setup`
        self.dm_table = None
        #: name of DM copy of LV
        self.dm_copy = '%s-%s.copy' % (
            'vg', self.volume.name.replace('-', '--'))

    # each step is executed in the RPC call thread, thus exception are
    # propagated directly to the cc-server

    def stop(self):
        """Nothing to stop for this job."""
        pass

    def cleanup(self):
        """Undo :meth:`setup`, best effort.

        Every step is attempted even if a previous one failed; failures are
        logged and not raised.
        """
        # reset DM to initial state
        try:
            table = subproc_call([self.DMSETUP, 'table', self.volume.path])
        except CalledProcessError:
            logger.error('Error while getting table of VM LV')
        else:
            # without a saved table (setup never got that far) there is
            # nothing to load back
            # NOTE(review): dm_table was captured with --showkeys while
            # ``table`` is not, so both may differ for reasons other than a
            # takeover -- confirm
            if self.dm_table is not None and table != self.dm_table:
                try:
                    subproc_call([self.DMSETUP, 'load', self.volume.path],
                                 self.dm_table)
                    subproc_call([self.DMSETUP, 'suspend', self.volume.path])
                    subproc_call([self.DMSETUP, 'resume', self.volume.path])
                except CalledProcessError:
                    logger.error('Error while loading back VM LV table')
                    # FIXME this is kind of critical, we should tell the
                    # user to call a Gaetant

        # stop drbd volume
        if self.drbd_id is not None:
            try:
                subproc_call([self.DRBDSETUP, self.drbd_path, 'disconnect'])
            except CalledProcessError:
                logger.error('Error while disconnecting DRBD device %s',
                             self.drbd_path)
            try:
                subproc_call([self.DRBDSETUP, self.drbd_path, 'secondary'])
            except CalledProcessError:
                logger.error('Error while switching DRBD device to secondary'
                             ' (%s)', self.drbd_path)
            try:
                subproc_call([self.DRBDSETUP, self.drbd_path, 'detach'])
            except CalledProcessError:
                logger.error('Error while detaching DRBD device %s',
                             self.drbd_path)
            try:
                subproc_call([self.DRBDSETUP, self.drbd_path, 'down'])
            except CalledProcessError:
                logger.error('Error while bringing down DRBD device %s',
                             self.drbd_path)
            self.allocator.remove_volume(self.drbd_id)
            self.drbd_id = None
            self.drbd_port = None
            self.drbd_path = None
            self.drbd_table = None
            self.drbd_status = dict(conn=None)

        # remove drbd meta volume
        if self.meta_volume is not None:
            try:
                self.storage.delete_volume(
                    self.pool.name,
                    self.volume.name + '.drbdmeta',
                )
            except Exception:  # FIXME narrow down to storage errors
                logger.exception('Error while removing DRBD metadata LV')
            self.meta_volume = None

        # remove copy DM
        if path_exists('/dev/mapper/' + self.dm_copy):
            try:
                subproc_call([self.DMSETUP, 'remove', self.dm_copy])
            except CalledProcessError:
                logger.error('Error while removing DM copy')
        self.dm_table = None

    def setup(self):
        """Create the metadata LV, the DM copy of the LV and the DRBD device.

        :raises DRBDError: if the DM table cannot be read or the DRBD
            metadata/device cannot be created
        :raises CalledProcessError: if the DM copy cannot be created
        """
        logger.debug('Create DRBD meta device')
        self.meta_volume = self.storage.create_volume(
            self.pool.name,
            self.volume.name + '.drbdmeta',
            # see
            # http://www.drbd.org/users-guide/ch-internals.html#s-meta-data-size
            # for external metadata size calculation
            max(self.volume.capacity / 32768 + 4 * 2 ** 20, 128 * 2 ** 20),
        )

        logger.debug('Create a copy DM of the LV')
        # get LV table
        try:
            self.dm_table = subproc_call([self.DMSETUP, 'table',
                                          '--showkeys', self.volume.path])
        except CalledProcessError:
            logger.error('Cannot get DM table of VM LV')
            raise DRBDError('Cannot get DM table of VM LV')

        # create new DM
        logger.debug('Got table of LV "%s"', self.dm_table)
        try:
            subproc_call([self.DMSETUP, 'create', self.dm_copy], self.dm_table)
        except CalledProcessError:
            logger.error('Cannot create copy DM of LV with table "%s"',
                         self.dm_table)
            raise

        logger.debug('Setup DRBD device')
        # get drbd path
        self.drbd_id = self.allocator.new_volume()
        self.drbd_port = 7788 + self.drbd_id  # FIXME magic number
        self.drbd_path = '/dev/drbd%d' % self.drbd_id

        # wipe drbd metadata (just in case); failure is deliberately ignored
        try:
            subproc_call([self.DRBDMETA, '--force', self.drbd_path,
                          'v08', self.meta_volume.path, '0', 'wipe-md'])
        except CalledProcessError:
            pass
        try:
            subproc_call([self.DRBDMETA, '--force', self.drbd_path,
                          'v08', self.meta_volume.path, '0', 'create-md'])
        except CalledProcessError:
            logger.error('Cannot create DRBD external metadata on device')
            raise DRBDError('Cannot create DRBD metadata')

        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'disk',
                          '/dev/mapper/%s' % self.dm_copy,
                          self.meta_volume.path,
                          '0', '--create-device'])
        except CalledProcessError:
            logger.error('Error while creating DRBD device')
            raise DRBDError('Cannot create DRBD device')

        # linear DM table mapping the whole LV onto the DRBD device, loaded
        # later by takeover()
        self.drbd_table = '0 %d linear %s 0' % (
            self.volume.capacity / 512,  # FIXME comment
            self.drbd_path,
        )

        logger.debug('Setup DRBD done')
        self.state = 'SETUP'

    def connect(self, remote_addr, remote_port):
        """Set up DRBD networking towards the peer and the sync rate.

        :param remote_addr: address of the remote DRBD
        :param remote_port: port of the remote DRBD
        :raises DRBDError: if a drbdsetup command fails
        """
        logger.debug('Setup networking for DRBD')
        # connect to remote node
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'net',
                          '0.0.0.0:%d' % self.drbd_port,
                          '%s:%d' % (remote_addr, remote_port),
                          'C', '-m', '-S', '10000000'])
        except CalledProcessError:
            logger.error('Error while setting up network facility for DRBD')
            raise DRBDError('Cannot set up network for DRBD')

        sleep(.5)  # FIXME
        logger.debug('Set up bandwidth limit')
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'syncer', '-r',
                          self.DRBD_RATE])
        except CalledProcessError:
            logger.error('Cannot set bandwidth rate limit on DRBD')
            raise DRBDError('Error while setting bandwidth limit')

        self.state = 'CONNECTED'

    def wait_connection(self):
        """Block until the peer is connected (``drbdsetup wait-connect``).

        :raises DRBDError: if the command fails
        """
        self.state = 'WAIT PEER CONNECT'
        sleep(.5)  # FIXME
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'wait-connect',
                          '-t', self.DRBD_TIMEOUT,
                          '-d', self.DRBD_TIMEOUT,
                          '-o', self.DRBD_TIMEOUT])
        except CalledProcessError:
            logger.error('Error while waiting for remote DRBD to connect,'
                         ' timeout = %s', self.DRBD_TIMEOUT)
            raise DRBDError('Error while waiting DRBD connect')

        sleep(.5)  # FIXME
        self.state = 'CONNECTED'

    def switch_primary(self):
        """Switch the DRBD device to the primary role.

        :raises DRBDError: if the command fails
        """
        logger.debug('Switch DRBD %s in primary mode', self.drbd_path)
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'primary', '-o'])
        except CalledProcessError:
            logger.error('Error while switching to primary role (%s)',
                         self.drbd_path)
            raise DRBDError('Cannot switch to primary role')

        self.state = 'CONNECTED PRIMARY'

    def switch_secondary(self):
        """Switch the DRBD device to the secondary role.

        :raises DRBDError: if the command fails
        """
        logger.debug('Switch DRBD %s in secondary mode', self.drbd_path)
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'secondary'])
        except CalledProcessError:
            logger.error('Error while switching to secondary role (%s)',
                         self.drbd_path)
            raise DRBDError('Cannot switch to secondary role')

        self.state = 'CONNECTED SECONDARY'

    def wait_sync(self):
        """Block until synchronisation is over (``drbdsetup wait-sync``).

        :raises DRBDError: if the command fails
        """
        self.state = 'WAIT SYNC'
        sleep(.5)  # FIXME
        logger.debug('Wait sync %s', self.drbd_path)
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'wait-sync'])
        except CalledProcessError:
            logger.error('Error while waiting for synchronisation of DRBD'
                         ' device (%s)', self.drbd_path)
            raise DRBDError('Wait sync error')

        self.state = 'SYNC DONE'

    def disconnect(self):
        """Disconnect the DRBD device from its peer.

        :raises DRBDError: if the command fails
        """
        try:
            subproc_call([self.DRBDSETUP, self.drbd_path, 'disconnect'])
        except CalledProcessError:
            logger.error('Error while disconnecting DRBD device %s',
                         self.drbd_path)
            raise DRBDError('Cannot disconnect device')

        self.state = 'DISCONNECTED'

    def status(self):
        """DRBD status.

        Runs ``drbdsetup <device> status``, parses its XML output and stores
        the result in ``self.drbd_status``.

        :returns: dict with keys ``conn``, ``disk``, ``rdisk``, ``role``,
            ``rrole`` and ``percent`` (``None`` for missing attributes)
        :raises DRBDError: if the command fails or its output cannot be parsed
        """
        try:
            out = subproc_call([self.DRBDSETUP, self.drbd_path, 'status'])
        except CalledProcessError:
            logger.error('Error while getting DRBD status (%s)', self.drbd_path)
            raise DRBDError('Status: error while executing DRBD status')

        try:
            # root element of the status document
            status = et.fromstring(out)
        except Exception:
            logger.error('Error while parsing status command output for DRBD'
                         ' device %s', self.drbd_path)
            raise DRBDError('Status: cannot parse output')

        self.drbd_status = dict(
            conn=status.get('cs'),
            disk=status.get('ds1'),
            rdisk=status.get('ds2'),
            role=status.get('ro1'),
            rrole=status.get('ro2'),
            percent=status.get('resynced_percent', None),
        )
        return self.drbd_status

    def takeover(self):
        """Set up DRBD device as VM backing device.

        Loads the table built by :meth:`setup` on the VM LV, then suspends
        and resumes the LV.

        :raises DRBDError: if :meth:`setup` has not built the table or a
            dmsetup command fails
        """
        logger.debug('DRBD takeover %s', self.drbd_path)
        # explicit check rather than assert, which is stripped under -O
        if self.drbd_table is None:
            raise DRBDError('Takeover: DRBD device is not set up')
        try:
            subproc_call([self.DMSETUP, 'load', self.volume.path],
                         self.drbd_table)
        except CalledProcessError:
            logger.error('Error while loading new table for VM LV')
            raise DRBDError('Takeover: cannot load DM table')

        try:
            subproc_call([self.DMSETUP, 'suspend', self.volume.path])
        except CalledProcessError:
            logger.error('Error while suspending VM LV')
            raise DRBDError('Takeover: cannot suspend DM')
        try:
            subproc_call([self.DMSETUP, 'resume', self.volume.path])
        except CalledProcessError:
            logger.error('Error while resuming VM LV')
            raise DRBDError('Takeover: cannot resume DM')