Skip to content
cli.py 51.6 KiB
Newer Older
        :param content: content of the plugin to save
        """
        self.client.check('plugin')
        return self.server.plugins.save(name, content)

    @listed
    def delplugin(self, query):
        """ Delete a plugin on the server.

        :param query: the tql query used to select plugins to delete
        """
        objects = self.client.list(query, show=('r', 'name'), method='plugin')
        errs = Reporter()
        for obj in objects:
            if obj['r'] != 'plugin':
                errs.error(obj['id'], 'not a plugin')
            else:
                try:
                    self.server.plugins.delete(obj['name'])
                except RepositoryOperationError as err:
                    errs.error(obj['id'], 'error: %s' % err)
                else:
                    errs.success(obj['id'], 'plugin deleted')

        return errs.get_dict()

    @listed
    def installplugin(self, query, plugin):
        """ Install a plugin on matching nodes.
        """

        # Load the plugin:
        sha1_hash, _ = self.server.plugins.load(plugin)

        objects = self.client.list(query, show=('r', ), method='plugin')
        errs = Reporter()
        for obj in objects:
            if obj['r'] not in ('host', 'hv'):
                errs.error(obj['id'], 'not a host')
            else:
                try:
                    node = self.server.get_client(obj['id'])
                except KeyError:
                    errs.error(obj['id'], 'node not connected')
                    continue

                try:
                    node.plugin_install(sha1_hash, plugin)
                except RpcError as err:
                    errs.error(obj['id'], '%s (exc: %s)' % (err.message,
                                                            err.exception))
                else:
                    errs.success(obj['id'], 'plugin installed')
        return errs.get_dict()

    @listed
    def uninstallplugin(self, query, plugin):
        """ Install a plugin on matching nodes.
        """

        objects = self.client.list(query, show=('r', ), method='plugin')
        errs = Reporter()
        for obj in objects:
            if obj['r'] not in ('host', 'hv'):
                errs.error(obj['id'], 'not a host')
            else:
                try:
                    node = self.server.get_client(obj['id'])
                except KeyError:
                    errs.error(obj['id'], 'node not connected')
                    continue

                try:
                    node.plugin_uninstall(plugin)
                except RpcError as err:
                    errs.error(obj['id'], '%s (exc: %s)' % (err.message,
                                                            err.exception))
                else:
                    errs.success(obj['id'], 'plugin uninstalled')
        return errs.get_dict()

    @listed
    def helpplugin(self, query, plugin, method):
        """ Get help about a plugin installed on the matching host.
        """
        objects = self.client.list(query, show=('r', ), method='plugin')
        errs = Reporter()
        for obj in objects:
            if obj['r'] not in ('host', 'hw'):
                errs.error(obj['id'], 'not a host')
            else:
                try:
                    node = self.server.get_client(obj['id'])
                except KeyError:
                    errs.error(obj['id'], 'node not connected')
                    continue

                try:
                    help = node.plugin_help(plugin, method)
                except RpcError as err:
                    errs.error(obj['id'], '%s (exc: %s)' % (err.message,
                                                            err.exception))
                else:
                    errs.success(obj['id'], 'ok.', output=help)
        return errs.get_dict()

    @listed
    def runplugin(self, query, plugin, method, batch=None, **kwargs):
        """ Execute a plugin method on matching nodes.
        """

        # Load the plugin:
        sha1_hash, _ = self.server.plugins.load(plugin)

        objects = self.client.list(query, show=('r', ), method='plugin')
        errs = Reporter()
        for obj in objects:
            if obj['r'] not in ('host', 'hv'):
                errs.error(obj['id'], 'not a host')
            else:
                try:
                    node = self.server.get_client(obj['id'])
                except KeyError:
                    errs.error(obj['id'], 'node not connected')
                    continue

                try:
                    job_id = node.plugin_run(plugin, method, self.client.login,
                                             batch=batch, **kwargs)
                except RpcError as err:
                    errs.error(obj['id'], '%s (exc: %s)' % (err.message,
                                                            err.exception))
                else:
                    job_id = '.'.join((obj['id'], job_id))
                    errs.success(obj['id'], 'ok.', jobs=job_id)
        return errs.get_dict()
    # Election / Migration / Cloning / Allocation
    @listed
    def electiontypes(self):
        return Elector.ALGO_BY_TYPES

    @listed
    def election(self, query_vm, query_dest, mtype='cold', algo='fair', **kwargs):
        """ Consult the server for the migration of specified vm on
            an hypervisor pool.

        :param query_vm: the tql query to select VMs to migrate
        :param query_dest: the tql query to select destination hypervisors
            candidates
        :param mtype: type of migration
        :param algo: algo used for distribution
        """
        elector = Elector(self.server, query_vm, query_dest, self.client)
        return elector.election(mtype, algo, **kwargs)

    @listed
    def migrate(self, migration_plan):
        """ Launch the provided migration plan.

        :param migration_plan: the plan of the migration.
        :return: a standard error report
        """
        errs = Reporter()
        for migration in migration_plan:
            # Check if the migration type is know:
            if migration['type'] in MIGRATION_TYPES:
                mtype = MIGRATION_TYPES[migration['type']]
            else:
                errmsg = '%r unknown migration type' % migration['type']
                errs.error(migration['sid'], errmsg)
                continue

            vm = self.server.db.get_by_id(migration['sid'], ('h', 'hv', 'p'))

            # Construct the migration properties:
            migration_properties = {
                'server': self.server,
                'vm_name': vm['h'],
                'hv_source': vm['p'],
                'hv_dest': migration['did']
            }

            # Create the job:
            job = self.client.spawn_job(mtype, settings=migration_properties)
            errs.success(migration['sid'], 'migration launched, id:%s' % job.id)
    @listed
    def migrate2(self, tql_vm, tql_target, batch=None, live=False, flags=None):
        """ Launch a migration.
        """

        self.client.check('migrate')
        vms = self.client.list(tql_vm, show=('r', ), method='migrate')
        errs = Reporter()

        for vm in vms:
            if vm['r'] != 'vm':
                errs.error(vm['id'], 'not a vm')
            else:
                settings = {'server': self.server,
                            'client': self.client,
                            'vm_id': vm['id'],
                            'tql_target': tql_target,
                            'live': live,
                            'flags': flags}

                job = self.client.spawn_job(MigrationJob, batch=batch, settings=settings)
                errs.success(vm['id'], 'migration launched, id:%s' % job.id)
        return errs.get_dict()

Antoine Millet's avatar
Antoine Millet committed
    @listed
    def diskcopy(self, source_id, source_pool, source_vol, dest_id, dest_pool, dest_vol):
        """ Launch a migration.
        """

        self.client.check('diskcopy')
        settings = {'server': self.server,
                    'client': self.client,
                    'source_id': source_id,
                    'source_pool': source_pool,
                    'source_vol': source_vol,
                    'dest_id': dest_id,
                    'dest_pool': dest_pool,
                    'dest_vol': dest_vol}
        job = self.client.spawn_job(DiskCopyJob, settings=settings)
        return job.id

    @listed
    def clone(self, tql_vm, tql_dest, name):
        """ Create and launch a clone job.

        :param tql_vm: a tql matching one vm object (the cloned vm)
        :param tql_dest: a tql matching one hypervisor object (the destination
                         hypervisor)
        :param name: the new name of the VM
        """

Anael Beutot's avatar
Anael Beutot committed
        vm = self.client.list(tql_vm, show=('r', 'h', 'p'), method='clone')

        if len(vm) != 1:
            raise CloneError('VM Tql must select ONE vm')
        elif vm[0]['r'] != 'vm':
            raise CloneError('Destination Tql must select a vm')
        else:
            vm = vm[0]

Anael Beutot's avatar
Anael Beutot committed
        dest = self.client.list(tql_dest, show=('r',), method='clone')
        if len(dest) != 1:
            raise CloneError('Destination Tql must select ONE hypervisor')
        elif dest[0]['r'] != 'hv':
            raise CloneError('Destination Tql must select an hypervisor')
        else:
            dest = dest[0]

        job = self.client.spawn_job(CloneJob, settings={'server': self.server,
                                                        'client': self.client,
                                                        'vm_name': vm['h'],
                                                        'new_vm_name': name,
                                                        'hv_source': vm['p'],
                                                        'hv_dest': dest['id']})
        return job.id
    @listed
    def allocate(self, vmspec, tql_target):
        """ Allocate new VMs.

        :param vmspec: a vmspec structure
        :param tql_target: a TQL used as target restriction
        """

        self.client.check('allocate')

        # Check and expand vmspec input:
        expanded_vmspec = expand_vmspec(vmspec)

        job = self.client.spawn_job(AllocationJob, settings={'server': self.server,
                                                             'client': self.client,
                                                             'expanded_vmspec': expanded_vmspec,
                                                             'tql_target': tql_target})
        return job.id

Antoine Millet's avatar
Antoine Millet committed
    @listed
    def console(self, tql):
        """ Start a remote console on object matching the provided tql.

        :param tql: tql matching only one object on which start the console
        :return: the label of the created tunnel
        """
        objects = self.client.list(tql, show=('r', 'p', 'h'), method='console')
        if not objects:
            raise NotImplementedError('No objects matched by query')
        elif len(objects) != 1:
Antoine Millet's avatar
Antoine Millet committed
            raise NotImplementedError('Console only support one tunnel at time for now')
        errs = Reporter()
        for obj in objects:
            if obj['r'] in ('vm',):
                client = self.server.get_client(obj['p'])
                try:
                    srv_to_host_tun = client.console(obj['h'])
                except Exception as err:
                    errs.error(obj['id'], str(err))
                else:
                    cli_tun = self.client.register_tunnel('console', client, srv_to_host_tun)
                    errs.success(obj['id'], 'tunnel started.', output=cli_tun.label)
Antoine Millet's avatar
Antoine Millet committed
            else:
                errs.error(obj['id'], 'bad role')
        return errs.get_dict()

    @listed
        """ Start a remote shell on object matching the provided tql.
        :param tql: tql matching only one object on which start the shell
        :return: the label of the created tunnel
        objects = self.client.list(tql, show=('r', 'p'), method='shell')
        if not objects:
            raise NotImplementedError('No objects matched by query')
        elif len(objects) != 1:
            raise NotImplementedError('Shell only support one tunnel at time for now')
        errs = Reporter()
        for obj in objects:
            if obj['r'] in ('host', 'hv'):
                client = self.server.get_client(obj['id'])
                srv_to_host_tun = client.shell()
                cli_tun = self.client.register_tunnel('shell', client, srv_to_host_tun)
                errs.success(obj['id'], 'tunnel started.', output=cli_tun.label)
            else:
                errs.error(obj['id'], 'bad role')
        return errs.get_dict()

    @listed
    def resize(self, label, row, col, xpixel, ypixel):
        """ Send a resize event to the remote shell's tty.

        :param label: label of the tty tunnel to resize
        :param row: number of rows
        :param col: number of columns
        :param xpixel: unused
        :param ypixel: unused
        if label is None:
            tuns = [(c, st.label) for t, c, ct, st
                    in self.client.tunnels.values() if t == 'shell']
        else:
            ttype, client, ctun, stun = self.client.get_tunnel(label)
            if ttype != 'shell':
                raise ValueError('Label does not refers on a shell')
            tuns = [(client, stun.label)]

        for client, label in tuns:
            client.resize(label, row, col, xpixel, ypixel)
    def forward(self, login, port, destination='127.0.0.1'):
        """ Forward a TCP port to the client.

        :param login: login of the remote client on which establish the tunnel
        :param port: port on which establish the tunnel on destination
        :param destination: tunnel destination (from the remote client side)
        self.client.check('forward', query='id=%s' % login)
        # Create the tunnel to the node:
        try:
            host_client = self.server.get_client(login)
        except KeyError:
            raise KeyError('Specified client is not connected')
        s2n_tun = host_client.forward(port, destination)

        # Create tunnel to the CLI
        c2s_tun = self.client.register_tunnel('forward', host_client, s2n_tun)

        return c2s_tun.label
    @listed
    def dbstats(self):
        """ Get statistics about tql database.
        """
        return self.server.db.stats()

    def forward_call(self, login, func, *args, **kwargs):
        """ Forward a call to a connected client and return result.

        :param login: login of the connected client
        :param func: function to execute on the client
        :param \*args, \*\*kwargs: arguments of the call
        """
        self.client.check('forward_call')
        client = self.server.get_client(login)
        return client.conn.call(func, *args, **kwargs)


class CliClient(Client):

    """ A cli client connected to the cc-server.
    """

    ROLE = 'cli'
    RPC_HANDLER = CliHandler
    KILL_ALREADY_CONNECTED = True
    def __init__(self, *args, **kwargs):
        super(CliClient, self).__init__(*args, **kwargs)
        self._tunnels = {}  # Running tunnels for this client (as client)

    @property
    def tunnels(self):
        """ Get active client tunnels by label (a dict).
        """
        return self._tunnels

    def spawn_job(self, job_class, **kwargs):
        return self._server.jobs.spawn(job_class, self.account, **kwargs)
    def register_tunnel(self, ttype, client, tun, label=None):
        """ Create and register a tunnel for this client.

        :param ttype: type of tunnel
        :param client: client where the tunnel go
        :param tun: the tunnel of this client
        :param label: label of the tunnel to create
        """
        def cb_on_shutdown(tun):
            # Call the default callback:
            tun.cb_default_on_shutdown(tun)
            # Delete the tunnel from the current running tunnels:
            self.unregister_tunnel(tun.label)

        ctun = self.conn.create_tunnel(label=label, endpoint=tun.socket,
                                       on_shutdown=cb_on_shutdown,
                                       close_endpoint_on_shutdown=False)
        self._tunnels[ctun.label] = (ttype, client, ctun, tun)
        return ctun

    def get_tunnel(self, label):
        """ Get the tunnel binded to the provided label.

        :return: a tuple (type, remote_client, tunnel, remote_client_tunnel)
            where: **type** is a string provided on tunnel creation,
            **remote_client** the client object of the remote client on which
            the tunnel is established, **tunnel** the cli-to-server tunnel
            object from the sjRpc, **remote_client_tunnel** the
            server-to-remote-client tunnel object from the sjRpc.
        """
        return self._tunnels[label]

    def unregister_tunnel(self, label):
        try:
            del self._tunnels[label]
        except KeyError:
            pass
Antoine Millet's avatar
Antoine Millet committed
    def wall(self, sender, message):
        """ Send a wall to the client.
        """
        self.conn.call('wall', sender, message)


class MultiCliClient(CliClient):

    """ A bootstrap client connected to the cc-server.
    """

    ROLE = 'mcli'
    KILL_ALREADY_CONNECTED = False

    def _get_tql_object(self):
        tql_object = SObject(self.login)
        tql_object.register(StaticTag('r', self.role))
        tql_object.register(StaticTag('a', self._login))
        self._server.db.register(tql_object)
        return tql_object

    @property
    def login(self):
        return '%s.%s' % (self._login, self.conn.get_fd())

    @property
    def role(self):
        return 'cli'

    def shutdown(self):
        super(MultiCliClient, self).shutdown()
        # Also, remote the object from the db:
        self._server.db.unregister(self.login)


Client.register_client_class(MultiCliClient)
Client.register_client_class(CliClient)