Commit 95380942 authored by Anael Beutot's avatar Anael Beutot
Browse files

Added base objects for job handling.

JobManager + BaseJob.
parent b5af93d2
Loading
Loading
Loading
Loading
+120 −2
Original line number Diff line number Diff line
@@ -5,5 +5,123 @@ from threading import Thread
logger = logging.getLogger(__name__)


class JobManager(Thread, object):
class JobManager(object):
    def __init__(self, main_loop):
        """
        :param main_loop: :class:`MainLoop` instance
        """
        def counter():
            i = 0
            while True:
                yield i
                i += 1

        self.job_id = counter()
        self.main = main_loop
        #: keep an index of all jobs
        self.jobs = {}

    def job_start(self):
        pass

    def job_stop(self):
        pass

    def notify(self, job):
        """Called when a job is done."""
        # by now only remove the job
        self.remove(job)

    def cancel(self, job_id):
        """Cancel a job."""
        self.jobs[job_id].running = False

    def remove(self, job):
        return self.jobs.pop(job.id)

    def create(self, job_constructor, *args, **kwargs):
        """Create a new job and populate job id."""
        job = job_constructor(self, self.main.evloop, *args, **kwargs)
        self.jobs[job.id] = job
        return job

    def get(self, job_id):
        return self.jobs[job_id]

    def start(self):
        pass
    def stop(self):
        pass


class BaseJob(Thread):
    """Base job

    Handles job notification to the job manager.
    """
    def __init__(self, job_manager, ev_loop):
        Thread.__init__(self)
        #: report progress in %
        self.progress = 0.
        #: async watcher to handle notification to main loop
        self.watcher = ev_loop.async(self.notify_cb)

        self.job_manager = job_manager

        self.error_msg = 'Unexpected exception during job execution'

        #: job id
        self.id = job_manager.job_id.next()

        self.running = False

    def pre_job(self):
        """Job preparation that is called when doing start, it can raise
        exceptions to report error to the caller.

        """
        pass

    def run_job(self):
        """Overide this method to define what your job do."""
        raise NotImplementedError

    def run(self):
        try:
            self.run_job()
        except Exception:
            logger.exception(self.error_msg)
            raise
        finally:
            self.running = False
            self.watcher.send()

    def notify_cb(self, *args):
        self.watcher.stop()
        self.job_manager.notify(self)

    def start(self):
        # first we run pre_job as it could raise an exception
        try:
            self.pre_job()
        except Exception:
            # in case of error we must remove the job from the manager
            self.job_manager.remove(self)
            raise
        # then we start the watcher when it's safe
        self.watcher.start()
        self.running = True
        Thread.start(self)

    def start_current(self):
        """Start job in current thread."""
        try:
            self.pre_job()
        except Exception:
            self.job_manager.remove(self)
            raise
        self.running = True
        self.run_job()

    def stop(self):
        self.running = False
+9 −0
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@ from sjrpc.utils import ConnectionProxy, RpcHandler, threadless
from ccnode import __version__
from ccnode.config import NodeConfigParser
from ccnode.tags import Tag, get_tags, TagDB
from ccnode.jobs import JobManager
from ccnode.exc import PluginError


@@ -210,6 +211,10 @@ class MainLoop(object):

        # tag database
        self.tag_db = TagDB(self, tags=DEFAULT_TAGS)

        # job manages
        self.job_manager = JobManager(self)

        # handlers
        self.rpc_handler = dict(
            get_tags=partial(threadless(get_tags), self.tag_db['__main__']),
@@ -242,6 +247,10 @@ class MainLoop(object):
            return {}

        return get_tags(sub_db, tags)

    @threadless
    def job_list(self):
        pass
    # End RPC handlers definitions

    def reset_handler(self, name, handl):