# -*- coding: utf-8 -*- import xml.dom.minidom, os from time import sleep from errors import DRBDPoolError, DRBDError from utils import RWLock, Exec, Enum class DRBDPool(object): ''' ''' MAX_DEV = 16 _dev = dict([(i, None) for i in range(0, MAX_DEV)]) _dev_mutex = RWLock() def _release_device(self, minor): ''' ''' with self._dev_mutex.write: self._dev[minor] = None def get_device(self, minor): ''' ''' with self._dev_mutex.read: if minor > 0 and minor < self.MAX_DEV: if self._dev[minor] is not None: return self._dev[minor] else: raise DRBDPoolError('device ID `%i` is not allocated' % minor) else: raise DRBDPoolError('invalid device ID') def new_device(self): ''' ''' with self._dev_mutex.write: for minor, dev in self._dev.iteritems(): if dev is None: drbd = DRBD(self, minor) self._dev[minor] = drbd return drbd class DRBD(object): ''' ''' # tools binaries BIN_PATH = '/sbin' DRBDADM = os.path.join(BIN_PATH, 'drbdadm') DRBDMETA = os.path.join(BIN_PATH, 'drbdmeta') DRBDSETUP = os.path.join(BIN_PATH, 'drbdsetup') MODPROBE = os.path.join(BIN_PATH, 'modprobe') RMMOD = os.path.join(BIN_PATH, 'rmmod') # connection states CSTATES = Enum( STANDALONE = "StandAlone", DISCONNECTING = "Disconnecting", UNCONNECTED = "Unconnected", TIMEOUT = "Timeout", BROKEN_PIPE = "BrokenPipe", NETWORK_FAILURE = "NetworkFailure", PROTOCOL_ERROR = "ProtocolError", WF_CONNECTION = "WFConnection", WF_REPORT_PARAMS = "WFReportParams", TEAR_DOWN = "TearDown", CONNECTED = "Connected", STARTING_SYNC_S = "StartingSyncS", STARTING_SYNC_T = "StartingSyncT", WF_BITMAP_S = "WFBitMapS", WF_BITMAP_T = "WFBitMapT", WF_SYNC_UUID = "WFSyncUUID", SYNC_SOURCE = "SyncSource", SYNC_TARGET = "SyncTarget", PAUSED_SYNC_S = "PausedSyncS", PAUSED_SYNC_T = "PausedSyncT", VERIFY_S = "VerifyS", VERIFY_T = "VerifyT", ) # node roles NROLES = Enum( PRIMARY = "Primary", SECONDARY = "Secondary", UNKNOWN = "Unknown", ) # disk states DSTATES = Enum( DISKLESS = "Diskless", ATTACHING = "Attaching", FAILED = "Failed", NEGOTIATING = "Negotiating", INCONSISTENT = "Inconsistent", OUTDATED = "Outdated", UNKNOWN = "DUnknown", CONSISTENT = "Consistent", UP_TO_DATE = "UpToDate", ) _mutex = RWLock() def __init__(self, manager, minor): ''' ''' self._manager = manager self._minor = int(minor) self._path = '/dev/drbd%i' % self._minor self._meta = None self._dm_table = None # load kernel driver, do not check for error Exec.silent([self.RMMOD, 'drbd']) Exec.silent([self.MODPROBE, 'drbd', 'minor_count=100', 'usermode_helper=/bin/true']) # check that binaries are available for cmd in [self.DRBDADM, self.DRBDMETA, self.DRBDSETUP]: if not os.access(cmd, os.F_OK): raise DRBDError('failed to init DRBD device, support tool `%s`' ' is not available' % cmd) # check that device is not used if self.status()['conn'] is not None: print DRBDError('device minor `%i` is already configured, cannot' ' use it' % self._minor) def destroy(self): ''' ''' # bring down drbd device Exec.silent([self.DRBDSETUP, self._path, 'detach']) Exec.silent([self.DRBDSETUP, self._path, 'down']) # release device from the pool self._manager._release_device(self._minor) # suicide self._path = None self._minor = None def get_minor(self): ''' ''' return self._minor def get_port(self): ''' ''' return 7788 + self._minor # FIXME magic number def get_path(self): ''' ''' with self._mutex.read: return self._path def status(self, nolock = False): ''' ''' if not nolock: self._mutex.read.acquire() # fetch xml status try: rc, output = Exec.call([self.DRBDSETUP, self._path, 'status']) except Exception as err: if not nolock: self._mutex.read.release() raise err else: if rc: raise DRBDError('failed to get device status') # parse status try: xroot = xml.dom.minidom.parseString(output[0]) xres = xroot.firstChild status = {} status['conn'] = self.CSTATES.get(xres.getAttribute('cs')) status['disk'] = self.DSTATES.get(xres.getAttribute('ds1')) status['rdisk'] = self.DSTATES.get(xres.getAttribute('ds2')) status['role'] = self.NROLES.get(xres.getAttribute('ro1')) status['rrole'] = self.NROLES.get(xres.getAttribute('ro2')) if xres.hasAttribute('resynced_percent'): status['percent'] = xres.getAttribute('resynced_percent') else: status['percent'] = None return status except Exception as err: return {'conn' : None} finally: if not nolock: self._mutex.read.release() def setup(self, sourcedev, metadev): ''' ''' with self._mutex.write: if self.status(nolock=True)['conn'] is None: # wipe and init meta device rc = Exec.silent([self.DRBDMETA, '-f', self._path, 'v08', metadev, '0', 'wipe-md']) if rc != 0: raise DRBDError('failed to clean meta device, maybe it is' ' in use or does not exist') rc = Exec.silent([self.DRBDMETA, '-f', self._path, 'v08', metadev, '0', 'create-md']) if rc != 0: raise DRBDError('failed to create meta device, maybe it is' ' in use or does not exist') else: self._meta = metadev # create the drbd disk rc = Exec.silent([self.DRBDSETUP, self._path, 'disk', sourcedev, metadev, '0', '--create-device']) if rc != 0: raise DRBDError('failed to setup DRBD disk') else: raise DRBDError('cannot setup device, it is already configured') def connect(self, remote, remoteport, rate=50000): ''' ''' with self._mutex.write: # connect to remote node rc = Exec.silent([self.DRBDSETUP, self._path, 'net', '0.0.0.0:%i' % (self.get_port()), '%s:%i' % (remote, remoteport), 'C', '-m', '-S', '10000000']) if rc != 0: raise DRBDError('failed to initiate connection to remote node,' ' local port may already be in use') # FIXME magic sleep seems to be mandatory sleep(0.5) # force sync rate rc = Exec.silent([self.DRBDSETUP, self._path, 'syncer', '-r', str(rate)]) if rc != 0: raise DRBDError('failed to set sync rate') def disconnect(self): ''' ''' # disconnect from remote node Exec.silent([self.DRBDSETUP, self._path, 'disconnect']) def primary(self): ''' Switch the drbd to primary mode ''' with self._mutex.write: rc = Exec.silent([self.DRBDSETUP, self._path, 'primary', '-o']) if rc != 0: raise DRBDError('failed to switch to primary node mode') def secondary(self): ''' Switch the drbd to secondary mode ''' with self._mutex.write: rc = Exec.silent([self.DRBDSETUP, self._path, 'secondary']) if rc != 0: raise DRBDError('failed to switch to secondary node mode') def wait_connection(self, timeout=300): ''' Wait for connection with the remote drbd node ''' with self._mutex.read: sleep(0.5) #FIXME really needed ? rc = Exec.silent([self.DRBDSETUP, self._path, 'wait-connect', '-t', str(timeout), '-d', str(timeout), '-o', str(timeout)]) # FIXME magic sleep, seems to be mandatory sleep(0.5) try: if rc != 0 or self.status()['conn'] != self.CSTATES.CONNECTED: raise DRBDError('no connection after `%i` seconds' %timeout) except Exception as err: print err def wait_sync(self): ''' Wait for synchronization of the drbd device ''' with self._mutex.read: rc = Exec.silent([self.DRBDSETUP, self._path, 'wait-sync']) # FIXME magic sleep, seems to be mandatory sleep(0.5) status = self.status() if (rc != 0 or not status or ('disk' in status and status['disk'] != self.DSTATES.UP_TO_DATE)): raise DRBDError('synchronization failed')