diff --git a/cloudcontrol/node/exc.py b/cloudcontrol/node/exc.py index 783c82afa182716f84413a818e5d6190c7fbe314..43e61598af2313333c43eeef22aea641229bf7bb 100644 --- a/cloudcontrol/node/exc.py +++ b/cloudcontrol/node/exc.py @@ -36,3 +36,7 @@ class ConsoleAlreadyOpened(CCNodeError): class VMMigrationError(CCNodeError): pass + + +class JobError(CCNodeError): + pass diff --git a/cloudcontrol/node/jobs.py b/cloudcontrol/node/jobs.py index 96ea8bb2d5782b64f58968abaa4dd1cf90f3c972..6c886550a08500e8f8c0a704ea080b74f706c41d 100644 --- a/cloudcontrol/node/jobs.py +++ b/cloudcontrol/node/jobs.py @@ -1,5 +1,13 @@ +import errno import logging -from threading import Thread +import os +import resource +import signal +import subprocess +import sys +from threading import Thread, Event + +from cloudcontrol.node.exc import JobError logger = logging.getLogger(__name__) @@ -130,3 +138,155 @@ class BaseThreadedJob(Thread): def stop(self): self.running = False + + +class BaseIOJob(object): + """Job that can set ionice and executes in a fork. + + When inherit, you must define open_fds property that list file descriptors + that must be kept in the child and closed in the parent. + + .. warning:: + logging should not be used in the child as this would cause a deadlock + to occur, see http://bugs.python.org/issue6721 + """ + + IO_NICE = 7 + + def __init__(self, job_manager): + self.job_manager = job_manager + + #: job id + self.id = job_manager.job_id.next() + + self.running = False + # event for other thread to wait for job termination + self.job_done = Event() + + self.fork_pid = None + + def pre_job(self): + # overide in sub class + pass + + def run_job(self): + # overide in sub class + pass + + def set_io_nice(self): + try: + subprocess.check_call(['ionice', '-n%d' % self.IO_NICE, + '-p%d' % os.getpid()], close_fds=True) + except subprocess.CalledProcessError as exc: + sys.stderr.write('Cannot set ionice, return code %s\n' % exc.returncode) + + def close_fds(self): + """Close all fds uneeded fds in children.""" + # get max fd + limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1] + if limit == resource.RLIM_INFINITY: + max_fd = 2048 + else: + max_fd = limit + + exclude_fd = self.open_fds + if self.job_manager.main.config.debug: + exclude_fd += [0, 1, 2] # debug + for fd in xrange(max_fd, -1, -1): + if fd in exclude_fd: + continue + try: + os.close(fd) + except OSError as exc: + if exc.errno != errno.EBADF: + raise + # wasn't open + if not self.job_manager.main.config.debug: + sys.stdin = open(os.devnull) + sys.stdout = open(os.devnull) + sys.stderr = open(os.devnull) + assert sys.stdin.fileno() == 0 + assert sys.stdout.fileno() == 1 + assert sys.stderr.fileno() == 2 + + def reset_signal_mask(self): + signal.signal(signal.SIGTERM, lambda *args: os._exit(1)) + signal.signal(signal.SIGUSR1, signal.SIG_IGN) + signal.signal(signal.SIGINT, signal.SIG_IGN) + + def run(self): + try: + self.fork_pid = os.fork() + except OSError as exc: + logger.error('Cannot fork: %s', os.strerror(exc.errno)) + raise + self.running = True + + if self.fork_pid == 0: + # child + self.reset_signal_mask() + self.close_fds() + self.set_io_nice() + try: + self.run_job() + except: + sys.stderr.write('Error during job\n') + os._exit(1) + else: + sys.stderr.write('Job execution went well\n') + os._exit(0) + else: + # close child fds + for fd in self.open_fds: + try: + os.close(fd) + except OSError: + logger.error('Error while closing fds in parent') + raise + + def start(self): + self.pre_job() + self.run() + + def stop(self): + try: + os.kill(self.fork_pid, signal.SIGKILL) + except OSError as exc: + if exc.errno == errno.ESRCH: + logger.debug('Child already killed') + return + + logger.error('Cannot kill child for IO job: %s', + os.strerror(exc.errno)) + raise + + def notify(self): + self.job_manager.notify(self) + + def wait(self): + if self.fork_pid is None: + return + try: + while True: + try: + pid, return_status = os.waitpid(self.fork_pid, 0) + except OSError as exc: + if exc.errno == errno.EINTR: + continue + + logger.error('Error while waiting for child to terminate: %s', + os.strerror(exc.errno)) + raise + else: + break + assert pid == self.fork_pid + if return_status >> 8 != 0: + raise JobError('Exception during job, returned %s', + return_status) + finally: + self.fork_pid = None + self.job_done.set() + self.notify() + + def join(self): + self.job_done.wait()