Loading ccnode/jobs.py +295 −18 Original line number Diff line number Diff line Loading @@ -7,8 +7,7 @@ from threading import Lock, RLock, Thread, Event from logging import debug from utils import RWLock from drbd import DRBD from errors import (JobManagerError, JobError, XferJobError, ReceiveFileJobError, SendFileJobError, LVMError, DRBDError) from lvm import LVM from errors import (JobManagerError, JobError, XferJobError, ReceiveFileJobError, SendFileJobError, DrbdCopyJobError, LVMError, DRBDError, TCPTunnelJobError) Loading Loading @@ -356,7 +355,7 @@ class XferJob(BaseJob): # tcp port range used for transferts TCP_PORT_MIN = 42000 TCP_PORT_MAX = 42999 TCP_PORT_MAX = 43999 def __init__(self, manager, path): ''' Loading @@ -375,6 +374,7 @@ class XferJob(BaseJob): return self._checksum #TODO rewrite with the _cancel() and _cleanup() methods class SendFileJob(XferJob): ''' ''' Loading @@ -382,7 +382,7 @@ class SendFileJob(XferJob): ''' ''' self._host = remote_host self._port = int(remote_port) self._local_port = int(remote_port) super(SendFileJob, self).__init__(manager, path) def _validate(self): Loading @@ -391,8 +391,8 @@ class SendFileJob(XferJob): return ( super(SendFileJob, self)._validate() and os.path.isfile(self._path) and os.access(self._path, os.F_OK | os.R_OK ) and self._port >= self.TCP_PORT_MIN and self._port <= self.TCP_PORT_MAX and self._local_port >= self.TCP_PORT_MIN and self._local_port <= self.TCP_PORT_MAX and isinstance(self._host, str)) def _prepare(self): Loading @@ -409,14 +409,14 @@ class SendFileJob(XferJob): try: # try to connect self._log.append('connecting to `%s`:`%i`' % (self._host, self._port)) self._local_port)) self._sock.settimeout(10) retry = 0 connected = False while retry < 6 and not connected and not self._cancelled: retry += 1 try: self._sock.connect((self._host, self._port)) self._sock.connect((self._host, self._local_port)) except socket.timeout: pass except socket.error: Loading Loading @@ -466,17 +466,18 @@ class SendFileJob(XferJob): pass #TODO rewrite with the _cancel() and _cleanup() methods class ReceiveFileJob(XferJob): ''' ''' _ports_used = [] _ports_used_local = [] _ports_used_mutex = RWLock() def __init__(self, manager, path, create=False): ''' ''' self._create = create self._port = None self._local_port = None super(ReceiveFileJob, self).__init__(manager, path) def _validate(self): Loading Loading @@ -504,14 +505,14 @@ class ReceiveFileJob(XferJob): # find a port to bind to try: with self._ports_used_mutex.write: if port not in self._ports_used: if port not in self._ports_used_local: self._sock.bind(('', port)) self._port = port self._ports_used.append(port) self._local_port = port self._ports_used_local.append(port) break except socket.error: pass if not self._port: if not self._local_port: msg = 'no port to listen to' self._log.append(msg) raise ReceiveFileJobError(msg) Loading @@ -525,7 +526,7 @@ class ReceiveFileJob(XferJob): self._sock.listen(1) self._sock.settimeout(1) # wait for connection self._log.append('waiting for connection on port `%i`' % self._port) self._log.append('waiting for connection on port `%i`' % self._local_port) retry = 0 conn = None while retry < 60 and not conn and not self._cancelled: Loading Loading @@ -570,8 +571,8 @@ class ReceiveFileJob(XferJob): except: pass with self._ports_used_mutex.write: if self._port in self._ports_used: self._ports_used.remove(port) if self._local_port in self._ports_used_local: self._ports_used_local.remove(self._local_port) try: fd.close() except: Loading @@ -580,7 +581,283 @@ class ReceiveFileJob(XferJob): def get_port(self): ''' ''' return int(self._port) return int(self._local_port) class DrbdCopyJob(BaseJob): ''' ''' def __init__(self, manager, lvm, drbd_pool, source_vg, source_lv): ''' ''' self._mutex_step = Lock() self._lvm = lvm self._pool = drbd_pool self._source_vgname = source_vg self._source_lvname = source_lv super(DrbdCopyJob, self).__init__(manager) self._drbd = self._pool.new_device() # checkpoint events self._event_connected_ready = Event() self._event_connected = Event() self._event_rolechosen_ready = Event() self._event_rolechosen = Event() self._event_sleep_ready = Event() self._event_sleep = Event() def _cancel(self): ''' ''' # release some pending calls self._event_sleep_ready.set() self._event_sleep.set() def _cleanup(self): ''' ''' # destroy drbd device try: self._log.append('destroying the drbd device') self._drbd.destroy() except Exception as err: self._log.append('failed to destroy the drbd device') else: self._log.append('successfully destroyed the drbd device') # we wait for any potential job step to finish # should be quick after the drbd destruction (kills all waits) with self._mutex_step: # destroy drbd meta try: self._log.append('removing the drbd meta volume') self._lvm.get_vg(self._source_vgname).remove(self._meta_lvname) except Exception as err: self._log.append('failed to remove the drbd meta volume') else: self._log.append('successfully removed the drbd meta volume') # remove source LV copy try: self._log.append('removing the DM device copy') self._lvm.dm_remove(self._copy_dm) except Exception as err: self._log.append('failed to remove the DM device copy') else: self._log.append('successfully removed DM device copy') def _prepare(self): ''' ''' self._lvm.reload() # get source LV handle try: lv = self._lvm.get_lv(self._source_vgname, self._source_lvname) except: msg = 'failed to fetch LV `%s/%s`' % (self._source_vgname, self._source_lvname) self._log.append(msg) raise DrbdCopyJobError(msg) # get source LV infos self._source_path = lv.path self._source_size = lv.size self._source_dm = lv.dm_name() self._source_table = lv.dm_table() self._drbd_table = None # we got here, so all went fine # just a safety return lv.size > 0 def _job(self): ''' ''' # checkpoint if self._cancelled: return self._lvm.reload() with self._mutex_step: # create drbd meta try: self._log.append('creating a drbd meta volume') # now let's create a meta device for DRBD in the same VG # MS = C/32768 + 4MiB && MS >= 128MiB # http://www.drbd.org/users-guide/ch-internals.html self._meta_lvname = '%s.drbdmeta' % self._source_lvname self._meta_path = '/dev/%s/%s' % (self._source_vgname, self._meta_lvname) self._meta_size = int(max(self._source_size/32768 + 4 * 2**20, 128 * 2**20)) self._lvm.get_vg(self._source_vgname).create(self._meta_lvname, self._meta_size) self._lvm.reload() self._meta_dm = self._lvm.get_lv(self._source_vgname, self._meta_lvname).dm_name() except Exception as err: self._log.append('failed to create a drbd meta volume') raise err else: self._log.append('successfully created the drbd meta volume' ' `%s`' % self._meta_path) # checkpoint if self._cancelled: return with self._mutex_step: # create a copy of the source LV try: self._log.append('creating a device copy of `%s`' % self._source_path) self._copy_dm = '%s.%s.copy' % (self._source_vgname, self._source_lvname) self._copy_path = '/dev/mapper/%s' % self._copy_dm self._lvm.dm_create(self._copy_dm, self._source_table) except Exception as err: self._log.append('failed to create a device copy') raise err else: self._log.append('successfully copied device as `%s`' % self._copy_path) # checkpoint if self._cancelled: return with self._mutex_step: # setup the drbd device try: self._log.append('configuring the drbd device `%s`' % self._drbd.get_path()) self._drbd.setup(self._copy_path, self._meta_path) self._drbd_table = '0 %d linear %s 0' % (self._source_size / 512 , self._source_path) except Exception as err: self._log.append('failed to configure the drbd device') raise err else: self._log.append('successfully configured the drbd device') # blocking checkpoint # wait for call to connect() if self._cancelled: return self._event_connected_ready.set() self._event_connected.wait() if self._cancelled: return with self._mutex_step: # wait for a connection with the remote host try: self._log.append('waiting for node connection') self._drbd.wait_connection() except Exception as err: self._log.append('failed to establish a connection') raise err else: self._log.append('connected with a remote node') # blocking checkpoint # wait for call to role() if self._cancelled: return self._event_rolechosen_ready.set() self._event_rolechosen.wait() if self._cancelled: return # wait for the sync completion try: self._log.append('waiting for initial device sync') self._drbd.wait_sync() except Exception as err: self._log.append('failed to sync the DRBD') raise err else: self._log.append('DRBD is sync\'ed !') # sleep until the DRBD is shutdown self._log.append('job entering sleep') self._event_sleep_ready.set() self._event_sleep.wait() self._log.append('job leaving sleep') def drbd_get_port(self): ''' ''' return self._drbd.get_port() def drbd_connect(self, remote_addr, remote_port): ''' ''' if not self._event_connected.is_set(): if not self._event_connected_ready.is_set(): self._log.append('can\'t connect right now, waiting for ' 'initialization to complete...') self._event_connected_ready.wait() with self._mutex_step: # set endpoint self._drbd.connect(remote_addr, remote_port) # unlock any connection wait self._event_connected.set() else: raise DrbdCopyJobError('already connected') def drbd_role(self, primary=None): ''' ''' if self._event_connected.is_set(): self._event_rolechosen_ready.wait() if primary is True or primary is False: # change node role if primary: self._log.append('switching to primary mode') self._drbd.primary() else: self._log.append('switching to secondary mode') self._drbd.secondary() # unlock any role change wait self._event_rolechosen.set() else: raise DrbdCopyJobError('cannot change role, not connected yet') def drbd_waitsync(self): ''' ''' self._log.append('waiting for sync') # wait for initial sync and setup to be completed self._event_sleep_ready.wait() # wait for DRBD sync with self._mutex_step: self._drbd.wait_sync() def drbd_takeover(self, state): ''' ''' # FIXME comment that # FIXME check events with self._mutex_step: # if state is True: if self._drbd_table is not None: table = self._drbd_table else: raise DrbdCopyJobError('no DRBD table available yet') else: table = self._source_table # self._lvm.dm_set_table(self._source_dm, table) def drbd_stop(self): ''' ''' self._event_sleep_ready.wait(5) # to fix thread sync issue if self._event_sleep_ready.is_set(): with self._mutex_step: # unlock job termination self._event_sleep.set() else: raise DrbdCopyJobError('can\'t stop now') class TCPTunnelJob(BaseJob): Loading Loading
ccnode/jobs.py +295 −18 Original line number Diff line number Diff line Loading @@ -7,8 +7,7 @@ from threading import Lock, RLock, Thread, Event from logging import debug from utils import RWLock from drbd import DRBD from errors import (JobManagerError, JobError, XferJobError, ReceiveFileJobError, SendFileJobError, LVMError, DRBDError) from lvm import LVM from errors import (JobManagerError, JobError, XferJobError, ReceiveFileJobError, SendFileJobError, DrbdCopyJobError, LVMError, DRBDError, TCPTunnelJobError) Loading Loading @@ -356,7 +355,7 @@ class XferJob(BaseJob): # tcp port range used for transferts TCP_PORT_MIN = 42000 TCP_PORT_MAX = 42999 TCP_PORT_MAX = 43999 def __init__(self, manager, path): ''' Loading @@ -375,6 +374,7 @@ class XferJob(BaseJob): return self._checksum #TODO rewrite with the _cancel() and _cleanup() methods class SendFileJob(XferJob): ''' ''' Loading @@ -382,7 +382,7 @@ class SendFileJob(XferJob): ''' ''' self._host = remote_host self._port = int(remote_port) self._local_port = int(remote_port) super(SendFileJob, self).__init__(manager, path) def _validate(self): Loading @@ -391,8 +391,8 @@ class SendFileJob(XferJob): return ( super(SendFileJob, self)._validate() and os.path.isfile(self._path) and os.access(self._path, os.F_OK | os.R_OK ) and self._port >= self.TCP_PORT_MIN and self._port <= self.TCP_PORT_MAX and self._local_port >= self.TCP_PORT_MIN and self._local_port <= self.TCP_PORT_MAX and isinstance(self._host, str)) def _prepare(self): Loading @@ -409,14 +409,14 @@ class SendFileJob(XferJob): try: # try to connect self._log.append('connecting to `%s`:`%i`' % (self._host, self._port)) self._local_port)) self._sock.settimeout(10) retry = 0 connected = False while retry < 6 and not connected and not self._cancelled: retry += 1 try: self._sock.connect((self._host, self._port)) self._sock.connect((self._host, self._local_port)) except socket.timeout: pass except socket.error: Loading Loading @@ -466,17 +466,18 @@ class SendFileJob(XferJob): pass #TODO rewrite with the _cancel() and _cleanup() methods class ReceiveFileJob(XferJob): ''' ''' _ports_used = [] _ports_used_local = [] _ports_used_mutex = RWLock() def __init__(self, manager, path, create=False): ''' ''' self._create = create self._port = None self._local_port = None super(ReceiveFileJob, self).__init__(manager, path) def _validate(self): Loading Loading @@ -504,14 +505,14 @@ class ReceiveFileJob(XferJob): # find a port to bind to try: with self._ports_used_mutex.write: if port not in self._ports_used: if port not in self._ports_used_local: self._sock.bind(('', port)) self._port = port self._ports_used.append(port) self._local_port = port self._ports_used_local.append(port) break except socket.error: pass if not self._port: if not self._local_port: msg = 'no port to listen to' self._log.append(msg) raise ReceiveFileJobError(msg) Loading @@ -525,7 +526,7 @@ class ReceiveFileJob(XferJob): self._sock.listen(1) self._sock.settimeout(1) # wait for connection self._log.append('waiting for connection on port `%i`' % self._port) self._log.append('waiting for connection on port `%i`' % self._local_port) retry = 0 conn = None while retry < 60 and not conn and not self._cancelled: Loading Loading @@ -570,8 +571,8 @@ class ReceiveFileJob(XferJob): except: pass with self._ports_used_mutex.write: if self._port in self._ports_used: self._ports_used.remove(port) if self._local_port in self._ports_used_local: self._ports_used_local.remove(self._local_port) try: fd.close() except: Loading @@ -580,7 +581,283 @@ class ReceiveFileJob(XferJob): def get_port(self): ''' ''' return int(self._port) return int(self._local_port) class DrbdCopyJob(BaseJob): ''' ''' def __init__(self, manager, lvm, drbd_pool, source_vg, source_lv): ''' ''' self._mutex_step = Lock() self._lvm = lvm self._pool = drbd_pool self._source_vgname = source_vg self._source_lvname = source_lv super(DrbdCopyJob, self).__init__(manager) self._drbd = self._pool.new_device() # checkpoint events self._event_connected_ready = Event() self._event_connected = Event() self._event_rolechosen_ready = Event() self._event_rolechosen = Event() self._event_sleep_ready = Event() self._event_sleep = Event() def _cancel(self): ''' ''' # release some pending calls self._event_sleep_ready.set() self._event_sleep.set() def _cleanup(self): ''' ''' # destroy drbd device try: self._log.append('destroying the drbd device') self._drbd.destroy() except Exception as err: self._log.append('failed to destroy the drbd device') else: self._log.append('successfully destroyed the drbd device') # we wait for any potential job step to finish # should be quick after the drbd destruction (kills all waits) with self._mutex_step: # destroy drbd meta try: self._log.append('removing the drbd meta volume') self._lvm.get_vg(self._source_vgname).remove(self._meta_lvname) except Exception as err: self._log.append('failed to remove the drbd meta volume') else: self._log.append('successfully removed the drbd meta volume') # remove source LV copy try: self._log.append('removing the DM device copy') self._lvm.dm_remove(self._copy_dm) except Exception as err: self._log.append('failed to remove the DM device copy') else: self._log.append('successfully removed DM device copy') def _prepare(self): ''' ''' self._lvm.reload() # get source LV handle try: lv = self._lvm.get_lv(self._source_vgname, self._source_lvname) except: msg = 'failed to fetch LV `%s/%s`' % (self._source_vgname, self._source_lvname) self._log.append(msg) raise DrbdCopyJobError(msg) # get source LV infos self._source_path = lv.path self._source_size = lv.size self._source_dm = lv.dm_name() self._source_table = lv.dm_table() self._drbd_table = None # we got here, so all went fine # just a safety return lv.size > 0 def _job(self): ''' ''' # checkpoint if self._cancelled: return self._lvm.reload() with self._mutex_step: # create drbd meta try: self._log.append('creating a drbd meta volume') # now let's create a meta device for DRBD in the same VG # MS = C/32768 + 4MiB && MS >= 128MiB # http://www.drbd.org/users-guide/ch-internals.html self._meta_lvname = '%s.drbdmeta' % self._source_lvname self._meta_path = '/dev/%s/%s' % (self._source_vgname, self._meta_lvname) self._meta_size = int(max(self._source_size/32768 + 4 * 2**20, 128 * 2**20)) self._lvm.get_vg(self._source_vgname).create(self._meta_lvname, self._meta_size) self._lvm.reload() self._meta_dm = self._lvm.get_lv(self._source_vgname, self._meta_lvname).dm_name() except Exception as err: self._log.append('failed to create a drbd meta volume') raise err else: self._log.append('successfully created the drbd meta volume' ' `%s`' % self._meta_path) # checkpoint if self._cancelled: return with self._mutex_step: # create a copy of the source LV try: self._log.append('creating a device copy of `%s`' % self._source_path) self._copy_dm = '%s.%s.copy' % (self._source_vgname, self._source_lvname) self._copy_path = '/dev/mapper/%s' % self._copy_dm self._lvm.dm_create(self._copy_dm, self._source_table) except Exception as err: self._log.append('failed to create a device copy') raise err else: self._log.append('successfully copied device as `%s`' % self._copy_path) # checkpoint if self._cancelled: return with self._mutex_step: # setup the drbd device try: self._log.append('configuring the drbd device `%s`' % self._drbd.get_path()) self._drbd.setup(self._copy_path, self._meta_path) self._drbd_table = '0 %d linear %s 0' % (self._source_size / 512 , self._source_path) except Exception as err: self._log.append('failed to configure the drbd device') raise err else: self._log.append('successfully configured the drbd device') # blocking checkpoint # wait for call to connect() if self._cancelled: return self._event_connected_ready.set() self._event_connected.wait() if self._cancelled: return with self._mutex_step: # wait for a connection with the remote host try: self._log.append('waiting for node connection') self._drbd.wait_connection() except Exception as err: self._log.append('failed to establish a connection') raise err else: self._log.append('connected with a remote node') # blocking checkpoint # wait for call to role() if self._cancelled: return self._event_rolechosen_ready.set() self._event_rolechosen.wait() if self._cancelled: return # wait for the sync completion try: self._log.append('waiting for initial device sync') self._drbd.wait_sync() except Exception as err: self._log.append('failed to sync the DRBD') raise err else: self._log.append('DRBD is sync\'ed !') # sleep until the DRBD is shutdown self._log.append('job entering sleep') self._event_sleep_ready.set() self._event_sleep.wait() self._log.append('job leaving sleep') def drbd_get_port(self): ''' ''' return self._drbd.get_port() def drbd_connect(self, remote_addr, remote_port): ''' ''' if not self._event_connected.is_set(): if not self._event_connected_ready.is_set(): self._log.append('can\'t connect right now, waiting for ' 'initialization to complete...') self._event_connected_ready.wait() with self._mutex_step: # set endpoint self._drbd.connect(remote_addr, remote_port) # unlock any connection wait self._event_connected.set() else: raise DrbdCopyJobError('already connected') def drbd_role(self, primary=None): ''' ''' if self._event_connected.is_set(): self._event_rolechosen_ready.wait() if primary is True or primary is False: # change node role if primary: self._log.append('switching to primary mode') self._drbd.primary() else: self._log.append('switching to secondary mode') self._drbd.secondary() # unlock any role change wait self._event_rolechosen.set() else: raise DrbdCopyJobError('cannot change role, not connected yet') def drbd_waitsync(self): ''' ''' self._log.append('waiting for sync') # wait for initial sync and setup to be completed self._event_sleep_ready.wait() # wait for DRBD sync with self._mutex_step: self._drbd.wait_sync() def drbd_takeover(self, state): ''' ''' # FIXME comment that # FIXME check events with self._mutex_step: # if state is True: if self._drbd_table is not None: table = self._drbd_table else: raise DrbdCopyJobError('no DRBD table available yet') else: table = self._source_table # self._lvm.dm_set_table(self._source_dm, table) def drbd_stop(self): ''' ''' self._event_sleep_ready.wait(5) # to fix thread sync issue if self._event_sleep_ready.is_set(): with self._mutex_step: # unlock job termination self._event_sleep.set() else: raise DrbdCopyJobError('can\'t stop now') class TCPTunnelJob(BaseJob): Loading