Skip to content
Snippets Groups Projects
Commit f4553ddc authored by Thibault VINCENT's avatar Thibault VINCENT
Browse files

fix: various improvements in job system

 * md5 hash of transfered file is not faked anymore
 * implemented job cancellation
 * xfer tcp ports were not released, exhaustion was possible
parent b592e804
No related branches found
No related tags found
No related merge requests found
# -*- coding: utf-8 -*-
import os, time, socket
from hashlib import md5
from datetime import datetime
from threading import Thread, RLock
from logging import debug
......@@ -22,11 +23,14 @@ class JobManager(Thread, object):
super(JobManager, self).__init__()
self._mutex = RLock()
self._next_jid = 1
# job objects indexed by job ID
self._jobs = {}
# new jobs IDs
self._jobs_pending = []
# running jobs IDs
self._jobs_running = []
# dead jobs IDs
self._jobs_crashed = []
self._jobs_canceled = []
self._jobs_finished = []
def _job_crashed(self, jid):
......@@ -41,7 +45,7 @@ class JobManager(Thread, object):
self._jobs_crashed.append(jid)
# clean any queue that may contain the job
for queue in [self._jobs_pending, self._jobs_running,
self._jobs_canceled, self._jobs_finished]:
self._jobs_finished]:
if jid in queue:
queue.remove(jid)
......@@ -56,6 +60,12 @@ class JobManager(Thread, object):
# remove from running queue
self._jobs_running.remove(jid)
def run(self):
'''
'''
while True:
pass
def append(self, job):
'''
'''
......@@ -67,11 +77,18 @@ class JobManager(Thread, object):
debug('JobManager.append: new job pending with id `%i`', jid)
return jid
def run(self):
def cancel(self, jid):
'''
'''
while True:
pass
with self._mutex:
# job should exist
if jid not in self._jobs:
raise JobManagerError('unknown job ID `%i`' % jid)
# job should be running
if jid not in self._jobs_running:
raise JobManagerError('job `%i` is not running' % jid)
# ask the job to stop it's execution
self._jobs[jid]._cancelled = True
def schedule_immediate(self, jid):
'''
......@@ -83,13 +100,18 @@ class JobManager(Thread, object):
raise JobManagerError('unknown job ID `%i`' % jid)
# job should be pending execution
if jid not in self._jobs_pending:
raise JobManagerError('job `%i` not prepared for execution' %
jid)
raise JobManagerError('job `%i` not prepared for execution'%jid)
# execute job
self._jobs_running.append(jid)
self._jobs_pending.remove(jid)
self._jobs[jid].start()
def is_job(self, jid):
'''
'''
with self._mutex:
return jid in self._jobs
def get_job(self, jid):
'''
'''
......@@ -99,11 +121,13 @@ class JobManager(Thread, object):
else:
raise JobManagerError('unknown job ID `%i`' % jid)
def is_job(self, jid):
def is_job_pending(self, jid):
'''
'''
with self._mutex:
return jid in self._jobs
if not self.is_job(jid):
raise JobManagerError('unknown job ID `%i`' % jid)
return jid in self._jobs_pending
def is_job_running(self, jid):
'''
......@@ -113,6 +137,14 @@ class JobManager(Thread, object):
raise JobManagerError('unknown job ID `%i`' % jid)
return jid in self._jobs_running
def is_job_crashed(self, jid):
'''
'''
with self._mutex:
if not self.is_job(jid):
raise JobManagerError('unknown job ID `%i`' % jid)
return jid in self._jobs_crashed
def is_job_finished(self, jid):
'''
'''
......@@ -120,6 +152,13 @@ class JobManager(Thread, object):
if not self.is_job(jid):
raise JobManagerError('unknown job ID `%i`' % jid)
return jid in self._jobs_finished
def is_job_cancelled(self, jid):
'''
'''
with self._mutex:
job = self.get_job(jid)
return job._cancelled is True
class JobLog():
......@@ -161,8 +200,10 @@ class BaseJob(Thread, object):
self._type = self.__class__.__name__
self._validated = None
self._ready_run = False
self._cancelled = False
# store job in manager
self._jid = self._manager.append(self)
self.name = '%s-%i' % (self._type, self._jid)
self._log.append('stored with id `%i` and type `%s`' % (self._jid,
self._type))
......@@ -249,13 +290,6 @@ class BaseJob(Thread, object):
self._log.append('ready for execution')
class BlowJob(BaseJob):
'''
'''
def __init__(self, manager):
raise Exception('I LOVE MY KITTY')
class XferJob(BaseJob):
'''
'''
......@@ -272,7 +306,8 @@ class XferJob(BaseJob):
'''
'''
self._path = path
self._csum = None
self._md5 = md5()
self._checksum = None
super(XferJob, self).__init__(manager)
def _validate(self):
......@@ -281,8 +316,7 @@ class XferJob(BaseJob):
return self._type in self.TYPES
def get_checksum(self):
# FIXME lol
return '625f4986bccba2dbd70cdd9ef80af787'
return self._checksum
class SendFileJob(XferJob):
......@@ -320,20 +354,49 @@ class SendFileJob(XferJob):
# try to connect
self._log.append('connecting to `%s`:`%i`' % (self._host,
self._port))
self._sock.connect((self._host, self._port))
self._log.append('connected')
# open file
self._log.append('opening source file `%s`' % self._path)
fd = open(self._path, 'rb')
# read and send file
self._log.append('sending data')
while True:
buf = fd.read(4096)
if not buf:
break
self._sock.send(buf)
self._sock.settimeout(10)
retry = 0
connected = False
while retry < 6 and not connected and not self._cancelled:
retry += 1
try:
self._sock.connect((self._host, self._port))
except socket.timeout:
pass
except socket.error:
time.sleep(1)
pass
else:
connected = True
if self._cancelled:
self._log.append('stopped connect attempt (job was cancelled)')
elif connected:
self._log.append('connected')
# open file
self._log.append('opening source file `%s`' % self._path)
fd = open(self._path, 'rb')
# read and send file
self._log.append('sending data')
while True and not self._cancelled:
buf = fd.read(4096)
if not buf:
break
self._sock.send(buf)
self._md5.update(buf)
# report transfert completion
if self._cancelled:
self._log.append('transfert aborted (job was cancelled)')
else:
self._log.append('no more data to send')
self._checksum = self._md5.hexdigest()
self._md5 = None
self._log.append('transfered data checksum = `%s`' %
self._checksum)
else:
self._log.append('remote host did not accept connection, '
'aborting emission')
except:
traceback.print_exc(file=sys.stdout)
#traceback.print_exc(file=sys.stdout)
raise SendFileJobError('FIXME error reporting')
finally:
# close socket and files anyway
......@@ -341,11 +404,10 @@ class SendFileJob(XferJob):
self._sock.close()
except:
pass
if fd:
try:
fd.close()
except:
pass
try:
fd.close()
except:
pass
class ReceiveFileJob(XferJob):
......@@ -394,8 +456,9 @@ class ReceiveFileJob(XferJob):
except socket.error:
pass
if not self._port:
self._log.append('no port to listen to')
raise ReceiveFileJobError('no port to listen to')
msg = 'no port to listen to'
self._log.append(msg)
raise ReceiveFileJobError(msg)
return True
def _job(self):
......@@ -404,9 +467,19 @@ class ReceiveFileJob(XferJob):
fd = None
try:
self._sock.listen(1)
self._sock.settimeout(1)
# wait for connection
self._log.append('waiting for connection on port `%i`' % self._port)
conn, addr = self._sock.accept()
retry = 0
conn = None
while retry < 60 and not conn and not self._cancelled:
retry += 1
try:
conn, addr = self._sock.accept()
except socket.timeout as err:
pass
if self._cancelled:
self._log.append('stopped waiting (job was cancelled)')
if conn:
self._log.append('client connected from `%s`')
# open file
......@@ -414,16 +487,25 @@ class ReceiveFileJob(XferJob):
fd = open(self._path, 'wb')
# write to the file
self._log.append('receiving data')
while True:
while True and not self._cancelled:
buf = conn.recv(4096)
if not buf:
break
fd.write(buf)
self._log.append('remote host closed connection')
self._md5.update(buf)
# report transfert completion
if self._cancelled:
self._log.append('transfert aborted (job was cancelled)')
else:
self._log.append('remote host closed connection')
self._checksum = self._md5.hexdigest()
self._md5 = None
self._log.append('transfered data checksum = `%s`' %
self._checksum)
else:
self._log.append('no connection received, aborting reception')
except:
traceback.print_exc(file=sys.stdout)
#traceback.print_exc(file=sys.stdout)
raise ReceiveFileJobError('FIXME error reporting')
finally:
# close socket and files anyway
......@@ -431,11 +513,13 @@ class ReceiveFileJob(XferJob):
self._sock.close()
except:
pass
if fd:
try:
fd.close()
except:
pass
with self._ports_used_mutex.write:
if self._port in self._ports_used:
self._ports_used.remove(port)
try:
fd.close()
except:
pass
def get_port(self):
'''
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment