# Newer
# Older
#!/usr/bin/env python
#coding=utf8
'''
CloudControl CLI command module
'''
from cccli.exception import *
from sjrpc.core.exceptions import *
from cccli.printer import Printer, color
from optparse import OptionParser
class Command(object):
    '''Base of all command class'''
    # NOTE: help() returns the class docstring verbatim, so the docstring
    # above is user-facing text and must not be reworded casually.

    def __init__(self, cli, argv0):
        '''
        Remember the invoking context.
        cli: object driving the commands; stored because subclasses read
             self.cli
        argv0: name the command was invoked with
        '''
        # NOTE(review): TqlCommand also reads self.printer, which is not
        # assigned anywhere in the visible code -- confirm where it is set.
        self.cli = cli
        self.name = argv0

    def usage(self):
        '''Return a one-line usage string built from the command name'''
        return "Usage: %s" % self.name

    def help(self):
        '''Return the help text: the docstring of the instance's class'''
        return self.__doc__
class OptionCommand(Command):
    '''Add options parser to Command'''

    class OptionCommandParser(OptionParser):
        '''Parser of Option for OptionCommand'''

        def error(self, e):
            '''Report a parsing error by raising cmdBadArgument'''
            raise cmdBadArgument(e)

        def exit(self, status=0, msg=None):
            '''
            Raise cmdExit instead of leaving the interpreter.
            status and msg are accepted for compatibility with
            OptionParser.exit(status, msg) and are ignored.
            '''
            raise cmdExit()

    def __init__(self, cli, argv0):
        '''
        Create the option parser for this command.
        cli: forwarded to Command.__init__
        argv0: command name, also used as the parser program name
        '''
        Command.__init__(self, cli, argv0)
        self.optionparser = OptionCommand.OptionCommandParser(prog=argv0)
        # filled by parse_args()
        self.options = None
        self.args = []

    def usage(self):
        '''Return usage string'''
        return self.optionparser.format_help().strip()

    def parse_args(self, argv):
        '''
        Wrapper to parse_args
        argv: full argument vector; argv[0] (the command name) is skipped.
        The result is stored in self.options and self.args.
        '''
        (self.options, self.args) = self.optionparser.parse_args(argv[1:])

    def add_option(self, *args, **kwargs):
        '''Proxy to OptionParser'''
        self.optionparser.add_option(*args, **kwargs)

    def remove_option(self, *args, **kwargs):
        '''Proxy to OptionParser'''
        self.optionparser.remove_option(*args, **kwargs)

    def set_usage(self, *args, **kwargs):
        '''Proxy to OptionParser'''
        self.optionparser.set_usage(*args, **kwargs)
class TqlCommand(OptionCommand):
    '''Add Tql stuff to Command'''

    def __init__(self, cli, argv0):
        '''
        Register the TQL related options and bind the tagdisplay helpers.
        cli: must expose rpc; self.cli (set by the base class) must expose
             tagdisplay
        argv0: command name
        '''
        OptionCommand.__init__(self, cli, argv0)
        self.rpc = cli.rpc
        self.set_usage("%prog [options] <tql>")
        # set tql filter stuff
        self.tql_filter = ""
        self.add_option("-r", "--raw", action="callback", dest="raw",
                        callback=self._cb_raw,
                        help="Don't append security filter to TQL")
        # set tql check stuff
        self.add_option("-d", "--direct", action="store_true", dest="direct",
                        help="Directly send TQL to server")
        self.add_option("-q", "--quiet", action="store_false", dest="status",
                        help="Dont status of call request")
        # tql printer option
        self.add_option("--print-tql", action="store_true", dest="tql_print",
                        help="Print TQL before sending to server")
        # set tagdisplay stuff
        self.tdr = self.cli.tagdisplay.resolve
        self.tdc = self.cli.tagdisplay.color
        self.tdtc = self.cli.tagdisplay.titlecolor
        self.add_option("--no-tagdisplay", action="callback", dest="tagdisplay",
                        callback=self._cb_notagdisplay,
                        help="No tagdisplay custom display")

    def _cb_notagdisplay(self, option, opt, value, parser):
        '''Callback for option --no-tagdisplay'''
        # show raw tag values and fall back to the default colors
        self.tdr = lambda tagname, tagvalue: tagvalue
        self.tdc = self.cli.tagdisplay.default_color
        self.tdtc = self.cli.tagdisplay.default_titlecolor

    def _cb_raw(self, option, opt, value, parser):
        '''Callback for option --raw'''
        # an empty filter means nothing is appended to the TQL in rpccall()
        self.tql_filter = ""

    def rpccall(self, *args, **kwargs):
        '''
        Call an RPC method taking a TQL argument and display its result.

        The call is tuned by underscore-prefixed keyword arguments, which are
        removed from kwargs before the RPC call. When one is absent, the
        parsed command line option of the same name is used if it is set:
          _callback: function called with the result of each RPC call
          _status: display call status (def: True)
          _direct: call without listing and confirmation (def: False)
          _tql_index: index in args where filter should be appended (def: 1)
          _tql_print: print tql with filter (def: False)
          _exception: re-raise RpcError instead of raising cmdError; only
                      read by direct calls (def: False)
          _tql: accepted, but not used by the code of this class

        Raise cmdError if args has no element at _tql_index.
        Return the RPC result for direct calls, None otherwise.
        '''
        # set default option value
        _options = {"status": True,
                    "direct": False,
                    "exception": False,
                    "tql": "",
                    "tql_index": 1,
                    "tql_print": False,
                    "callback": None,
                    }
        # check for options modifiers: keyword argument first, then the
        # parsed command line option when it has a value
        for name in list(_options):
            kwname = "_%s" % name
            if kwname in kwargs:
                _options[name] = kwargs.pop(kwname)
            else:
                value = getattr(self.options, name, None)
                if value is not None:
                    _options[name] = value
        tql_index = _options["tql_index"]
        # check tql index
        if tql_index < 0 or tql_index >= len(args):
            raise cmdError("No indexed TQL")
        # append filter (empty if raw mode)
        if self.tql_filter != "":
            filtered = list(args)
            filtered[tql_index] += self.tql_filter
            args = tuple(filtered)
        # Tql printer
        if _options["tql_print"]:
            self.printer.out("TQL: %s" % args[tql_index])
        # Tql check
        if _options["direct"]:
            return self._unsecure_rpccall(_options, args, kwargs)
        return self._secure_rpccall(_options, args, kwargs)

    def _unsecure_rpccall(self, _options, args, kwargs):
        '''
        Just call an RPC without checking before.
        Return the RPC result. On RpcError, re-raise it when the "exception"
        option is set, otherwise raise cmdError.
        '''
        try:
            d = self.rpc.call(*args, **kwargs)
            if _options["callback"] is not None:
                _options["callback"](d)
            if _options["status"]:
                self.print_status(d)
            return d
        except RpcError as e:
            if _options["exception"]:
                raise
            raise cmdError("RPCError: %s" % str(e))

    def _secure_rpccall(self, _options, args, kwargs):
        '''
        Call RPC after listing, confirmation and with id.
        The TQL is first sent to the "list" RPC, the matching objects are
        shown and the user is asked to confirm; the RPC is then called once
        per object with the TQL replaced by "id=<object id>".
        Raise cmdError if listing fails or matches nothing, cmdWarning if the
        user declines. Errors of the per object calls are only printed.
        '''
        tql_index = _options["tql_index"]
        # get objects id
        try:
            objs = self.cli.rpc.call("list", args[tql_index])
        except RpcError as e:
            raise cmdError("RPCError: %s" % str(e))
        # no result, goodbye
        if len(objs) == 0:
            raise cmdError("No selected object by TQL.")
        self.printer.out("Objects:")
        self.print_taglist(objs)
        self.printer.out("Objects count: %s" % len(objs))
        # be sure bobby wants to do that
        if self.printer.ask("%sProceed?%s (yes): " % (color["lred"], color["reset"])) != "yes":
            raise cmdWarning("User aborted")
        # bobby doing many things, he needs to be really sure!
        if len(objs) > 5:
            self.printer.out("%sYou will act on more than 5 objets!%s" % (color["uyellow"], color["reset"]))
            if self.printer.ask("%sAre you really sure?%s (Yes Mistress): "
                                % (color["lred"], color["reset"])) != "Yes Mistress":
                raise cmdWarning("User aborted")
        # per validated id execution (this is a kind of atomic implementation)
        for obj in objs:
            dobj = dict(obj)
            try:
                call_args = list(args)
                call_args[tql_index] = "id=%s" % dobj["id"]
                d = self.cli.rpc.call(*call_args, **kwargs)
                if _options["callback"] is not None:
                    _options["callback"](d)
                if _options["status"]:
                    self.print_status(d)
            except RpcError as e:
                # one failing object does not stop the remaining ones
                self.printer.error("RPCError: %s" % str(e))

    def print_taglist(self, objs):
        '''Trivial listing of tag: print one line per object'''
        for o in objs:
            self.print_tags(o)

    def print_tags(self, taglist):
        '''
        Display a tag with tagdisplay settings.
        taglist: iterable of (tag name, tag value) pairs, printed on one line
        '''
        line = ["%s%s:%s%s" % (self.tdtc(tn), tn, self.tdc(tn), self.tdr(tn, tv))
                for (tn, tv) in taglist]
        self.printer.out("%s%s" % (" ".join(line), color["reset"]))

    def print_status(self, outputlist):
        '''
        Display status from an object list; None is silently ignored.
        '''
        # NOTE(review): each entry is expected to hold its tag list at
        # [1][1] -- confirm against the RPC reply format.
        if outputlist is not None:
            for o in outputlist:
                self.print_tags(o[1][1])