-
Seblu authored
Introduce a new class, RemoteCommand, which helps check that the needed remote functions are available before adding a command to the list of commands
Seblu authored: Introduce a new class, RemoteCommand, which helps check that the needed remote functions are available before adding a command to the list of commands
command.py 9.50 KiB
#!/usr/bin/env python
#coding=utf8
'''
CloudControl CLI command module
'''
from cccli.exception import *
from sjrpc.core.exceptions import *
from cccli.printer import Printer, color
from optparse import OptionParser
class Command(object):
    '''Base of all command class'''
    # NOTE: help() returns the class docstring verbatim, so the docstring
    # above is runtime text; explanatory notes are kept in comments instead.

    def __init__(self, cli, argv0):
        '''Remember the owning cli, its printer and the invoked command name'''
        self.cli = cli
        self.printer = cli.printer
        self.name = argv0

    def __call__(self, argv):
        '''Run the command; every concrete command has to override this'''
        raise NotImplementedError()

    def help(self):
        '''Return self.__doc__ (None when the command class has no docstring)'''
        return self.__doc__

    def usage(self):
        '''Return the usage line built from the command name'''
        return "Usage: %s" % self.name
class OptionCommand(Command):
    '''Commands with options handling'''
    # NOTE: the class docstring is what help() returns, so it is kept as is.
    # This class wraps an optparse parser: options are declared through the
    # add_option/remove_option/set_usage proxies and parse_args() stores the
    # result in self.options / self.args.

    class OptionCommandParser(OptionParser):
        '''Parser of Option for OptionCommand'''

        def error(self, e):
            '''Report a parsing error by raising cmdBadArgument'''
            raise cmdBadArgument(e)

        def exit(self, status=0, msg=None):
            '''Raise cmdExit whenever the parser wants to exit'''
            # Keep the signature of OptionParser.exit(status=0, msg=None):
            # an override taking no argument would raise TypeError as soon as
            # a caller passes a status or a message. Both are ignored here.
            raise cmdExit()

    def __init__(self, cli, argv0):
        '''Create the option parser for the command named argv0'''
        Command.__init__(self, cli, argv0)
        self.optionparser = OptionCommand.OptionCommandParser(prog=argv0)
        self.set_usage("%prog [options]")
        # filled by parse_args()
        self.options = None
        self.args = []

    def usage(self):
        '''Return usage string'''
        return self.optionparser.format_help().strip()

    def parse_args(self, argv):
        '''Wrapper to parse_args

        argv[0] (the command name) is skipped; the parsed values are stored
        in self.options and the remaining arguments in self.args.
        '''
        (self.options, self.args) = self.optionparser.parse_args(argv[1:])

    def add_option(self, *args, **kwargs):
        '''Proxy to OptionParser'''
        self.optionparser.add_option(*args, **kwargs)

    def remove_option(self, *args, **kwargs):
        '''Proxy to OptionParser'''
        self.optionparser.remove_option(*args, **kwargs)

    def set_usage(self, *args, **kwargs):
        '''Proxy to OptionParser'''
        self.optionparser.set_usage(*args, **kwargs)
class RemoteCommand(OptionCommand):
    '''Command which needs connection to server'''
    # NOTE: the class docstring is what help() returns, so it is kept as is.
    # NOTE(review): print_tags() uses self.tdr / self.tdc / self.tdtc, which
    # are not assigned anywhere in this class (TqlCommand.__init__ sets them).

    def __init__(self, cli, argv0):
        '''Set up option handling and keep a reference to the cli rpc object'''
        OptionCommand.__init__(self, cli, argv0)
        self.rpc = cli.rpc

    def remote_functions(self):
        '''Return a set of needed remote functions (to be overridden)'''
        raise NotImplementedError()

    def print_objects(self, objectlist, ignore=None, index=False):
        '''Print the tags of every entry of objectlist["objects"]

        objectlist: mapping holding an "objects" list and, optionally, an
                    "order" entry handed to print_tags; None prints nothing
        ignore: tag names to skip, handed to print_tags
        index: when true, write "[<position>] " before each entry
        '''
        if objectlist is not None:
            tag_order = objectlist.get("order")
            for position, tags in enumerate(objectlist["objects"]):
                if index:
                    self.printer.out("[%s] " % position, nl="")
                self.print_tags(tags, order=tag_order, ignore=ignore)

    def print_tags(self, taglist, order=None, ignore=None):
        '''Print the tags of one object through the tagdisplay hooks

        taglist: mapping of tag name to tag value (left unmodified)
        order: tag names to print first, in that order
        ignore: tag names which must not be printed
        '''
        if ignore is None:
            ignore = ()
        if order is None:
            order = ()

        def render(name, value):
            # title color, tag name, value color, then the resolved value
            return "%s%s:%s%s" % (self.tdtc(name), name,
                                  self.tdc(name), self.tdr(name, value))

        # work on a copy so the caller's tags are not altered
        remaining = taglist.copy()
        for name in ignore:
            remaining.pop(name, None)
        chunks = []
        # ordered tags come first; a missing (or None) one is skipped
        for name in order:
            value = remaining.pop(name, None)
            if value is None:
                continue
            chunks.append(render(name, value))
        # whatever is left is printed in alphabetical order
        chunks.extend(render(name, remaining[name]) for name in sorted(remaining))
        self.printer.out("%s%s" % (" ".join(chunks), color["reset"]))
class TqlCommand(RemoteCommand):
    '''Command which handle TQL stuff'''
    # NOTE: the class docstring is what help() returns, so it is kept as is.
    # This class registers the TQL related command line options, keeps the
    # tagdisplay hooks (tdr / tdc / tdtc) used when printing tags, and
    # provides rpccall(), which sends a TQL based RPC either directly or
    # after listing the matching objects and asking for confirmation.

    def __init__(self, cli, argv0):
        '''Register TQL options and bind the tagdisplay hooks'''
        RemoteCommand.__init__(self, cli, argv0)
        self.set_usage("%prog [options] <tql>")
        # set tql filter stuff
        self.tql_filter = ""
        self.add_option("-r", "--raw", action="callback", dest="raw",
                        callback=self._cb_raw,
                        help="Don't append security filter to TQL")
        # set tql check stuff
        self.add_option("-d", "--direct", action="store_true", dest="direct",
                        help="Directly send TQL to server")
        # set tql status stuff
        self.add_option("-q", "--quiet", action="store_false", dest="status",
                        help="Dont status of call request")
        # index printing
        self.add_option("-i", "--index", action="store_true", dest="index",
                        help="Print TQL line index")
        # tql printer option
        self.add_option("--print-tql", action="store_true", dest="tql_print",
                        help="Print TQL before sending to server")
        # set tagdisplay stuff
        self.tdr = self.cli.tagdisplay.resolve
        self.tdc = self.cli.tagdisplay.color
        self.tdtc = self.cli.tagdisplay.titlecolor
        self.add_option("--no-tagdisplay", action="callback",
                        callback=self._cb_notagdisplay,
                        help="No tagdisplay custom display")
        self.add_option("--no-color", action="callback",
                        callback=self._cb_nocolor,
                        help="No output coloration")

    def _cb_notagdisplay(self, option, opt, value, parser):
        '''Callback for option --no-tagdisplay'''
        # values are shown as is, colors fall back to tagdisplay defaults
        self.tdr = lambda tagname, tagvalue: tagvalue
        self.tdc = self.cli.tagdisplay.default_color
        self.tdtc = self.cli.tagdisplay.default_titlecolor

    def _cb_nocolor(self, option, opt, value, parser):
        '''Callback for option --no-color'''
        # values are shown as is and both color hooks return an empty string
        self.tdr = lambda tagname, tagvalue: tagvalue
        self.tdc = lambda tagname: ""
        self.tdtc = lambda tagname: ""

    def _cb_raw(self, option, opt, value, parser):
        '''Callback for option --raw'''
        self.tql_filter = ""

    def rpccall(self, *args, **kwargs):
        '''
        Call a RPC method and show tql return

        Positional and remaining keyword arguments are passed to the RPC
        call. Keyword arguments listed below are consumed here; when one is
        not given, the parsed command line option of the same name (without
        the leading underscore) is used if it is set, else the default.

        _callback: call function _callback after each rpccall
        _status: display call status
        _index: print object index when listing objects
        _tql: accepted, but not used by this class
        _tql_index: is index in args where filter should be appended (def: 1)
        _tql_print: print tql with filter
        _exception: catch or not RPCError exception
        _direct: call directly, no listing before

        Raise cmdError when _tql_index does not point inside args.
        Return the RPC result in direct mode, None otherwise.
        '''
        # set rpccall default option value
        _options = { "status": True,
                     "index": False,
                     "direct": False,
                     "exception": False,
                     "tql": "",
                     "tql_index": 1,
                     "tql_print": False,
                     "callback": None,
                     }
        # check for options modifiers
        for name in list(_options):
            kwname = "_%s"%name
            if kwname in kwargs:
                # explicit keyword wins and must not reach the RPC call
                _options[name] = kwargs.pop(kwname)
            else:
                # fall back on the parsed command line option, when set
                parsed = getattr(self.options, name, None)
                if parsed is not None:
                    _options[name] = parsed
        tql_index = _options["tql_index"]
        # check tql index and get a copy
        if tql_index < 0 or tql_index >= len(args):
            raise cmdError("No indexed TQL")
        # append filter (empty if raw mode)
        if self.tql_filter != "":
            l = list(args)
            l[tql_index] += self.tql_filter
            args = tuple(l)
        # Tql printer
        if _options["tql_print"]:
            self.printer.out("TQL: %s"%args[tql_index])
        # Tql check
        if _options["direct"]:
            return self._unsecure_rpccall(_options, args, kwargs)
        return self._secure_rpccall(_options, args, kwargs)

    def _unsecure_rpccall(self, _options, args, kwargs):
        '''Just call an RPC without checking before

        Return the RPC result. On RpcError, re-raise it when the "exception"
        option is set, else raise cmdError.
        '''
        try:
            d = self.rpc.call(*args, **kwargs)
            if _options["callback"] is not None:
                _options["callback"](d)
            if _options["status"]:
                self.print_objects(d, ["output"], index=_options["index"])
            return d
        except RpcError as e:
            if _options["exception"]:
                raise
            raise cmdError("RPCError: %s"%str(e))

    def _secure_rpccall(self, _options, args, kwargs):
        '''Call RPC after listing, confirmation and with id

        The TQL is first sent to the "list" RPC, the matching objects are
        shown and the user has to confirm. The RPC is then called once per
        listed object, with the TQL replaced by "id=<object id>".

        On RpcError: re-raise it when the "exception" option is set (same
        rule as the direct call); else raise cmdError for the listing and
        print the error then go on with the next object for the per-object
        calls. Raise cmdError when nothing matches and cmdWarning when the
        user does not confirm. Return None.
        '''
        tql_index = _options["tql_index"]
        # get objects id
        try:
            objs = self.cli.rpc.call("list", args[tql_index])
        except RpcError as e:
            if _options["exception"]:
                raise
            raise cmdError("RPCError: %s"%str(e))
        # no result, goodbye
        if len(objs["objects"]) == 0:
            raise cmdError("No selected object by TQL.")
        self.printer.out("Objects:")
        self.print_objects(objs, index=_options["index"])
        self.printer.out("Objects count: %s"%len(objs["objects"]))
        # be sure the user wants to do that
        if self.printer.ask("%sProceed?%s (yes): "%(color["lred"], color["reset"])) != "yes":
            raise cmdWarning("User aborted")
        # acting on many objects needs a second, harder to type, confirmation
        if len(objs["objects"]) > 5:
            self.printer.out("%sYou will act on more than 5 objets!%s"%(color["uyellow"], color["reset"]))
            if self.printer.ask("%sAre you really sure?%s (Yes Mistress): "
                                %(color["lred"], color["reset"])) != "Yes Mistress":
                raise cmdWarning("User aborted")
        # per validated id execution (this is a kind of atomic implementation)
        for obj in objs["objects"]:
            try:
                l = list(args)
                l[tql_index] = "id=%s"%obj["id"]
                d = self.cli.rpc.call(*l, **kwargs)
                if _options["callback"] is not None:
                    _options["callback"](d)
                if _options["status"]:
                    self.print_objects(d, ["output"], index=False)
            except RpcError as e:
                # honor the "exception" option here too, not only in direct mode
                if _options["exception"]:
                    raise
                self.printer.error("RPCError: %s"%str(e))