Newer
Older
#!/usr/bin/env python
#coding=utf8
'''
CloudControl CLI command module
'''
from cccli.exception import *
from sjrpc.core.exceptions import *
from cccli.printer import Printer, color
from optparse import OptionParser
class Command(object):
    '''Base of all command class'''

    def __init__(self, cli, argv0):
        '''
        Bind the command to its CLI and remember its invocation name
        cli: CLI object the command runs in
        argv0: name the command was called with (used by usage())
        '''
        # Subclasses read self.cli (rpc, tagdisplay), so the reference
        # must be kept on the instance.
        self.cli = cli
        # NOTE(review): subclasses call self.printer.out/ask/error; the
        # printer is assumed to be exposed by the CLI object -- confirm.
        self.printer = getattr(cli, "printer", None)
        self.name = argv0

    def usage(self):
        '''Return the usage string of the command'''
        return "Usage: %s" % self.name

    def help(self):
        '''Return the help text: the docstring of the command class'''
        return self.__doc__
class OptionCommand(Command):
    '''Add options parser to Command'''

    class OptionCommandParser(OptionParser):
        '''Parser of Option for OptionCommand'''

        def error(self, e):
            '''Report a parsing error by raising cmdBadArgument'''
            raise cmdBadArgument(e)

        def exit(self, status=0, msg=None):
            '''
            Raise cmdExit when the parser wants to exit
            status, msg: accepted for compatibility with
                         OptionParser.exit, both are ignored
            '''
            raise cmdExit()

    def __init__(self, cli, argv0):
        '''
        Create the command and its option parser
        cli: CLI object the command runs in
        argv0: name the command was called with (parser program name)
        '''
        Command.__init__(self, cli, argv0)
        self.optionparser = OptionCommand.OptionCommandParser(prog=argv0)
        # filled by parse_args()
        self.options = None
        self.args = list()

    def usage(self):
        '''Return usage string'''
        return self.optionparser.format_help().strip()

    def parse_args(self, argv):
        '''
        Wrapper to parse_args
        argv: full argument list; argv[0] (command name) is skipped
        Result is stored in self.options and self.args
        '''
        (self.options, self.args) = self.optionparser.parse_args(argv[1:])

    def add_option(self, *args, **kwargs):
        '''Proxy to OptionParser'''
        self.optionparser.add_option(*args, **kwargs)

    def remove_option(self, *args, **kwargs):
        '''Proxy to OptionParser'''
        self.optionparser.remove_option(*args, **kwargs)

    def set_usage(self, *args, **kwargs):
        '''Proxy to OptionParser'''
        self.optionparser.set_usage(*args, **kwargs)
class TqlCommand(OptionCommand):
    '''Add Tql stuff to Command'''

    def __init__(self, cli, argv0):
        '''
        Register the TQL related options
        (-r/--raw, -d/--direct, -q/--quiet, --print-tql, --no-tagdisplay)
        cli: CLI object the command runs in
        argv0: name the command was called with
        '''
        OptionCommand.__init__(self, cli, argv0)
        self.rpc = cli.rpc
        self.set_usage("%prog [options] <tql>")
        # set tql filter stuff
        # (text appended to the TQL by rpccall, nothing when empty)
        self.tql_filter = ""
        self.add_option("-r", "--raw", action="callback", dest="raw",
                        callback=self._cb_raw,
                        help="Don't append security filter to TQL")
        # set tql check stuff
        self.add_option("-d", "--direct", action="store_true", dest="direct",
                        help="Directly send TQL to server")
        self.add_option("-q", "--quiet", action="store_false", dest="status",
                        help="Dont status of call request")
        # tql printer option
        self.add_option("--print-tql", action="store_true", dest="tql_print",
                        help="Print TQL before sending to server")
        # set tagdisplay stuff
        self.tdr = self.cli.tagdisplay.resolve
        self.tdc = self.cli.tagdisplay.color
        self.tdtc = self.cli.tagdisplay.titlecolor
        self.add_option("--no-tagdisplay", action="callback", dest="tagdisplay",
                        callback=self._cb_notagdisplay,
                        help="No tagdisplay custom display")

    def _cb_notagdisplay(self, option, opt, value, parser):
        '''Callback for option --no-tagdisplay'''
        # values are shown untouched and colors fall back to the defaults
        self.tdr = lambda tagname, tagvalue: tagvalue
        self.tdc = self.cli.tagdisplay.default_color
        self.tdtc = self.cli.tagdisplay.default_titlecolor

    def _cb_raw(self, option, opt, value, parser):
        '''Callback for option --raw'''
        self.tql_filter = ""

    def rpccall(self, *args, **kwargs):
        '''
        Call a RPC method an show tql return
        args are given to the RPC call, one of them being the TQL
        _callback: call function _callback after each rpccall
        _status: display call status
        _direct: send the call without listing/confirmation
        _tql_index: is index in args where filter should be appended (def: 1)
        _tql_print: print tql with filter
        _exception: catch or not RPCError exception
        Other keyword arguments are given to the RPC call
        Raise cmdError if there is no argument at _tql_index
        '''
        # set default option value
        _options = {"status": True,
                    "direct": False,
                    "exception": False,
                    "tql": "",
                    "tql_index": 1,
                    "tql_print": False,
                    "callback": None,
                    }
        # check for options modifiers: an explicit _<name> keyword wins,
        # else a parsed command line option which is not None
        for name in _options:
            keyword = "_%s" % name
            if keyword in kwargs:
                _options[name] = kwargs.pop(keyword)
            else:
                parsed = getattr(self.options, name, None)
                if parsed is not None:
                    _options[name] = parsed
        index = _options["tql_index"]
        # check tql index
        if index < 0 or index >= len(args):
            raise cmdError("No indexed TQL")
        # append filter (empty if raw mode)
        if self.tql_filter != "":
            call_args = list(args)
            call_args[index] += self.tql_filter
            args = tuple(call_args)
        # Tql printer
        if _options["tql_print"]:
            self.printer.out("TQL: %s" % args[index])
        # Tql check
        if _options["direct"]:
            return self._unsecure_rpccall(_options, args, kwargs)
        return self._secure_rpccall(_options, args, kwargs)

    def _unsecure_rpccall(self, _options, args, kwargs):
        '''
        Just call an RPC without checking before
        RpcError is re-raised if option exception is set,
        else it is converted to cmdError
        '''
        try:
            d = self.rpc.call(*args, **kwargs)
            if _options["callback"] is not None:
                _options["callback"](d)
            # NOTE(review): the result is only returned when status is set,
            # a display statement looks to be missing here and the return
            # was probably unconditional -- confirm against upstream.
            if _options["status"]:
                return d
        except RpcError as e:
            if _options["exception"]:
                raise
            raise cmdError("RPCError: %s" % str(e))

    def _secure_rpccall(self, _options, args, kwargs):
        '''
        Call RPC after listing, confirmation and with id
        Raise cmdError if listing fails or selects nothing
        Raise cmdWarning if user does not confirm
        '''
        index = _options["tql_index"]
        # get objects id
        try:
            objs = self.cli.rpc.call("list", args[index])
        except RpcError as e:
            raise cmdError("RPCError: %s" % str(e))
        # no result, goodbye
        # NOTE(review): guard rebuilt from the comment and the message.
        if len(objs["objects"]) == 0:
            raise cmdError("No selected object by TQL.")
        self.printer.out("Objects:")
        self.print_objects(objs)
        self.printer.out("Objects count: %s" % len(objs["objects"]))
        # be sure boby want do that
        if self.printer.ask("%sProceed?%s (yes): " % (color["lred"], color["reset"])) != "yes":
            raise cmdWarning("User aborted")
        # bobby doing many things, he needs to be really sure!
        # NOTE(review): threshold rebuilt from the warning message.
        if len(objs["objects"]) > 5:
            self.printer.out("%sYou will act on more than 5 objets!%s" % (color["uyellow"], color["reset"]))
            if self.printer.ask("%sAre you really sure?%s (Yes Mistress): "
                                % (color["lred"], color["reset"])) != "Yes Mistress":
                raise cmdWarning("User aborted")
        # per validated id execution (this is a kind of atomic implementation)
        # NOTE(review): loop and per-object try rebuilt around the visible
        # statements; a failing object is reported and the next one is tried.
        for obj in objs["objects"]:
            try:
                call_args = list(args)
                call_args[index] = "id=%s" % obj["id"]
                d = self.cli.rpc.call(*tuple(call_args), **kwargs)
                # NOTE(review): callback gets obj["output"] of the listed
                # object, the call result d is not used -- confirm.
                if _options["callback"] is not None:
                    _options["callback"](obj["output"])
            except RpcError as e:
                self.printer.error("RPCError: %s" % str(e))

    def print_objects(self, objectlist, ignore=None):
        '''
        Trivial objectlist printing of tag
        objectlist: dict with an "objects" list and an optional "order"
        ignore: tag names which are not displayed
        '''
        if objectlist is None:
            return
        _order = objectlist.get("order", None)
        for o in objectlist["objects"]:
            self.print_tags(o, order=_order, ignore=ignore)

    def print_tags(self, taglist, order=None, ignore=None):
        '''
        Display a tag with tagdisplay settings
        taglist: dict of tag name -> tag value (not modified)
        order: tag names to display first, in this order
        ignore: tag names which are not displayed
        '''
        _ignore = () if ignore is None else ignore
        _order = () if order is None else order

        def fmt(tn, tv):
            # title color, name, value color then resolved value
            return "%s%s:%s%s" % (self.tdtc(tn), tn, self.tdc(tn), self.tdr(tn, tv))

        # copy dict to show
        tl = taglist.copy()
        # remove ignore tags
        for tn in _ignore:
            tl.pop(tn, None)
        # list to print
        pls = []
        # print firstly order tags (missing or None ones are skipped)
        for tn in _order:
            tv = tl.pop(tn, None)
            if tv is not None:
                pls.append(fmt(tn, tv))
        # print next tags without order
        for (tn, tv) in tl.items():
            pls.append(fmt(tn, tv))
        self.printer.out("%s%s" % (" ".join(pls), color["reset"]))