# -*- coding: utf-8 -*- import os, time, socket from datetime import datetime from threading import Thread, RLock from utils import RWLock from errors import (JobManagerError, JobError, XferJobError, ReceiveFileJobError, SendFileJobError) #fixme import traceback, sys class JobManager(Thread, object): ''' ''' MAX_JID = 65535 def __init__(self, hypervisor): ''' ''' super(JobManager, self).__init__() self._mutex = RLock() self._next_jid = 1 self._jobs = {} self._jobs_pending = [] self._jobs_running = [] self._jobs_crashed = [] self._jobs_canceled = [] self._jobs_finished = [] def _job_crashed(self, jid): ''' ''' with self._mutex: # job should exist if jid not in self._jobs: raise JobManagerError('unknown job ID `%i`' % jid) # move job to the crashed queue self._jobs_crashed.append(jid) # clean any queue that may contain the job for queue in [self._jobs_pending, self._jobs_running, self._jobs_canceled, self._jobs_finished]: if jid in queue: queue.remove(jid) def _job_finished(self, jid): with self._mutex: # job should exist if jid not in self._jobs: raise JobManagerError('unknown job ID `%i`' % jid) # move job to the finished queue self._jobs_finished.append(jid) # remove from running queue self._jobs_running.remove(jid) def append(self, job): ''' ''' with self._mutex: jid = self._next_jid self._next_jid += 1 self._jobs[jid] = job self._jobs_pending.append(jid) return jid def run(self): ''' ''' while True: pass def schedule_immediate(self, jid): ''' ''' with self._mutex: # job should exist if jid not in self._jobs: raise JobManagerError('unknown job ID `%i`' % jid) # job should be pending execution if jid not in self._jobs_pending: raise JobManagerError('job `%i` not prepared for execution' % jid) # execute job self._jobs_running.append(jid) self._jobs_pending.remove(jid) self._jobs[jid].start() def get_job(self, jid): ''' ''' with self._mutex: if self.is_job(jid): return self._jobs[jid] else: raise JobManagerError('unknown job ID `%i`' % jid) def is_job(self, jid): ''' ''' with self._mutex: return jid in self._jobs def is_job_running(self, jid): ''' ''' with self._mutex: if not self.is_job(jid): raise JobManagerError('unknown job ID `%i`' % jid) return jid in self._jobs_running def is_job_finished(self, jid): ''' ''' with self._mutex: if not self.is_job(jid): raise JobManagerError('unknown job ID `%i`' % jid) return jid in self._jobs_finished class JobLog(): ''' ''' def __init__(self): ''' ''' self._items = [] self._mutex = RLock() def __str__(self): ''' ''' res = "" for tup in self._items: res += "[%s] %s\n" % (datetime.fromtimestamp(tup[0]).strftime( "%Y-%m-%d %H:%M:%S"), tup[1]) return res def append(self, message): ''' ''' print "JobLog : %s" % message with self._mutex: self._items.append((time.time(), message)) class BaseJob(Thread, object): ''' ''' def __init__(self, manager): ''' ''' super(BaseJob, self).__init__() self._manager = manager self._log = JobLog() # job state self._type = self.__class__.__name__ self._validated = None self._ready_run = False # store job in manager self._jid = self._manager.append(self) self._log.append('stored with id `%i` and type `%s`' % (self._jid, self._type)) def _validate(self): ''' To be implemented in derivate class ''' return False def _prepare(self): ''' To be implemented in derivate class ''' return False def _job(self): ''' To be implemented in derivate class ''' self._log.append('nothing to do') raise JobError('empty job') def is_ready(): ''' ''' return self._ready_run is True def get_id(self): ''' ''' return self._jid def get_log(self): ''' ''' return str(self._log) def start_now(self): ''' ''' self._manager.schedule_immediate(self._jid) def run(self): ''' ''' try: if not self._ready_run: self.prepare() self._log.append("running") self._job() self._log.append("finished") self._manager._job_finished(self._jid) except Exception as err: self._log.append("crashed: `%r`: `%s`" % (err, err)) self._manager._job_crashed(self._jid) def prepare(self): ''' ''' if not self._ready_run: # run validation if not done yet if self._validated is None: try: self._validated = self._validate() except: self._validated = False finally: # do not continue if validation fails if not self._validate: self._log.append('validation failed') raise JobError('validation failed') # run validation in derivated Job class try: ready = self._prepare() except: ready = False finally: self._ready_run = ready # do not continue if preparation fails if not ready: self._log.append('preparation failed') raise JobError('preparation failed') else: self._log.append('ready for execution') class BlowJob(BaseJob): ''' ''' def __init__(self, manager): raise Exception('I LOVE MY KITTY') class XferJob(BaseJob): ''' ''' TYPES = [ 'SendFileJob', 'ReceiveFileJob', ] # tcp port range used for transferts TCP_PORT_MIN = 42000 TCP_PORT_MAX = 42999 def __init__(self, manager, path): ''' ''' self._path = path self._csum = None super(XferJob, self).__init__(manager) def _validate(self): ''' ''' return self._type in self.TYPES def get_checksum(self): # FIXME lol return '625f4986bccba2dbd70cdd9ef80af787' class SendFileJob(XferJob): ''' ''' def __init__(self, manager, path, remote_host, remote_port): ''' ''' self._host = remote_host self._port = int(remote_port) super(SendFileJob, self).__init__(manager, path) def _validate(self): ''' ''' return ( super(SendFileJob, self)._validate() and os.path.isfile(self._path) and os.access(self._path, os.F_OK | os.R_OK ) and self._port >= self.TCP_PORT_MIN and self._port <= self.TCP_PORT_MAX and isinstance(self._host, str)) def _prepare(self): ''' ''' # create socket self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) return bool(self._sock) def _job(self): ''' ''' fd = None try: # try to connect self._log.append('connecting to `%s`:`%i`' % (self._host, self._port)) self._sock.connect((self._host, self._port)) self._log.append('connected') # open file self._log.append('opening source file `%s`' % self._path) fd = open(self._path, 'rb') # read and send file self._log.append('sending data') while True: buf = fd.read(4096) if not buf: break self._sock.send(buf) except: traceback.print_exc(file=sys.stdout) raise SendFileJobError('FIXME error reporting') finally: # close socket and files anyway try: self._sock.close() except: pass if fd: try: fd.close() except: pass class ReceiveFileJob(XferJob): ''' ''' _ports_used = [] _ports_used_mutex = RWLock() def __init__(self, manager, path, create=False): ''' ''' self._create = create self._port = None super(ReceiveFileJob, self).__init__(manager, path) def _validate(self): ''' ''' return ( super(ReceiveFileJob, self)._validate() and ( ( not self._create and os.path.isfile(self._path) and os.access(self._path, os.F_OK | os.W_OK )) or ( self._create and os.path.isdir(os.path.dirname(self._path)) and os.access(os.path.dirname(self._path), os.F_OK |os.W_OK) ) )) def _prepare(self): ''' ''' # create socket self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # bind socket to a port for port in range(self.TCP_PORT_MIN, self.TCP_PORT_MAX): # find a port to bind to try: with self._ports_used_mutex.write: if port not in self._ports_used: self._sock.bind(('', port)) self._port = port self._ports_used.append(port) break except socket.error: pass if not self._port: self._log.append('no port to listen to') raise ReceiveFileJobError('no port to listen to') return True def _job(self): ''' ''' fd = None try: self._sock.listen(1) # wait for connection self._log.append('waiting for connection on port `%i`' % self._port) conn, addr = self._sock.accept() if conn: self._log.append('client connected from `%s`') # open file self._log.append('opening destination `%s`' % self._path) fd = open(self._path, 'wb') # write to the file self._log.append('receiving data') while True: buf = conn.recv(4096) if not buf: break fd.write(buf) self._log.append('remote host closed connection') else: self._log.append('no connection received, aborting reception') except: traceback.print_exc(file=sys.stdout) raise ReceiveFileJobError('FIXME error reporting') finally: # close socket and files anyway try: self._sock.close() except: pass if fd: try: fd.close() except: pass def get_port(self): ''' ''' return int(self._port) if __name__ == '__main__': jm = JobManager(None) jm.start() #r = ReceiveFileJob(jm, '/tmp/receive') #r.start_now() s = SendFileJob(jm, '/etc/hosts', '127.0.0.1', '42000') s.start_now()