Loading cloudcontrol/node/jobs.py +35 −24 Original line number Diff line number Diff line Loading @@ -174,6 +174,22 @@ class ForkedJob(object): self.job_done = Event() self.fork_pid = None # internal event used to wait for forked process termination self.fork_die = Event() # libev child watcher self.fork_watcher = None # return status of forked process self.return_status = None def create_fork_watcher(self): self.fork_watcher = self.job_manager.main.evloop.child(self.fork_pid, False, self.child_cb) self.fork_watcher.start() def child_cb(self, watcher, revents): self.return_status = watcher.rstatus self.fork_die.set() watcher.stop() self.fork_watcher = None def pre_job(self): """This method represents any preparation job that must be done in the Loading Loading @@ -294,18 +310,26 @@ class ForkedJob(object): if self.fork_pid == 0: # child # just hope to not receive any signals in between since there is no # way to block signals in python :( try: self.reset_signal_mask() self.close_fds() self.after_fork() except: traceback.print_exc() os._exit(1) try: self.run_job() except: sys.stderr.write('Error during job %s\n' % self.id) traceback.print_exc() os._exit(1) else: sys.stderr.write('Job execution went well %s\n' % self.id) os._exit(0) else: self.create_fork_watcher() # close child fds for fd in self.open_fds: try: Loading @@ -326,15 +350,14 @@ class ForkedJob(object): """ self.pre_job() self.run() self.job_manager.main.call_in_main_thread(self.run) def stop(self): """This would be called to stop the job. It will kill the child but it will not call `os.waitpid <http://docs.python.org/library/os.html#os.waitpid>`_ to get return status from zombie process. :py:meth:`wait` MUST be called anyway. """ if self.fork_die.is_set(): return try: os.kill(self.fork_pid, signal.SIGKILL) except OSError as exc: Loading @@ -358,27 +381,15 @@ class ForkedJob(object): if self.fork_pid is None: return try: while True: try: pid, return_status = os.waitpid(self.fork_pid, 0) except OSError as exc: if exc.errno == errno.EINTR: continue logger.error('Error while waiting for child to terminate:' ' %s (job %s)', exc.strerror, self.id) raise else: break assert pid == self.fork_pid if return_status >> 8 != 0: if return_status & 0xff == signal.SIGKILL: self.fork_die.wait() if self.return_status >> 8 != 0: if self.return_status & 0xff == signal.SIGKILL: logger.error('Job was killed') else: raise JobError('Exception during job, returned %s, signal' ' %s' % ( return_status >> 8, num_to_sig(return_status & 0xff))) self.return_status >> 8, num_to_sig(self.return_status & 0xff))) finally: self.fork_pid = None self.job_done.set() Loading Loading
cloudcontrol/node/jobs.py +35 −24 Original line number Diff line number Diff line Loading @@ -174,6 +174,22 @@ class ForkedJob(object): self.job_done = Event() self.fork_pid = None # internal event used to wait for forked process termination self.fork_die = Event() # libev child watcher self.fork_watcher = None # return status of forked process self.return_status = None def create_fork_watcher(self): self.fork_watcher = self.job_manager.main.evloop.child(self.fork_pid, False, self.child_cb) self.fork_watcher.start() def child_cb(self, watcher, revents): self.return_status = watcher.rstatus self.fork_die.set() watcher.stop() self.fork_watcher = None def pre_job(self): """This method represents any preparation job that must be done in the Loading Loading @@ -294,18 +310,26 @@ class ForkedJob(object): if self.fork_pid == 0: # child # just hope to not receive any signals in between since there is no # way to block signals in python :( try: self.reset_signal_mask() self.close_fds() self.after_fork() except: traceback.print_exc() os._exit(1) try: self.run_job() except: sys.stderr.write('Error during job %s\n' % self.id) traceback.print_exc() os._exit(1) else: sys.stderr.write('Job execution went well %s\n' % self.id) os._exit(0) else: self.create_fork_watcher() # close child fds for fd in self.open_fds: try: Loading @@ -326,15 +350,14 @@ class ForkedJob(object): """ self.pre_job() self.run() self.job_manager.main.call_in_main_thread(self.run) def stop(self): """This would be called to stop the job. It will kill the child but it will not call `os.waitpid <http://docs.python.org/library/os.html#os.waitpid>`_ to get return status from zombie process. :py:meth:`wait` MUST be called anyway. """ if self.fork_die.is_set(): return try: os.kill(self.fork_pid, signal.SIGKILL) except OSError as exc: Loading @@ -358,27 +381,15 @@ class ForkedJob(object): if self.fork_pid is None: return try: while True: try: pid, return_status = os.waitpid(self.fork_pid, 0) except OSError as exc: if exc.errno == errno.EINTR: continue logger.error('Error while waiting for child to terminate:' ' %s (job %s)', exc.strerror, self.id) raise else: break assert pid == self.fork_pid if return_status >> 8 != 0: if return_status & 0xff == signal.SIGKILL: self.fork_die.wait() if self.return_status >> 8 != 0: if self.return_status & 0xff == signal.SIGKILL: logger.error('Job was killed') else: raise JobError('Exception during job, returned %s, signal' ' %s' % ( return_status >> 8, num_to_sig(return_status & 0xff))) self.return_status >> 8, num_to_sig(self.return_status & 0xff))) finally: self.fork_pid = None self.job_done.set() Loading