# -*- coding: utf-8 -*-

import os
import time
import socket
from hashlib import md5
from datetime import datetime
from threading import Thread, RLock, Event
from logging import debug

from utils import RWLock
from errors import (JobManagerError, JobError, XferJobError,
                    ReceiveFileJobError, SendFileJobError)

# traceback is used to log full tracebacks of transfer failures;
# sys is currently unused
import traceback, sys


class JobManager(Thread, object):
    '''
    Registry of jobs and their execution state.

    Every job receives an integer ID from append() and is then tracked in
    exactly one of four state queues: pending, running, crashed, finished.
    All bookkeeping is guarded by an RLock because jobs run in their own
    threads and report back through _job_finished() / _job_crashed().
    '''

    # NOTE(review): declared but not enforced by append(); IDs keep growing.
    MAX_JID = 65535

    def __init__(self, hypervisor):
        '''
        :param hypervisor: owner object; stored but not used by this module.
        '''
        super(JobManager, self).__init__()
        self._hypervisor = hypervisor
        self._mutex = RLock()
        # set by stop() to let run() return
        self._stop_event = Event()
        self._next_jid = 1
        # job objects indexed by job ID
        self._jobs = {}
        # new jobs IDs
        self._jobs_pending = []
        # running jobs IDs
        self._jobs_running = []
        # dead jobs IDs
        self._jobs_crashed = []
        self._jobs_finished = []

    def _require_job(self, jid):
        '''Raise JobManagerError unless `jid` is a known job ID.'''
        if jid not in self._jobs:
            raise JobManagerError('unknown job ID `%i`' % jid)

    def _move_job(self, jid, target):
        '''
        Put `jid` into the `target` state queue and remove it from every
        other state queue (no duplicates, no ValueError if absent).
        '''
        with self._mutex:
            self._require_job(jid)
            for queue in (self._jobs_pending, self._jobs_running,
                          self._jobs_crashed, self._jobs_finished):
                if queue is not target and jid in queue:
                    queue.remove(jid)
            if jid not in target:
                target.append(jid)

    def _job_crashed(self, jid):
        '''
        Called by a job thread when its execution raised.

        :raises JobManagerError: if `jid` is unknown.
        '''
        debug('JobManager._job_crashed: id=`%i`', jid)
        self._move_job(jid, self._jobs_crashed)

    def _job_finished(self, jid):
        '''
        Called by a job thread when its execution completed.

        :raises JobManagerError: if `jid` is unknown.
        '''
        debug('JobManager._job_finished: id=`%i`', jid)
        self._move_job(jid, self._jobs_finished)

    def run(self):
        '''
        Manager thread body.

        Jobs are started by schedule_immediate(), so this thread has no
        periodic work; it sleeps until stop() is called instead of
        busy-looping.
        '''
        self._stop_event.wait()

    def stop(self):
        '''Ask the manager thread (run()) to return.'''
        self._stop_event.set()

    def append(self, job):
        '''
        Register `job` as pending and return its newly allocated ID.
        '''
        with self._mutex:
            jid = self._next_jid
            self._next_jid += 1
            self._jobs[jid] = job
            self._jobs_pending.append(jid)
        debug('JobManager.append: new job pending with id `%i`', jid)
        return jid

    def cancel(self, jid):
        '''
        Flag running job `jid` as cancelled; the job polls this flag.

        :raises JobManagerError: if the job is unknown or not running.
        '''
        with self._mutex:
            self._require_job(jid)
            # job should be running
            if jid not in self._jobs_running:
                raise JobManagerError('job `%i` is not running' % jid)
            # ask the job to stop its execution
            self._jobs[jid]._cancelled = True

    def schedule_immediate(self, jid):
        '''
        Move pending job `jid` to the running queue and start its thread.

        :raises JobManagerError: if the job is unknown or not pending.
        '''
        debug('JobManager.schedule_immediate: id=`%i`', jid)
        with self._mutex:
            self._require_job(jid)
            # job should be pending execution
            if jid not in self._jobs_pending:
                raise JobManagerError('job `%i` not prepared for execution'
                                      % jid)
            # execute job
            self._jobs_running.append(jid)
            self._jobs_pending.remove(jid)
            self._jobs[jid].start()

    def is_job(self, jid):
        '''Return True if `jid` is a known job ID.'''
        with self._mutex:
            return jid in self._jobs

    def get_job(self, jid):
        '''
        Return the job object registered under `jid`.

        :raises JobManagerError: if `jid` is unknown.
        '''
        with self._mutex:
            self._require_job(jid)
            return self._jobs[jid]

    def is_job_pending(self, jid):
        '''Return True if job `jid` has not been started yet.'''
        with self._mutex:
            self._require_job(jid)
            return jid in self._jobs_pending

    def is_job_running(self, jid):
        '''Return True if job `jid` has been started and not ended.'''
        with self._mutex:
            self._require_job(jid)
            return jid in self._jobs_running

    def is_job_crashed(self, jid):
        '''Return True if job `jid` ended with an exception.'''
        with self._mutex:
            self._require_job(jid)
            return jid in self._jobs_crashed

    def is_job_finished(self, jid):
        '''Return True if job `jid` ended normally.'''
        with self._mutex:
            self._require_job(jid)
            return jid in self._jobs_finished

    def is_job_cancelled(self, jid):
        '''Return True if cancel() was called on job `jid`.'''
        with self._mutex:
            job = self.get_job(jid)
            return job._cancelled is True


class JobLog(object):
    '''
    Thread-safe, timestamped list of messages describing a job's life.
    '''

    def __init__(self):
        # list of (unix timestamp, message) tuples
        self._items = []
        self._mutex = RLock()

    def __str__(self):
        '''Render one "[YYYY-mm-dd HH:MM:SS] message" line per entry.'''
        with self._mutex:
            items = list(self._items)
        return ''.join(
            "[%s] %s\n" % (datetime.fromtimestamp(stamp).strftime(
                "%Y-%m-%d %H:%M:%S"), message)
            for stamp, message in items)

    def append(self, message):
        '''Record `message` with the current time.'''
        debug('JobLog.append: %s', message)
        with self._mutex:
            self._items.append((time.time(), message))


class BaseJob(Thread, object):
    '''
    A job executed in its own thread and tracked by a JobManager.

    Subclasses implement _validate(), _prepare() and _job(). The job
    registers itself with the manager on construction.
    '''

    def __init__(self, manager):
        super(BaseJob, self).__init__()
        self._manager = manager
        self._log = JobLog()
        # job state
        self._type = self.__class__.__name__
        # None = not validated yet, then True/False
        self._validated = None
        self._ready_run = False
        # set to True by JobManager.cancel(); polled by _job()
        self._cancelled = False
        # store job in manager
        self._jid = self._manager.append(self)
        self.name = '%s-%i' % (self._type, self._jid)
        self._log.append('stored with id `%i` and type `%s`' % (self._jid,
                                                                self._type))

    def _validate(self):
        '''
        To be implemented in derivate class: return True if the job's
        parameters are acceptable.
        '''
        return False

    def _prepare(self):
        '''
        To be implemented in derivate class: acquire resources needed by
        _job() and return True on success.
        '''
        return False

    def _job(self):
        '''
        To be implemented in derivate class: the actual work.
        '''
        self._log.append('nothing to do')
        raise JobError('empty job')

    def is_ready(self):
        '''Return True once prepare() has succeeded.'''
        return self._ready_run is True

    def get_id(self):
        '''Return the ID allocated by the manager.'''
        return self._jid

    def get_log(self):
        '''Return the job log as text.'''
        return str(self._log)

    def start_now(self):
        '''Ask the manager to start this job immediately.'''
        self._manager.schedule_immediate(self._jid)

    def run(self):
        '''
        Thread body: prepare if needed, run _job(), and report the outcome
        (finished or crashed) to the manager.
        '''
        try:
            if not self._ready_run:
                self.prepare()
            self._log.append("running")
            self._job()
            self._log.append("finished")
            self._manager._job_finished(self._jid)
        except Exception as err:
            self._log.append("crashed: `%r`: `%s`" % (err, err))
            self._manager._job_crashed(self._jid)

    def prepare(self):
        '''
        Validate (once) then prepare the job. No-op if already prepared.

        :raises JobError: if validation or preparation fails.
        '''
        if self._ready_run:
            return
        # run validation if not done yet
        if self._validated is None:
            try:
                self._validated = bool(self._validate())
            except Exception as err:
                self._log.append('validation error: `%s`' % err)
                self._validated = False
        # do not continue if validation fails
        if not self._validated:
            self._log.append('validation failed')
            raise JobError('validation failed')
        # run preparation in derivated Job class
        try:
            ready = bool(self._prepare())
        except Exception as err:
            self._log.append('preparation error: `%s`' % err)
            ready = False
        self._ready_run = ready
        # do not continue if preparation fails
        if not ready:
            self._log.append('preparation failed')
            raise JobError('preparation failed')
        self._log.append('ready for execution')


class XferJob(BaseJob):
    '''
    Common base of file transfer jobs over TCP.
    '''

    TYPES = [
        'SendFileJob',
        'ReceiveFileJob',
    ]

    # tcp port range used for transferts (both bounds inclusive)
    TCP_PORT_MIN = 42000
    TCP_PORT_MAX = 42999

    def __init__(self, manager, path):
        # attributes are set before BaseJob.__init__ registers the job
        self._path = path
        self._md5 = md5()
        self._checksum = None
        super(XferJob, self).__init__(manager)

    def _validate(self):
        '''Only the concrete transfer classes listed in TYPES are valid.'''
        return self._type in self.TYPES

    def get_checksum(self):
        '''Return the MD5 hex digest of transferred data, or None.'''
        return self._checksum


class SendFileJob(XferJob):
    '''
    Send a local file to `remote_host`:`remote_port` over TCP.
    '''

    def __init__(self, manager, path, remote_host, remote_port):
        self._host = remote_host
        self._port = int(remote_port)
        super(SendFileJob, self).__init__(manager, path)

    def _validate(self):
        '''Require a readable file, an allowed port and a str host.'''
        return (super(SendFileJob, self)._validate()
                and os.path.isfile(self._path)
                and os.access(self._path, os.F_OK | os.R_OK)
                and self.TCP_PORT_MIN <= self._port <= self.TCP_PORT_MAX
                and isinstance(self._host, str))

    def _prepare(self):
        # create socket
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        return bool(self._sock)

    def _connect(self, attempts=6, timeout=10):
        '''
        Try to connect self._sock to the remote end.

        Returns True once connected, False if all attempts failed or the
        job got cancelled.
        '''
        for attempt in range(attempts):
            if self._cancelled:
                return False
            if attempt:
                # POSIX leaves a socket in an unspecified state after a
                # failed connect(): use a fresh socket for each retry
                try:
                    self._sock.close()
                except socket.error:
                    pass
                self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self._sock.settimeout(timeout)
            try:
                self._sock.connect((self._host, self._port))
            except socket.timeout:
                pass
            except socket.error:
                time.sleep(1)
            else:
                return True
        return False

    def _job(self):
        '''
        Connect, stream the file and compute its MD5 checksum.

        :raises SendFileJobError: on any error during the transfer.
        '''
        try:
            # try to connect
            self._log.append('connecting to `%s`:`%i`' % (self._host,
                                                           self._port))
            connected = self._connect()
            if self._cancelled:
                self._log.append('stopped connect attempt (job was cancelled)')
            elif connected:
                self._log.append('connected')
                # open file
                self._log.append('opening source file `%s`' % self._path)
                with open(self._path, 'rb') as fd:
                    # read and send file
                    self._log.append('sending data')
                    while not self._cancelled:
                        buf = fd.read(4096)
                        if not buf:
                            break
                        # send() may write only part of buf
                        self._sock.sendall(buf)
                        self._md5.update(buf)
                # report transfert completion
                if self._cancelled:
                    self._log.append('transfert aborted (job was cancelled)')
                else:
                    self._log.append('no more data to send')
                    self._checksum = self._md5.hexdigest()
                    self._md5 = None
                    self._log.append('transfered data checksum = `%s`'
                                     % self._checksum)
            else:
                self._log.append('remote host did not accept connection, '
                                 'aborting emission')
        except Exception as err:
            debug('SendFileJob._job: %s', traceback.format_exc())
            raise SendFileJobError('error while sending `%s`: %s'
                                   % (self._path, err))
        finally:
            # close socket anyway (the file is closed by `with`)
            try:
                self._sock.close()
            except socket.error:
                pass


class ReceiveFileJob(XferJob):
    '''
    Listen on a free port of the transfer range and write what a single
    client sends into `path`.
    '''

    # ports bound by ReceiveFileJob instances; shared by the whole class
    _ports_used = []
    _ports_used_mutex = RWLock()

    def __init__(self, manager, path, create=False):
        self._create = create
        self._port = None
        super(ReceiveFileJob, self).__init__(manager, path)

    def _validate(self):
        '''
        Without `create`, `path` must be an existing writable file; with
        `create`, its parent directory must be writable.
        '''
        if not super(ReceiveFileJob, self)._validate():
            return False
        if self._create:
            # dirname() is '' for a bare filename, meaning the cwd
            parent = os.path.dirname(self._path) or os.curdir
            return (os.path.isdir(parent)
                    and os.access(parent, os.F_OK | os.W_OK))
        return (os.path.isfile(self._path)
                and os.access(self._path, os.F_OK | os.W_OK))

    def _prepare(self):
        '''
        Bind a socket to the first free port of the transfer range.

        :raises ReceiveFileJobError: if no port could be bound.
        '''
        # create socket
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # TCP_PORT_MAX is inclusive (SendFileJob accepts it), hence + 1
        for port in range(self.TCP_PORT_MIN, self.TCP_PORT_MAX + 1):
            try:
                with self._ports_used_mutex.write:
                    if port not in self._ports_used:
                        self._sock.bind(('', port))
                        self._port = port
                        self._ports_used.append(port)
                        break
            except socket.error:
                pass
        if self._port is None:
            self._sock.close()
            msg = 'no port to listen to'
            self._log.append(msg)
            raise ReceiveFileJobError(msg)
        return True

    def _job(self):
        '''
        Wait up to ~60s for a client, then store everything it sends and
        compute the MD5 checksum.

        :raises ReceiveFileJobError: on any error during the transfer.
        '''
        conn = None
        try:
            self._sock.listen(1)
            # short accept timeout so cancellation is noticed quickly
            self._sock.settimeout(1)
            # wait for connection
            self._log.append('waiting for connection on port `%i`'
                             % self._port)
            retry = 0
            while retry < 60 and conn is None and not self._cancelled:
                retry += 1
                try:
                    conn, addr = self._sock.accept()
                except socket.timeout:
                    pass
            if self._cancelled:
                self._log.append('stopped waiting (job was cancelled)')
            if conn is not None:
                self._log.append('client connected from `%s`:`%i`'
                                 % tuple(addr[:2]))
                # open file
                self._log.append('opening destination `%s`' % self._path)
                with open(self._path, 'wb') as fd:
                    # write to the file
                    self._log.append('receiving data')
                    while not self._cancelled:
                        buf = conn.recv(4096)
                        if not buf:
                            break
                        fd.write(buf)
                        self._md5.update(buf)
                # report transfert completion
                if self._cancelled:
                    self._log.append('transfert aborted (job was cancelled)')
                else:
                    self._log.append('remote host closed connection')
                    self._checksum = self._md5.hexdigest()
                    self._md5 = None
                    self._log.append('transfered data checksum = `%s`'
                                     % self._checksum)
            else:
                self._log.append('no connection received, aborting reception')
        except Exception as err:
            debug('ReceiveFileJob._job: %s', traceback.format_exc())
            raise ReceiveFileJobError('error while receiving `%s`: %s'
                                      % (self._path, err))
        finally:
            # close sockets anyway (the file is closed by `with`)
            for sock in (conn, self._sock):
                if sock is not None:
                    try:
                        sock.close()
                    except socket.error:
                        pass
            # give the port back to the pool
            with self._ports_used_mutex.write:
                if self._port in self._ports_used:
                    self._ports_used.remove(self._port)

    def get_port(self):
        '''Return the bound listening port, or None if not prepared yet.'''
        if self._port is None:
            return None
        return int(self._port)


if __name__ == '__main__':
    jm = JobManager(None)
    jm.start()
    # receiving side, for a local round-trip test:
    #   r = ReceiveFileJob(jm, '/tmp/receive', create=True)
    #   r.start_now()
    s = SendFileJob(jm, '/etc/hosts', '127.0.0.1', '42000')
    s.start_now()
    s.join()
    print(s.get_log())
    jm.stop()