command.py 14.94 KiB
#!/usr/bin/env python
#coding=utf8
'''
CloudControl CLI command module
'''
import re
import ConfigParser
import os
import shlex
import imp
from cccli.exception import *
from sjrpc.core.exceptions import *
from cccli.printer import Printer, color
from optparse import OptionParser
import cccli.commands
class Commands(object):
'''Command manager'''
def __init__(self, cli):
# save cli context
self.cli = cli
# build command list
self.cmds = self.load_commands(cccli.commands.__path__[0], Command)
# build remote function list
try:
self.functions = set([ c["name"] for c in self.cli.rpc.call("functions") ])
except RpcError as e:
raise cliError("RPCError: Unable to retrieve remote commands: %s"%str(e))
# remove not available remote commands
for cname in tuple(self.cmds):
cobj = self.cmds[cname](self.cli, cname)
if isinstance(cobj, RemoteCommand):
try:
if len(cobj.remote_functions()) == 0:
raise NotImplementedError("No remote function")
if not cobj.remote_functions().issubset(self.functions):
del self.cmds[cname]
self.cli.printer.debug("Command %s not available"%cname)
except NotImplementedError as e:
self.cli.printer.debug("Command %s lack of remote_functions"%cname)
del self.cmds[cname]
def __len__(self):
return len(self.cmds)
def __contains__(self, item):
return item in self.cmds.keys()
def __iter__(self):
return iter(self.cmds.keys())
def __repr__(self):
return repr(self.cmds.keys())
def __call__(self, argv):
# check argv
if len(argv) == 0:
raise cmdBadName()
# find right commands to call
if argv[0] not in self:
matchlist = [ x for x in self if re.match("%s.+"%re.escape(argv[0]), x) ]
if len(matchlist) == 1:
argv[0] = matchlist[0]
else:
raise cmdBadName()
# create class and call it
cmd = self.cmds[argv[0]](self.cli, argv[0])
return cmd(argv)
def usage(self, argv0):
'''Return usage of a command'''
u = self.cmds[argv0](self.cli, argv0).usage()
return u if u is not None else ""
def help(self, argv0):
'''Return of a command'''
h = self.cmds[argv0](self.cli, argv0).help()
return h if h is not None else ""
def load_commands(self, path, cls):
'''Load sublasss of cls from package name'''
cmds=dict()
for name in os.listdir(path):
if name.endswith(".py") and not name.startswith("__"):
fpath = os.path.join(path, name)
module = imp.load_source(os.path.splitext(name)[0], fpath)
for key, entry in module.__dict__.items():
try:
if issubclass(entry, cls) and entry.__name__.startswith("Command_"):
cmds[entry.__name__[8:]] = entry
except TypeError:
continue
return cmds
class Aliases(dict):
''' Aliases manager'''
def load(self, filename):
'''load alias from file'''
if filename is not None:
fparser = ConfigParser.RawConfigParser()
fparser.read(filename)
if fparser.has_section("alias"):
self.clear()
self.update(fparser.items("alias"))
def save(self, filename):
'''save alias on file'''
if filename is not None:
fparser = ConfigParser.RawConfigParser()
fparser.read(filename)
fparser.remove_section("alias")
fparser.add_section("alias")
for n,v in self.items():
fparser.set("alias", n, v)
fparser.write(open(filename, "w"))
def substitute(self, argv):
# trip is the number of subtitution of an alias
# maximum number of trip is set to 10
for trip in range(10):
if argv[0] in self:
oldargv = argv[1:]
argv = shlex.split(self[argv[0]])
argv.extend(oldargv)
return argv
class Command(object):
'''Base of all command class'''
def __init__(self, cli, argv0):
self.cli = cli
self.printer = self.cli.printer
self.name = argv0
def __call__(self, argv):
'''Code called when command is invoked'''
raise NotImplementedError
def name(self):
'''Name of the command'''
raise NotImplementedError
def usage(self):
return "Usage: %s"%self.name
def help(self):
return self.__doc__
class OptionCommand(Command):
'''Commands with options handling'''
class OptionCommandParser(OptionParser):
'''Parser of Option for OptionCommand'''
def error(self, e):
raise cmdBadArgument(e)
def exit(self):
raise cmdExit()
def __init__(self, cli, argv0):
Command.__init__(self, cli, argv0)
self.optionparser = OptionCommand.OptionCommandParser(prog=argv0)
self.set_usage("%prog [options]")
self.options = None
self.args = list()
def usage(self):
'''Return usage string'''
return self.optionparser.format_help().strip()
def parse_args(self, argv):
'''Wrapper to parse_args'''
(self.options, self.args) = self.optionparser.parse_args(argv[1:])
def add_option(self, *args, **kwargs):
'''Proxy to OptionParser'''
self.optionparser.add_option(*args, **kwargs)
def remove_option(self, *args, **kwargs):
'''Proxy to OptionParser'''
self.optionparser.remove_option(*args, **kwargs)
def set_usage(self, *args, **kwargs):
'''Proxy to OptionParser'''
self.optionparser.set_usage(*args, **kwargs)
class RemoteCommand(OptionCommand):
'''Command which needs connection to server'''
def __init__(self, cli, argv0):
OptionCommand.__init__(self, cli, argv0)
# ignore tag option
self.add_option("-I", "--ignore-tag", action="append",
dest="ignore", default=[],
help="Don't display a tag in objects")
# no print count at end option
self.add_option("--no-count", action="store_true",
dest="nocount", default=False,
help="Don't print count at end of objects")
# index printing
self.add_option("-i", "--index", action="store_true",
dest="index", default=False,
help="Print object lines indexed")
self.rpc = cli.rpc
def remote_functions(self):
'''Return a set of needed remote functions'''
raise NotImplementedError
def print_objects(self, objectlist):
'''Trivial objectlist printing of tag'''
if objectlist is None:
return
# get default index from local options
_order = objectlist.get("order", None)
for (i, o) in enumerate(objectlist["objects"]):
if self.options.index:
self.printer.out("[%s] "%i, nl="")
self.print_tags(o, order=_order)
if not self.options.nocount:
length = len(objectlist["objects"])
if length > 1:
self.printer.out("Objects count: %s" % length)
def print_tags(self, taglist, order=None):
'''Display a tag with tagdisplay settings'''
order = () if order is None else order
# copy dict to show
tl = taglist.copy()
# remove ignore tags
for tn in self.options.ignore:
tl.pop(tn, None)
# list to print
pls = []
# print firstly order tags
for tn in order:
tv = tl.pop(tn, None)
if tv is not None:
pls.append("%s%s:%s%s"%(self.tdtc(tn), tn, self.tdc(tn), self.tdr(tn, tv)))
# print tags without order, alpha ordered
for tn in sorted(tl.keys()):
pls.append("%s%s:%s%s"%(self.tdtc(tn), tn, self.tdc(tn), self.tdr(tn, tl[tn])))
self.printer.out("%s%s"%(" ".join(pls), color["reset"]))
class TqlCommand(RemoteCommand):
'''Command which handle TQL stuff'''
def __init__(self, cli, argv0):
RemoteCommand.__init__(self, cli, argv0)
self.set_usage("%prog [options] <tql>")
# set tql filter stuff
self.tql_filter = ""
self.add_option("-r", "--raw", action="callback", dest="raw",
callback=self._cb_raw,
help="Don't append security filter to TQL")
# set tql check stuff
self.add_option("-d", "--direct", action="store_true",
dest="direct", default=False,
help="Directly send TQL to server")
# set tql status stuff
self.add_option("-q", "--quiet", action="store_false",
dest="status", default=True,
help="Dont status of call request")
# tql printer option
self.add_option("--print-tql", action="store_true",
dest="tql_print", default=False,
help="Print TQL before sending to server")
# set tagdisplay stuff
self.tdr = self.cli.tagdisplay.resolve
self.tdc = self.cli.tagdisplay.color
self.tdtc = self.cli.tagdisplay.titlecolor
self.add_option("--no-tagdisplay", action="callback",
callback=self._cb_notagdisplay,
help="No tagdisplay custom display")
self.add_option("--no-color", action="callback",
callback=self._cb_nocolor,
help="No output coloration")
def _cb_notagdisplay(self, option, opt, value, parser):
'''Callback for option --no-tagdisplay'''
self.tdr = lambda tagname, tagvalue: tagvalue
self.tdc = self.cli.tagdisplay.default_color
self.tdtc = self.cli.tagdisplay.default_titlecolor
def _cb_nocolor(self, option, opt, value, parser):
'''Callback for option --no-color'''
self.tdr = lambda tagname, tagvalue: tagvalue
self.tdc = lambda tagname: ""
self.tdtc = lambda tagname: ""
def _cb_raw(self, option, opt, value, parser):
'''Callback for option --raw'''
self.tql_filter = ""
def rpccall(self, *args, **kwargs):
'''
Call a RPC method an show tql return
_callback: call function _callback after each rpccall
_status: display call status
_tql_index: is index in args where filter should be appended (def: 1)
_tql_print: print tql with filter
_exception: catch or not RPCError exception
_direct: call directly, no listing before
_arg_list: list of arg which are added to args one by one (=> multiple call)
'''
# set on line modified options
_saved_options = {}
for _o in kwargs.copy():
if _o.startswith("_"):
o = _o[1:]
_saved_options[o] = getattr(self.options, o) if hasattr(self.options, o) else None
setattr(self.options, o, kwargs[_o])
kwargs.pop(_o)
# set tql_index to 1 if any
if not hasattr(self.options, "tql_index"):
self.options.tql_index = 1
# check tql index
if self.options.tql_index < 0 or self.options.tql_index >= len(args):
raise cmdError("No indexed TQL")
# append filter (empty if raw mode)
if self.tql_filter != "":
l = list(args)
l[self.options.tql_index] += self.tql_filter
args = tuple(l)
# Tql printer
if self.options.tql_print:
self.printer.out("TQL: %s"%args[self.options.tql_index])
# Tql check
if self.options.direct:
out= self._unsecure_rpccall(args, **kwargs)
else:
out = self._secure_rpccall(args, **kwargs)
# restore saved options
for (k, v) in _saved_options.items():
if v is None:
delattr(self.options, k)
else:
setattr(self.options, k, v)
# return output
return out
def _unsecure_rpccall(self, args, **kwargs):
'''Just call an RPC without checking before'''
try:
if not hasattr(self.options, "arg_list"):
d = self.rpc.call(*args, **kwargs)
if hasattr(self.options, "callback"):
self.options.callback(d)
if hasattr(self.options, "status") and self.options.status:
self.options.ignore += [ "output" ]
self.print_objects(d)
else:
# arg_list mode call command for each argument
# return is returned as a list
d = []
for arg in self.options.arg_list:
args2 = args + [ arg ]
r = self.rpc.call(*args2, **kwargs)
d.append(r)
if hasattr(self.options, "callback"):
self.options.callback(r)
if hasattr(self.options, "status") and self.options.status:
self.options.ignore += [ "output" ]
self.print_objects(r)
return d
except RpcError as e:
if hasattr(self.options, "exception"):
raise
raise cmdError("RPCError: %s"%str(e))
def _secure_rpccall(self, args, **kwargs):
'''Call RPC after listing, confirmation and with id'''
# get objects id
try:
objs = self.cli.rpc.call("list", args[self.options.tql_index])
except RpcError as e:
raise cmdError("RPCError: %s"%str(e))
# no result, goodbye
if len(objs["objects"]) == 0:
raise cmdError("No selected object by TQL.")
self.printer.out("Objects:")
self.print_objects(objs)
# be sure boby want do that
if self.printer.ask("%sProceed?%s (yes): "%(color["lred"], color["reset"])) != "yes":
raise cmdWarning("User aborted")
# bobby doing many things, he needs to be really sure!
if len(objs["objects"]) > 5:
self.printer.out("%sYou will act on more than 5 objets!%s"%(color["uyellow"], color["reset"]))
if self.printer.ask("%sAre you really sure?%s (Yes Mistress): "
%(color["lred"], color["reset"])) != "Yes Mistress":
raise cmdWarning("User aborted")
# per validated id execution (this is a kind of atomic implementation)
for obj in objs["objects"]:
# make a list from tupe for editing
args2 = list(args)
# ovewrite tql object by id from object
args2[self.options.tql_index] = "id=%s"%obj["id"]
# now we can make call
self._unsecure_rpccall(args2, **kwargs)