Commit 7b0ff548 authored by Anael Beutot's avatar Anael Beutot
Browse files

Use declarative API for RPC handlers

Define a decorator for libvirt handlers, those can be used only when the
connection to the libvirt is active
parent b2783ba2
Loading
Loading
Loading
Loading
+13 −16
Original line number Diff line number Diff line
@@ -37,7 +37,7 @@ import pyev
from sjrpc.utils import pass_connection, threadless
from sjrpc.core.exceptions import RpcError
from cloudcontrol.common.client.tags import Tag, tag_inspector, TagDB
from cloudcontrol.common.client.plugins import Base as BasePlugin
from cloudcontrol.common.client.plugins import Base as BasePlugin, rpc_handler
from cloudcontrol.common.jobs import JobsManager, JobsStore
from cloudcontrol.common.helpers.logger import patch_logging; patch_logging()

@@ -191,21 +191,6 @@ class Handler(BasePlugin):
            self.tag_db['__main__']['disk']._calculate_value().split(),
        ))

        # rpc handler
        self.rpc_handler.update(dict(
            execute_command=self.execute_command,
            node_shutdown=self.node_shutdown,
            shell=self.shell,
            resize=self.shell_resize,
            forward=self.forward,
            job_cancel=self.job_cancel,
            job_purge=self.job_purge,
            job_attachment=self.job_attachment,
            script_run=self.script_run,
            plugin_install=self.plugin_install,
            plugin_uninstall=self.plugin_uninstall,
            plugin_run=self.plugin_run
        ))
        # running shells
        self.shells = dict()
        # running remote commands
@@ -292,6 +277,7 @@ class Handler(BasePlugin):
        else:
            logger.debug('Plugins state saved')

    @rpc_handler
    def execute_command(self, command):
        """Execute an arbitrary shell command on the host.

@@ -329,6 +315,7 @@ class Handler(BasePlugin):
                                       remote_command.returncode)
        return stdout

    @rpc_handler
    def node_shutdown(self, reboot=True, gracefull=True):
        """Halt/Reboot the node.

@@ -359,6 +346,7 @@ class Handler(BasePlugin):

    @threadless
    @pass_connection
    @rpc_handler
    def shell(self, conn, shell='/bin/bash'):
        """Create a shell tunnel and return the label of the created tunnel.

@@ -369,12 +357,14 @@ class Handler(BasePlugin):
        return remote_shell.label

    @threadless
    @rpc_handler('resize')
    def shell_resize(self, label, row, col, xpixel, ypixel):
        """Resize the shell's attached terminal."""
        logger.debug('Shell resize')
        self.shells[label].resize(row, col, xpixel, ypixel)

    @pass_connection
    @rpc_handler
    def forward(self, conn, label, port, destination='127.0.0.1'):
        """TCP port forwarding."""
        try:
@@ -390,6 +380,7 @@ class Handler(BasePlugin):
        conn.create_tunnel(label=label, endpoint=sock)

    @pass_connection
    @rpc_handler
    def script_run(self, conn, sha1, script, owner, batch=None, *args):
        # retrive script if not here
        filename = os.path.join(self.scripts_dir, sha1)
@@ -420,12 +411,15 @@ class Handler(BasePlugin):
                args=args,
        )).id

    @rpc_handler
    def job_cancel(self, job_id):
        self.jobs_manager.get(job_id).cancel()

    @rpc_handler
    def job_purge(self, job_id):
        self.jobs_manager.purge(job_id)

    @rpc_handler
    def job_attachment(self, job_id, name):
        """
        :param name: attachment name
@@ -437,6 +431,7 @@ class Handler(BasePlugin):
            raise

    @pass_connection
    @rpc_handler
    def plugin_install(self, conn, sha1, name):
        """
        :param conn: RPC connection
@@ -463,6 +458,7 @@ class Handler(BasePlugin):
                                    name, sha1, content)
        self.update_plugin_index()

    @rpc_handler
    def plugin_uninstall(self, name):
        plugin = self.plugins.pop(name, None)
        if plugin is None:
@@ -470,6 +466,7 @@ class Handler(BasePlugin):
        plugin.uninstall()
        self.update_plugin_index()

    @rpc_handler
    def plugin_run(self, name, method, owner, batch=None, **kwargs):
        if name not in self.plugins:
            raise KeyError('Plugin %r is not running' % name)
+45 −64
Original line number Diff line number Diff line
@@ -22,6 +22,11 @@ from xml.etree import cElementTree as et
import libvirt
from sjrpc.utils import threadless, pass_connection
from cloudcontrol.common.client.tags import Tag, tag_inspector
from cloudcontrol.common.client.plugins import (
    rpc_handler,
    rpc_handler_decorator_factory,
    get_rpc_handlers,
)

from cloudcontrol.node.host import Handler as HostHandler
from cloudcontrol.node.hypervisor import tags
@@ -37,6 +42,10 @@ from cloudcontrol.node.hypervisor.jobs import (
logger = logging.getLogger(__name__)


libvirt_handler_marker = '_libvirt_rpc_handler'
libvirt_handler = rpc_handler_decorator_factory(libvirt_handler_marker)


# FIXME find a way to refactor Handler and Hypervisor class
class Handler(HostHandler):
    def __init__(self, *args, **kwargs):
@@ -53,6 +62,8 @@ class Handler(HostHandler):
        self.timer = self.main.evloop.timer(.0, 5., self.virt_connect_cb)
        self.hypervisor = None
        self._virt_connected = False
        # list of libvirt related RPC handlers
        self.virt_handlers = get_rpc_handlers(self, libvirt_handler_marker)

        # register tags
        self.tag_db.add_tags(tag_inspector(tags, self))
@@ -117,43 +128,10 @@ class Handler(HostHandler):
        for tag in ('cpualloc', 'cpurunning', 'memalloc', 'memrunning'):
            self.tag_db['__main__'][tag].update_value()

        self.rpc_handler.update(dict(
            vm_define=self.vm_define,
            vm_undefine=self.vm_undefine,
            vm_export=self.vm_export,
            vm_stop=self.vm_stop,
            vm_start=self.vm_start,
            vm_suspend=self.vm_suspend,
            vm_resume=self.vm_resume,
        ))
        self.main.reset_handler('vm_define', self.vm_define)
        self.main.reset_handler('vm_undefine', self.vm_undefine)
        self.main.reset_handler('vm_export', self.vm_export)
        self.main.reset_handler('vm_stop', self.vm_stop)
        self.main.reset_handler('vm_destroy', self.vm_destroy)
        self.main.reset_handler('vm_start', self.vm_start)
        self.main.reset_handler('vm_suspend', self.vm_suspend)
        self.main.reset_handler('vm_resume', self.vm_resume)
        self.main.reset_handler('vm_migrate_tunneled', self.vm_migrate_tunneled)
        self.main.reset_handler('vol_create', self.vol_create)
        self.main.reset_handler('vol_delete', self.vol_delete)
        self.main.reset_handler('vol_import', self.vol_import)
        self.main.reset_handler('vol_import_wait', self.vol_import_wait)
        self.main.reset_handler('vol_export', self.vol_export)
        self.main.reset_handler('tun_setup', self.tun_setup)
        self.main.reset_handler('tun_connect', self.tun_connect)
        self.main.reset_handler('tun_connect_hv', self.tun_connect_hv)
        self.main.reset_handler('tun_destroy', self.tun_destroy)
        self.main.reset_handler('drbd_setup', self.drbd_setup)
        self.main.reset_handler('drbd_connect', self.drbd_connect)
        self.main.reset_handler('drbd_role', self.drbd_role)
        self.main.reset_handler('drbd_takeover', self.drbd_takeover)
        self.main.reset_handler('drbd_sync_status', self.drbd_sync_status)
        self.main.reset_handler('drbd_shutdown', self.drbd_shutdown)
        self.main.reset_handler('vm_open_console', self.vm_open_console)
        self.main.reset_handler('vm_disable_virtio_cache',
                                self.vm_disable_virtio_cache)
        self.main.reset_handler('vm_set_autostart', self.vm_set_autostart)
        # register libvirt handlers
        self.rpc_handler.update(self.virt_handlers)
        for k, v in self.virt_handlers.iteritems():
            self.main.reset_handler(k, v)

        # if everything went fine, unregister the timer
        self.timer.stop()
@@ -193,33 +171,9 @@ class Handler(HostHandler):
        self.hypervisor = None

        # remove handlers related to libvirt
        self.main.remove_handler('vm_define')
        self.main.remove_handler('vm_undefine')
        self.main.remove_handler('vm_export')
        self.main.remove_handler('vm_stop')
        self.main.remove_handler('vm_destroy')
        self.main.remove_handler('vm_start')
        self.main.remove_handler('vm_suspend')
        self.main.remove_handler('vm_resume')
        self.main.remove_handler('vm_migrate_tunneled')
        self.main.remove_handler('vol_create')
        self.main.remove_handler('vol_delete')
        self.main.remove_handler('vol_import')
        self.main.remove_handler('vol_import_wait')
        self.main.remove_handler('vol_export')
        self.main.remove_handler('tun_setup')
        self.main.remove_handler('tun_connect')
        self.main.remove_handler('tun_connect_hv')
        self.main.remove_handler('tun_destroy')
        self.main.remove_handler('drbd_setup')
        self.main.remove_handler('drbd_connect')
        self.main.remove_handler('drbd_role')
        self.main.remove_handler('drbd_takeover')
        self.main.remove_handler('drbd_sync_status')
        self.main.remove_handler('drbd_shutdown')
        self.main.remove_handler('vm_open_console')
        self.main.remove_handler('vm_disable_virtio_cache')
        self.main.remove_handler('vm_set_autostart')
        for handler in self.virt_handlers:
            del self.rpc_handler[handler]
            self.main.remove_handler[handler]
        # launch connection timer
        self.timer.start()

@@ -233,6 +187,7 @@ class Handler(HostHandler):
            if dom is not None:
                yield dom

    @libvirt_handler
    def vm_define(self, data, format='xml'):
        logger.debug('VM define')
        if format != 'xml':
@@ -240,12 +195,14 @@ class Handler(HostHandler):

        return self.hypervisor.vm_define(data)

    @libvirt_handler
    def vm_undefine(self, name):
        logger.debug('VM undefine %s', name)
        vm = self.hypervisor.domains.get(name)
        if vm is not None:
            vm.undefine()

    @libvirt_handler
    def vm_export(self, name, format='xml'):
        logger.debug('VM export %s', name)
        if format != 'xml':
@@ -258,6 +215,7 @@ class Handler(HostHandler):

        return vm.lv_dom.XMLDesc(0)

    @libvirt_handler
    def vm_stop(self, name):
        logger.debug('VM stop %s', name)
        try:
@@ -270,6 +228,7 @@ class Handler(HostHandler):
            logger.error(msg)
            raise UndefinedDomain(msg)

    @libvirt_handler
    def vm_destroy(self, name):
        logger.debug('VM destroy %s', name)
        try:
@@ -286,6 +245,7 @@ class Handler(HostHandler):
            logger.error(msg)
            raise UndefinedDomain(msg)

    @libvirt_handler
    def vm_start(self, name, pause=False):
        """
        :param str name: VM name to start
@@ -302,6 +262,7 @@ class Handler(HostHandler):
            logger.error(msg)
            raise UndefinedDomain(msg)

    @libvirt_handler
    def vm_suspend(self, name):
        logger.debug('VM suspend %s', name)
        try:
@@ -314,6 +275,7 @@ class Handler(HostHandler):
            logger.error(msg)
            raise UndefinedDomain(msg)

    @libvirt_handler
    def vm_resume(self, name):
        logger.debug('VM resume %s', name)
        try:
@@ -326,6 +288,7 @@ class Handler(HostHandler):
            logger.error(msg)
            raise UndefinedDomain(msg)

    @libvirt_handler
    def vm_migrate_tunneled(self, name, tun_res, migtun_res, unsafe=False,
                            timeout=60.):
        """Live migrate VM through TCP tunnel.
@@ -376,6 +339,7 @@ class Handler(HostHandler):

    @threadless
    @pass_connection
    @libvirt_handler
    def vm_open_console(self, conn, name):
        """
        :param conn: sjRPC connection instance
@@ -403,6 +367,7 @@ class Handler(HostHandler):
        proto = conn.create_tunnel(endpoint=endpoint, on_shutdown=on_shutdown)
        return proto.label

    @libvirt_handler
    def vm_disable_virtio_cache(self, name):
        """Set virtio cache to none on VM disks.

@@ -441,6 +406,7 @@ class Handler(HostHandler):
            logger.exception('Cannot update XML file for domain %s', name)
            raise

    @libvirt_handler
    def vm_set_autostart(self, name, autostart=True):
        """Set autostart on VM.

@@ -452,6 +418,7 @@ class Handler(HostHandler):
        # update autostart value now instead of 10 seconds lag
        vm.tags['autostart'].update_value()

    @libvirt_handler
    def vol_create(self, pool, name, size):
        logger.debug('Volume create %s, pool %s, size %s', name, pool, size)
        try:
@@ -460,6 +427,7 @@ class Handler(HostHandler):
            logger.exception('Error while creating volume')
            raise

    @libvirt_handler
    def vol_delete(self, pool, name):
        logger.debug('Volume delete %s, pool %s', name, pool)
        try:
@@ -468,6 +436,7 @@ class Handler(HostHandler):
            logger.exception('Error while deleting volume')
            raise

    @libvirt_handler
    def vol_import(self, pool, name):
        """
        :param pool: pool name where the volume is
@@ -492,6 +461,7 @@ class Handler(HostHandler):

        return dict(id=job.id, port=job.port)

    @libvirt_handler
    def vol_import_wait(self, job_id):
        """Block until completion of the given job id."""
        job = self.main.job_manager.get(job_id)
@@ -501,6 +471,7 @@ class Handler(HostHandler):

        return dict(id=job.id, log='', checksum=job.checksum)

    @libvirt_handler
    def vol_import_cancel(self, job_id):
        """Cancel import job."""
        logger.debug('Cancel import job')
@@ -510,6 +481,7 @@ class Handler(HostHandler):
        job.join()  # we don't call wait as it is already called in
                    # vol_import_wait handler

    @libvirt_handler
    def vol_export(self, pool, name, raddr, rport):
        """
        :param pool: pool name where the volume is
@@ -556,6 +528,7 @@ class Handler(HostHandler):
        )

    @threadless
    @rpc_handler
    def tun_connect(self, res, remote_res, remote_ip):
        """Connect tunnel to the other end.

@@ -569,6 +542,7 @@ class Handler(HostHandler):
        job.start()

    @threadless
    @rpc_handler
    def tun_connect_hv(self, res, migration=False):
        """Connect tunnel to local libvirt Unix socket.

@@ -580,6 +554,7 @@ class Handler(HostHandler):
        job.start()

    @threadless
    @rpc_handler
    def tun_destroy(self, res):
        """Close given tunnel.

@@ -590,6 +565,7 @@ class Handler(HostHandler):
        self.main.job_manager.cancel(job.id)
        job.wait()

    @libvirt_handler
    def drbd_setup(self, pool, name):
        """Create DRBD volumes.

@@ -621,6 +597,7 @@ class Handler(HostHandler):
            port=job.drbd_port,
        )

    @libvirt_handler
    def drbd_connect(self, res, remote_res, remote_ip):
        """Set up DRBD in connect mode. (Wait for connection and try to connect
        to the remote peer.
@@ -633,6 +610,7 @@ class Handler(HostHandler):
        job.connect(remote_ip, remote_res['port'])
        job.wait_connection()

    @libvirt_handler
    def drbd_role(self, res, primary):
        """Set up DRBD role.

@@ -645,6 +623,7 @@ class Handler(HostHandler):
        else:
            job.switch_secondary()

    @libvirt_handler
    def drbd_takeover(self, res, state):
        """Set up DRBD device as the VM disk. FIXME

@@ -654,6 +633,7 @@ class Handler(HostHandler):
        job = self.main.job_manager.get(res['jid'])
        job.takeover()

    @libvirt_handler
    def drbd_sync_status(self, res):
        """Return synchronization status of a current DRBD job.

@@ -667,6 +647,7 @@ class Handler(HostHandler):
        logger.debug('DRBD status %s', result)
        return result

    @libvirt_handler
    def drbd_shutdown(self, res):
        """Destroy DRBD related block devices.