# -*- coding: utf-8 -*-

import os, time, socket, select
from hashlib import md5
from datetime import datetime
from threading import Lock, RLock, Thread, Event
from logging import debug

from utils import RWLock
from drbd import DRBD
from lvm import LVM
from errors import (JobManagerError, JobError, XferJobError,
                    ReceiveFileJobError, SendFileJobError, DrbdCopyJobError,
                    LVMError, DRBDError, TCPTunnelJobError)

#fixme
import traceback, sys


class JobManager(object):
    '''
    '''

    MAX_JID = 65535

    def __init__(self):
        '''
        '''
        super(JobManager, self).__init__()
        self._mutex = RLock()
        self._next_jid = 1
        # job objects indexed by job ID
        self._jobs = {}
        # new jobs IDs
        self._jobs_pending = []
        # running jobs IDs
        self._jobs_running = []
        # dead jobs IDs
        self._jobs_crashed = []
        self._jobs_finished = []

    def _job_crashed(self, jid):
        debug('JobManager._job_crashed: id=`%i`' % jid)
        with self._mutex:
            # job should exist
            job = self.get_job(jid)
            # move job to the crashed queue
            self._jobs_crashed.append(jid)
            # set the cancelled flag to block pending commands
            job._cancelled = True
            # clean any queue that may contain the job
            for queue in [self._jobs_pending, self._jobs_running,
                          self._jobs_finished]:
                if jid in queue:
                    queue.remove(jid)
            # FIXME put at the end because it may raise errors
            # execute job self cleanup
            job._cleanup()

    def _job_finished(self, jid):
        debug('JobManager._job_finished: id=`%i`' % jid)
        with self._mutex:
            # job should exist
            job = self.get_job(jid)
            # move job to the finished queue
            self._jobs_finished.append(jid)
            # remove from running queue
            self._jobs_running.remove(jid)
            # FIXME put at the end because it may raise errors
            # execute job self cleanup
            job._cleanup()

    def run(self):
        '''
        '''
        # FIXME
        while True:
            pass

    def append(self, job):
        '''
        '''
        with self._mutex:
            jid = self._next_jid
            self._next_jid += 1
            self._jobs[jid] = job
            self._jobs_pending.append(jid)
            debug('JobManager.append: new job pending with id `%i`', jid)
            return jid

    def cancel(self, jid):
        '''
        '''
        with self._mutex:
            # job should exist
            job = self.get_job(jid)
            # job should be running
            if jid not in self._jobs_running:
                raise JobManagerError('job `%i` is not running' % jid)
            # ask the job to stop its execution
            self._jobs[jid]._cancelled = True
            self._jobs[jid]._cancel()
            # do NOT execute _cleanup() here !!!
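
    # NOTE: list() reports each known job in exactly one bucket; the
    # 'orphaned' bucket should stay empty unless the queue bookkeeping breaks.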
    def list(self):
        '''
        '''
        with self._mutex:
            pending = []
            running = []
            crashed = []
            finished = []
            orphaned = []
            for jid, job in self._jobs.iteritems():
                s = (jid, job.get_type())
                if jid in self._jobs_pending:
                    pending.append(s)
                elif jid in self._jobs_running:
                    running.append(s)
                elif jid in self._jobs_crashed:
                    crashed.append(s)
                elif jid in self._jobs_finished:
                    finished.append(s)
                else:
                    orphaned.append(s)
            return {
                'pending'  : pending,
                'running'  : running,
                'crashed'  : crashed,
                'finished' : finished,
                'orphaned' : orphaned,
            }

    def schedule_immediate(self, jid):
        '''
        '''
        debug('JobManager.schedule_immediate: id=`%i`', jid)
        with self._mutex:
            # job should exist
            job = self.get_job(jid)
            # job should be pending execution
            if jid not in self._jobs_pending:
                raise JobManagerError('job `%i` not prepared for execution'
                                      % jid)
            # execute job
            self._jobs_running.append(jid)
            self._jobs_pending.remove(jid)
            job.daemon = True
            job.start()

    def is_job(self, jid):
        '''
        '''
        with self._mutex:
            return jid in self._jobs

    def get_job(self, jid):
        '''
        '''
        with self._mutex:
            if self.is_job(jid):
                return self._jobs[jid]
            else:
                raise JobManagerError('unknown job ID `%i`' % jid)

    def is_job_pending(self, jid):
        '''
        '''
        with self._mutex:
            if not self.is_job(jid):
                raise JobManagerError('unknown job ID `%i`' % jid)
            return jid in self._jobs_pending

    def is_job_running(self, jid):
        '''
        '''
        with self._mutex:
            if not self.is_job(jid):
                raise JobManagerError('unknown job ID `%i`' % jid)
            return jid in self._jobs_running

    def is_job_crashed(self, jid):
        '''
        '''
        with self._mutex:
            if not self.is_job(jid):
                raise JobManagerError('unknown job ID `%i`' % jid)
            return jid in self._jobs_crashed

    def is_job_finished(self, jid):
        '''
        '''
        with self._mutex:
            if not self.is_job(jid):
                raise JobManagerError('unknown job ID `%i`' % jid)
            return jid in self._jobs_finished

    def is_job_cancelled(self, jid):
        '''
        '''
        with self._mutex:
            job = self.get_job(jid)
            return job._cancelled is True


class JobLog():
    '''
    '''

    def __init__(self):
        '''
        '''
        self._items = []
        self._mutex = RLock()

    def __str__(self):
        '''
        '''
        res = ""
        for tup in self._items:
            res += "[%s] %s\n" % (datetime.fromtimestamp(tup[0]).strftime(
                "%Y-%m-%d %H:%M:%S"), tup[1])
        return res

    def append(self, message):
        '''
        '''
        # FIXME add limit to buffer
        debug('JobLog.append: %s', message)
        with self._mutex:
            self._items.append((time.time(), message))


class BaseJob(Thread, object):
    '''
    '''

    def __init__(self, manager):
        '''
        '''
        super(BaseJob, self).__init__()
        self._manager = manager
        self._log = JobLog()
        # job state
        self._type = self.__class__.__name__
        self._validated = None
        self._ready_run = False
        self._cancelled = False
        # store job in manager
        self._jid = self._manager.append(self)
        self.name = '%s-%i' % (self._type, self._jid)
        self._log.append('stored with id `%i` and type `%s`' % (self._jid,
                                                                self._type))

    def _validate(self):
        '''
        To be implemented in derived class
        '''
        return False

    def _prepare(self):
        '''
        To be implemented in derived class
        '''
        return False

    def _cancel(self):
        '''
        To be implemented in derived class, or not
        '''
        pass

    def _cleanup(self):
        '''
        To be implemented in derived class, or not
        '''
        pass

    def _job(self):
        '''
        To be implemented in derived class
        '''
        self._log.append('nothing to do')
        raise JobError('empty job')

    def is_ready(self):
        '''
        '''
        return self._ready_run is True

    def get_id(self):
        '''
        '''
        return self._jid

    def get_log(self):
        '''
        '''
        return str(self._log)

    def get_type(self):
        '''
        '''
        return self._type

    def start_now(self):
        '''
        '''
        self._manager.schedule_immediate(self._jid)
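
    # run() is invoked by Thread.start() (via JobManager.schedule_immediate):
    # it lazily prepares the job if needed, executes _job() and reports the
    # outcome back to the manager.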
    def run(self):
        '''
        '''
        try:
            if not self._ready_run:
                self.prepare()
            self._log.append("running")
            self._job()
            self._log.append("finished")
            self._manager._job_finished(self._jid)
        except Exception as err:
            self._log.append("*** CRASHED: `%r`: `%s`" % (err, err))
            self._manager._job_crashed(self._jid)

    def prepare(self):
        '''
        '''
        if not self._ready_run:
            # run validation if not done yet
            if self._validated is None:
                try:
                    self._validated = self._validate()
                except:
                    self._validated = False
                finally:
                    # do not continue if validation fails
                    if not self._validated:
                        self._log.append('validation failed')
                        raise JobError('validation failed')
            # run preparation in derived Job class
            try:
                ready = self._prepare()
            except:
                ready = False
            finally:
                self._ready_run = ready
                # do not continue if preparation fails
                if not ready:
                    self._log.append('preparation failed')
                    raise JobError('preparation failed')
                else:
                    self._log.append('ready for execution')


class XferJob(BaseJob):
    '''
    '''

    TYPES = [
        'SendFileJob',
        'ReceiveFileJob',
    ]

    # tcp port range used for transfers
    TCP_PORT_MIN = 42000
    TCP_PORT_MAX = 43999

    def __init__(self, manager, path):
        '''
        '''
        self._path = path
        self._md5 = md5()
        self._checksum = None
        super(XferJob, self).__init__(manager)

    def _validate(self):
        '''
        '''
        return self._type in self.TYPES

    def get_checksum(self):
        return self._checksum


#TODO rewrite with the _cancel() and _cleanup() methods
class SendFileJob(XferJob):
    '''
    '''

    def __init__(self, manager, path, remote_host, remote_port):
        '''
        '''
        self._host = remote_host
        self._local_port = int(remote_port)
        super(SendFileJob, self).__init__(manager, path)

    def _validate(self):
        '''
        '''
        return (
            super(SendFileJob, self)._validate()
            and os.path.isfile(self._path)
            and os.access(self._path, os.F_OK | os.R_OK)
            and self._local_port >= self.TCP_PORT_MIN
            and self._local_port <= self.TCP_PORT_MAX
            and isinstance(self._host, str))

    def _prepare(self):
        '''
        '''
        # create socket
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        return bool(self._sock)

    def _job(self):
        '''
        '''
        fd = None
        try:
            # try to connect
            self._log.append('connecting to `%s`:`%i`' % (self._host,
                                                          self._local_port))
            self._sock.settimeout(10)
            retry = 0
            connected = False
            while retry < 6 and not connected and not self._cancelled:
                retry += 1
                try:
                    self._sock.connect((self._host, self._local_port))
                except socket.timeout:
                    pass
                except socket.error:
                    time.sleep(1)
                    pass
                else:
                    connected = True

            if self._cancelled:
                self._log.append('stopped connect attempt (job was cancelled)')
            elif connected:
                self._log.append('connected')
                # open file
                self._log.append('opening source file `%s`' % self._path)
                fd = open(self._path, 'rb')
                # read and send file
                self._log.append('sending data')
                while not self._cancelled:
                    buf = fd.read(4096)
                    if not buf:
                        break
                    self._sock.sendall(buf)
                    self._md5.update(buf)
                # report transfer completion
                if self._cancelled:
                    self._log.append('transfer aborted (job was cancelled)')
                else:
                    self._log.append('no more data to send')
                    self._checksum = self._md5.hexdigest()
                    self._md5 = None
                    self._log.append('transferred data checksum = `%s`'
                                     % self._checksum)
            else:
                self._log.append('remote host did not accept connection, '
                                 'aborting emission')
        except:
            #traceback.print_exc(file=sys.stdout)
            raise SendFileJobError('FIXME error reporting')
        finally:
            # close socket and files anyway
            try:
                self._sock.close()
            except:
                pass
            try:
                fd.close()
            except:
                pass
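

# ReceiveFileJob is the listening half of a transfer: it binds a free port
# from the XferJob range in _prepare(); a SendFileJob on the other side then
# connects to that port and streams the file, while both ends keep an MD5 of
# the bytes they move.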
#TODO rewrite with the _cancel() and _cleanup() methods
class ReceiveFileJob(XferJob):
    '''
    '''

    _ports_used_local = []
    _ports_used_mutex = RWLock()

    def __init__(self, manager, path, create=False):
        '''
        '''
        self._create = create
        self._local_port = None
        super(ReceiveFileJob, self).__init__(manager, path)

    def _validate(self):
        '''
        '''
        return (
            super(ReceiveFileJob, self)._validate()
            and (
                (not self._create
                 and os.path.isfile(self._path)
                 and os.access(self._path, os.F_OK | os.W_OK))
                or
                (self._create
                 and os.path.isdir(os.path.dirname(self._path))
                 and os.access(os.path.dirname(self._path),
                               os.F_OK | os.W_OK))
            ))

    def _prepare(self):
        '''
        '''
        # create socket
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # bind socket to a port
        for port in range(self.TCP_PORT_MIN, self.TCP_PORT_MAX):
            # find a port to bind to
            try:
                with self._ports_used_mutex.write:
                    if port not in self._ports_used_local:
                        self._sock.bind(('', port))
                        self._local_port = port
                        self._ports_used_local.append(port)
                        break
            except socket.error:
                pass
        if not self._local_port:
            msg = 'no port to listen to'
            self._log.append(msg)
            raise ReceiveFileJobError(msg)
        return True

    def _job(self):
        '''
        '''
        fd = None
        try:
            self._sock.listen(1)
            self._sock.settimeout(1)
            # wait for connection
            self._log.append('waiting for connection on port `%i`'
                             % self._local_port)
            retry = 0
            conn = None
            while retry < 60 and not conn and not self._cancelled:
                retry += 1
                try:
                    conn, addr = self._sock.accept()
                except socket.timeout as err:
                    pass

            if self._cancelled:
                self._log.append('stopped waiting (job was cancelled)')
            if conn:
                self._log.append('client connected from `%s`' % (addr,))
                # open file
                self._log.append('opening destination `%s`' % self._path)
                fd = open(self._path, 'wb')
                # write to the file
                self._log.append('receiving data')
                while not self._cancelled:
                    buf = conn.recv(4096)
                    if not buf:
                        break
                    fd.write(buf)
                    self._md5.update(buf)
                # report transfer completion
                if self._cancelled:
                    self._log.append('transfer aborted (job was cancelled)')
                else:
                    self._log.append('remote host closed connection')
                    self._checksum = self._md5.hexdigest()
                    self._md5 = None
                    self._log.append('transferred data checksum = `%s`'
                                     % self._checksum)
            else:
                self._log.append('no connection received, aborting reception')
        except:
            #traceback.print_exc(file=sys.stdout)
            raise ReceiveFileJobError('FIXME error reporting')
        finally:
            # close socket and files anyway
            try:
                self._sock.close()
            except:
                pass
            with self._ports_used_mutex.write:
                if self._local_port in self._ports_used_local:
                    self._ports_used_local.remove(self._local_port)
            try:
                fd.close()
            except:
                pass

    def get_port(self):
        '''
        '''
        return int(self._local_port)


class DrbdCopyJob(BaseJob):
    '''
    '''

    def __init__(self, manager, lvm, drbd_pool, source_vg, source_lv):
        '''
        '''
        self._mutex_step = Lock()
        self._lvm = lvm
        self._pool = drbd_pool
        self._source_vgname = source_vg
        self._source_lvname = source_lv
        super(DrbdCopyJob, self).__init__(manager)
        self._drbd = self._pool.new_device()
        # checkpoint events
        self._event_connected_ready = Event()
        self._event_connected = Event()
        self._event_rolechosen_ready = Event()
        self._event_rolechosen = Event()
        self._event_sleep_ready = Event()
        self._event_sleep = Event()

    def _cancel(self):
        '''
        '''
        # release some pending calls
        self._event_connected_ready.set()
        self._event_connected.set()
        self._event_rolechosen_ready.set()
        self._event_rolechosen.set()
        self._event_sleep_ready.set()
        self._event_sleep.set()
        # disconnect the node
        self._drbd.disconnect()
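
    # _cancel() only unblocks the waiting checkpoints and drops the DRBD
    # connection; the actual teardown (DRBD device, meta volume, DM copy)
    # happens in _cleanup(), which the manager calls once the job is reaped.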
    def _cleanup(self):
        '''
        '''
        # destroy drbd device
        try:
            self._log.append('destroying the drbd device')
            self._drbd.destroy()
        except Exception as err:
            self._log.append('failed to destroy the drbd device')
        else:
            self._log.append('successfully destroyed the drbd device')

        # we wait for any potential job step to finish
        # should be quick after the drbd destruction (kills all waits)
        with self._mutex_step:
            # destroy drbd meta
            try:
                self._log.append('removing the drbd meta volume')
                self._lvm.get_vg(self._source_vgname).remove(self._meta_lvname)
            except Exception as err:
                self._log.append('failed to remove the drbd meta volume')
            else:
                self._log.append('successfully removed the drbd meta volume')

            # remove source LV copy
            try:
                self._log.append('removing the DM device copy')
                self._lvm.dm_remove(self._copy_dm)
            except Exception as err:
                self._log.append('failed to remove the DM device copy')
            else:
                self._log.append('successfully removed DM device copy')

    def _prepare(self):
        '''
        '''
        self._lvm.reload()
        # get source LV handle
        try:
            lv = self._lvm.get_lv(self._source_vgname, self._source_lvname)
        except:
            msg = 'failed to fetch LV `%s/%s`' % (self._source_vgname,
                                                  self._source_lvname)
            self._log.append(msg)
            raise DrbdCopyJobError(msg)
        # get source LV infos
        self._source_path = lv.path
        self._source_size = lv.size
        self._source_dm = lv.dm_name()
        self._source_table = lv.dm_table()
        self._drbd_table = None
        # we got here, so all went fine
        # just a safety
        return lv.size > 0

    def _job(self):
        '''
        '''
        # checkpoint
        if self._cancelled:
            return

        self._lvm.reload()

        with self._mutex_step:
            # create drbd meta
            try:
                self._log.append('creating a drbd meta volume')
                # now let's create a meta device for DRBD in the same VG
                # MS = C/32768 + 4MiB && MS >= 128MiB
                # http://www.drbd.org/users-guide/ch-internals.html
                self._meta_lvname = '%s.drbdmeta' % self._source_lvname
                self._meta_path = '/dev/%s/%s' % (self._source_vgname,
                                                  self._meta_lvname)
                self._meta_size = int(max(self._source_size/32768 + 4 * 2**20,
                                          128 * 2**20))
                self._lvm.get_vg(self._source_vgname).create(self._meta_lvname,
                                                             self._meta_size)
                self._lvm.reload()
                self._meta_dm = self._lvm.get_lv(self._source_vgname,
                                                 self._meta_lvname).dm_name()
            except Exception as err:
                self._log.append('failed to create a drbd meta volume')
                raise err
            else:
                self._log.append('successfully created the drbd meta volume'
                                 ' `%s`' % self._meta_path)

        # checkpoint
        if self._cancelled:
            return

        with self._mutex_step:
            # create a copy of the source LV
            try:
                self._log.append('creating a device copy of `%s`'
                                 % self._source_path)
                self._copy_dm = '%s.%s.copy' % (self._source_vgname,
                                                self._source_lvname)
                self._copy_path = '/dev/mapper/%s' % self._copy_dm
                self._lvm.dm_create(self._copy_dm, self._source_table)
            except Exception as err:
                self._log.append('failed to create a device copy')
                raise err
            else:
                self._log.append('successfully copied device as `%s`'
                                 % self._copy_path)

        # checkpoint
        if self._cancelled:
            return

        with self._mutex_step:
            # setup the drbd device
            try:
                self._log.append('configuring the drbd device `%s`'
                                 % self._drbd.get_path())
                self._drbd.setup(self._copy_path, self._meta_path)
                self._drbd_table = '0 %d linear %s 0' % (
                        self._source_size / 512, self._drbd.get_path())
            except Exception as err:
                self._log.append('failed to configure the drbd device')
                raise err
            else:
                self._log.append('successfully configured the drbd device')

        # blocking checkpoint
        # wait for call to connect()
        if self._cancelled:
            return
        self._event_connected_ready.set()
        self._event_connected.wait()
        if self._cancelled:
            return

        with self._mutex_step:
            # wait for a connection with the remote host
            try:
                self._log.append('waiting for node connection')
                self._drbd.wait_connection()
            except Exception as err:
                self._log.append('failed to establish a connection')
                raise err
            else:
                self._log.append('connected with a remote node')
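
        # from here on the job is driven by the control methods below:
        # drbd_role() releases the role checkpoint, the initial sync is then
        # awaited, and the final sleep is only released by a cancellation.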
        # blocking checkpoint
        # wait for call to role()
        if self._cancelled:
            return
        self._event_rolechosen_ready.set()
        self._event_rolechosen.wait()
        if self._cancelled:
            return

        # wait for the sync completion
        try:
            self._log.append('waiting for initial device sync')
            self._drbd.wait_sync()
        except Exception as err:
            import traceback
            traceback.print_exc()
            self._log.append('failed to sync the DRBD')
            raise err
        else:
            self._log.append('DRBD is sync\'ed !')

        # sleep until the DRBD is shutdown
        self._log.append('job entering sleep')
        self._event_sleep_ready.set()
        self._event_sleep.wait()
        self._log.append('job leaving sleep')

    def drbd_get_port(self):
        '''
        '''
        return self._drbd.get_port()

    def drbd_connect(self, remote_addr, remote_port):
        '''
        '''
        if not self._event_connected.is_set():
            if not self._event_connected_ready.is_set():
                self._log.append('can\'t connect right now, waiting for '
                                 'initialization to complete...')
                self._event_connected_ready.wait()
            with self._mutex_step:
                # set endpoint
                self._drbd.connect(remote_addr, remote_port)
                # unlock any connection wait
                self._event_connected.set()
        else:
            raise DrbdCopyJobError('already connected')

    def drbd_role(self, primary=None):
        '''
        '''
        if self._event_connected.is_set():
            self._event_rolechosen_ready.wait()
            if primary is True or primary is False:
                # change node role
                if primary:
                    self._log.append('switching to primary mode')
                    self._drbd.primary()
                else:
                    self._log.append('switching to secondary mode')
                    self._drbd.secondary()
                # unlock any role change wait
                self._event_rolechosen.set()
        else:
            raise DrbdCopyJobError('cannot change role, not connected yet')

    def drbd_waitsync(self):
        '''
        '''
        self._log.append('waiting for sync')
        # wait for initial sync and setup to be completed
        self._event_sleep_ready.wait()
        # wait for additional DRBD sync
        with self._mutex_step:
            self._drbd.wait_sync()

    def drbd_takeover(self, state):
        '''
        '''
        # FIXME comment that
        # FIXME check events
        with self._mutex_step:
            #
            if state is True:
                if self._drbd_table is not None:
                    table = self._drbd_table
                else:
                    raise DrbdCopyJobError('no DRBD table available yet')
            else:
                table = self._source_table
            #
            self._lvm.dm_set_table(self._source_dm, table)

    def drbd_status(self):
        '''
        '''
        return self._drbd.status()


class TCPTunnelJob(BaseJob):
    '''
    '''

    # tcp port range used for tunnels
    TCP_PORT_MIN = 44000
    TCP_PORT_MAX = 45999

    # global port list
    _ports_used = []
    _ports_used_mutex = RWLock()

    def __init__(self, manager):
        '''
        '''
        super(TCPTunnelJob, self).__init__(manager)
        self._mutex = Lock()
        self._event_client_conf = Event()
        self._event_server_conf = Event()
        self._event_server_bound = Event()
        self._server_port = None
        # set here because of _cancel()
        self._sock_client = None
        self._sock_server_listen = None
        self._sock_server = None

    def _cancel(self):
        '''
        '''
        # explicitly close connections, to release blocking socket calls
        if self._sock_client:
            try:
                self._sock_client.close()
            except:
                pass
        if self._sock_server_listen:
            try:
                self._sock_server_listen.close()
            except:
                pass
        if self._sock_server:
            try:
                self._sock_server.close()
            except:
                pass

    def _cleanup(self):
        '''
        '''
        # unlock events
        self._event_client_conf.set()
        self._event_server_conf.set()
        self._event_server_bound.set()
        # destroy sockets
        self._sock_client = None
        self._sock_server_listen = None
        self._sock_server = None

    def _validate(self):
        '''
        '''
        return True # FIXME maybe

    def _prepare(self):
        '''
        '''
        return True # FIXME maybe

    def _job(self):
        '''
        '''
        try:
            # wait for tunnel configuration for server
            if not self._cancelled:
                self._log.append('waiting for tunnel server configuration')
                self._event_server_conf.wait()
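
            # the tunnel then proceeds in three phases: bind a listening
            # socket, wait for the client-side configuration, and finally
            # shuttle bytes between the two sockets with epoll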
            ## setup server
            # create listening socket
            self._sock_server_listen = socket.socket(socket.AF_INET,
                                                     socket.SOCK_STREAM)

            # bind listening socket to a port
            self._server_port = None
            for port in range(self.TCP_PORT_MIN, self.TCP_PORT_MAX):
                # find a port to bind to
                try:
                    with self._ports_used_mutex.write:
                        if port not in self._ports_used:
                            addr = '127.0.0.1' if self._server_local else '0.0.0.0'
                            self._sock_server_listen.bind((addr, port)) #FIXME
                            self._server_port = port
                            self._ports_used.append(port)
                            break
                except socket.error:
                    pass

            if not self._server_port:
                msg = 'no port to listen to'
                self._log.append(msg)
                raise TCPTunnelJobError(msg)
            else:
                self._event_server_bound.set()
                self._log.append('server bound to `%s:%d`'
                                 % (addr, self._server_port))

            # wait for tunnel configuration for client
            if not self._cancelled:
                self._log.append('waiting for tunnel client configuration')
                self._event_client_conf.wait()

            # loop and reconnect forever until cancellation
            while not self._cancelled:

                ## wait for a client to connect
                self._log.append('waiting for a client to connect')
                self._sock_server_listen.settimeout(1)
                self._sock_server_listen.listen(1)
                self._sock_server = None
                while not self._cancelled and self._sock_server is None:
                    try:
                        self._sock_server, local_addr = \
                                self._sock_server_listen.accept()
                        self._log.append('client connected from `%s`'
                                         % (local_addr,))
                    except socket.timeout:
                        pass
                if self._sock_server is None:
                    break # job cancelled

                ## connect to the endpoint
                timeout = 15 # FIXME must be argument
                retry_interval = 5
                self._sock_client = socket.socket(socket.AF_INET,
                                                  socket.SOCK_STREAM)
                self._sock_client.settimeout(timeout)
                connected = False
                while not self._cancelled and not connected:
                    try:
                        self._log.append('connecting to `%s`'
                                         % (self._endpoint,))
                        self._sock_client.connect(self._endpoint)
                        connected = True
                    except socket.timeout:
                        self._log.append('no response after %d seconds'
                                         % timeout)
                    except socket.error:
                        self._log.append('socket error (connection refused ?)')
                        time.sleep(retry_interval)
                if self._cancelled:
                    break
                if not connected:
                    continue

                ## tunnel data between sockets
                # init socket poller
                mask_ro = select.EPOLLIN | select.EPOLLERR | select.EPOLLHUP
                mask_rw = mask_ro | select.EPOLLOUT
                poller = select.epoll()
                poller.register(self._sock_server, mask_ro)
                poller.register(self._sock_client, mask_ro)
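
                # polling strategy: a socket is polled read-only until there
                # is data buffered for it to send, then it is switched to
                # mask_rw until that buffer drains again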
                # forward data until a connection is closed
                buf_server = ''
                buf_client = ''
                empty_buf = 0
                connected = True
                while not self._cancelled and connected:
                    # wait for events on both sockets
                    poll = poller.poll()
                    for fd, events in poll:
                        # events are for server socket
                        if fd == self._sock_server.fileno():
                            # read available
                            if events & select.EPOLLIN:
                                # read incoming data
                                try:
                                    read = self._sock_server.recv(4096)
                                except socket.error:
                                    connected = False
                                    read = ''
                                if not len(read):
                                    empty_buf += 1
                                else:
                                    empty_buf = 0
                                    buf_server += read
                                # set the other socket to notify us when it's
                                # available for writing
                                if len(buf_server):
                                    poller.modify(self._sock_client, mask_rw)
                            # write available
                            if events & select.EPOLLOUT:
                                # try to send the whole buffer
                                try:
                                    sent = self._sock_server.send(buf_client)
                                except socket.error:
                                    connected = False
                                    sent = 0
                                # drop sent data from the buffer
                                buf_client = buf_client[sent:]
                                # if the buffer becomes empty, stop write polling
                                if not len(buf_client):
                                    poller.modify(self._sock_server, mask_ro)
                        # events for client socket
                        else:
                            # read available
                            if events & select.EPOLLIN:
                                # read incoming data
                                try:
                                    read = self._sock_client.recv(4096)
                                except socket.error:
                                    connected = False
                                    read = ''
                                if not len(read):
                                    empty_buf += 1
                                else:
                                    empty_buf = 0
                                    buf_client += read
                                # set the other socket to notify us when it's
                                # available for writing
                                if len(buf_client):
                                    poller.modify(self._sock_server, mask_rw)
                            # write available
                            if events & select.EPOLLOUT:
                                # try to send the whole buffer
                                try:
                                    sent = self._sock_client.send(buf_server)
                                except socket.error:
                                    connected = False
                                    sent = 0
                                # drop sent data from the buffer
                                buf_server = buf_server[sent:]
                                # if the buffer becomes empty, stop write polling
                                if not len(buf_server):
                                    poller.modify(self._sock_client, mask_ro)
                    if empty_buf >= 10:
                        connected = False

                if connected is False:
                    self._log.append('disconnected')
                    try:
                        self._sock_server.close()
                    except:
                        pass
                    try:
                        self._sock_client.close()
                    except:
                        pass

        except Exception as err:
            traceback.print_exc()
            raise err

    def tunnel_get_server_port(self):
        '''
        '''
        self._event_server_bound.wait()
        return self._server_port

    def tunnel_connect(self, endpoint):
        '''
        '''
        with self._mutex:
            if not self._event_client_conf.is_set():
                self._endpoint = endpoint
                self._event_client_conf.set()
            else:
                raise TCPTunnelJobError('remote endpoint already set')

    def tunnel_listen(self, local):
        '''
        '''
        with self._mutex:
            if not self._event_server_conf.is_set():
                self._server_local = local is True
                self._event_server_conf.set()
            else:
                raise TCPTunnelJobError('server parameters already set')
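

# Illustrative sketch (example only, not part of the job classes above): a
# loop-back file transfer driven through the JobManager.  The temporary file
# names are invented for the example, and it assumes a port in the XferJob
# range (42000-43999) can be bound locally.
if __name__ == '__main__':
    import tempfile

    manager = JobManager()

    # create a small source file to send
    src = tempfile.NamedTemporaryFile(suffix='.src', delete=False)
    src.write('some payload\n' * 1024)
    src.close()
    dst_path = src.name + '.copy'

    # the receiver picks and binds a port from the XferJob range in prepare()
    receiver = ReceiveFileJob(manager, dst_path, create=True)
    receiver.prepare()

    # the sender connects to that port on the local host
    sender = SendFileJob(manager, src.name, '127.0.0.1', receiver.get_port())

    receiver.start_now()
    sender.start_now()
    receiver.join()
    sender.join()

    # both ends should report the same checksum
    print('sent     md5 = %s' % sender.get_checksum())
    print('received md5 = %s' % receiver.get_checksum())
    print(manager.list())

    os.unlink(src.name)
    os.unlink(dst_path)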