-
Seblu authored
fix traceback when typing enter with history disabled (method Printer().ask()); clean quit on server disconnect; prompt in color; add login in prompt; new cli exceptions class; properly handle auth failure
Seblu authored: fix traceback when typing enter with history disabled (method Printer().ask()); clean quit on server disconnect; prompt in color; add login in prompt; new cli exceptions class; properly handle auth failure
command.py 8.45 KiB
#!/usr/bin/env python
#coding=utf8
'''
CloudControl CLI command module
'''
import os, os.path
import sys
import re
import pprint
import ConfigParser
from cccli.exception import *
from cccli.printer import color
from sjrpc.client import SimpleRpcClient
from sjrpc.core.exceptions import *
class Command(object):
    '''
    Resolve and run one CLI command.

    Every method named cmd_<name> is a command called <name>. A command
    receives the full argv list (argv[0] is the command name) and may carry
    "usage", "desc" and "details" function attributes. The docstring of a
    cmd_<name> method is printed verbatim by the "help" command, so it must
    stay a short one-line description.
    '''

    def __init__(self, argv, cli):
        '''
        Resolve argv[0] to a command and bind the cli context.

        argv[0] is either a full command name or an unambiguous prefix of
        one; a resolved prefix is written back into argv[0].

        Raises BadCommand when argv is empty, when the name contains
        characters other than letters and digits, when the prefix is
        ambiguous, or when nothing matches.
        '''
        # check argv
        if len(argv) < 1:
            raise BadCommand()
        # check valid command chars; the pattern is anchored at both ends so
        # that a name such as "he(" is rejected here instead of slipping
        # through to the lookup below
        if not re.match("^[a-zA-Z0-9]+$", argv[0]):
            raise BadCommand("Invalid command name")
        cmdlist = [x[4:] for x in dir(self) if x.startswith("cmd_")]
        # strict prefixes only: an exact name is handled by the first branch
        matchlist = [x for x in cmdlist
                     if x.startswith(argv[0]) and x != argv[0]]
        if argv[0] in cmdlist:
            pass
        elif len(matchlist) == 1:
            argv[0] = matchlist[0]
        elif len(matchlist) > 1:
            raise BadCommand("Too many command: %s" % ", ".join(matchlist))
        else:
            raise BadCommand()
        self._cmd = getattr(self, "cmd_%s" % argv[0])
        self._argv = argv
        self.cli = cli
        self.printer = cli.printer

    @classmethod
    def usage(cls, cmdname):
        '''
        Return the usage string of command cmdname.

        Returns "" when the command defines no "usage" attribute.
        Raises BadArgument when no cmd_<cmdname> method exists.
        '''
        fname = "cmd_%s" % cmdname
        if not hasattr(cls, fname):
            raise BadArgument(cmdname)
        return getattr(getattr(cls, fname), "usage", "")

    def call(self):
        '''
        Run the command selected at construction time and return its result.

        Raises BadCommand when the stored name is invalid or unknown.
        '''
        if re.match("^[a-zA-Z0-9]+$", self._argv[0]):
            name = "cmd_%s" % self._argv[0]
            if hasattr(self, name):
                return getattr(self, name)(self._argv)
        raise BadCommand(self._argv[0])

    def cmd_exit(self, argv):
        '''Quit application with respect'''
        raise SystemExit()
    cmd_exit.usage = "exit"
    cmd_exit.desc = "Quit application with respect"

    def cmd_quit(self, argv):
        '''Quit application with respect'''
        raise SystemExit()
    cmd_quit.usage = "quit"
    cmd_quit.desc = "Quit application with respect"

    def cmd_version(self, argv):
        '''Print cli version'''
        # The module only has "from cccli.<sub> import ..." imports in view,
        # which do not bind the name "cccli"; import it here so the
        # attribute lookup below cannot fail with a NameError.
        import cccli
        self.printer.out(cccli.version)
    cmd_version.usage = "version"
    cmd_version.desc = "Print cli version"

    def cmd_usage(self, argv):
        '''Print usage of a command'''
        # exactly one argument: the command name
        if len(argv) != 2:
            raise BadArgument()
        usage = Command.usage(argv[1])
        if usage != "":
            self.printer.out("usage: %s" % usage)
        else:
            self.printer.out("No usage.")
    cmd_usage.usage = "usage [command]"
    cmd_usage.desc = "Print usage of a command"

    def cmd_help(self, argv):
        '''Print help'''
        if len(argv) == 1:
            # build the (name, description) list of every command; pairs
            # keep each name attached to its own docstring
            entries = []
            for x in dir(self):
                m = re.match("^cmd_([a-zA-Z0-9]+)$", x)
                if m:
                    entries.append((m.group(1), getattr(self, x).__doc__))
            # printing commands list, names padded to a common width
            width = max(len(name) for name, _ in entries) + 3
            self.printer.out("%sCommands:%s" % (color["lwhite"], color["reset"]))
            for name, doc in entries:
                self.printer.out("%s- %s" % (name.ljust(width), doc))
        elif len(argv) == 2:
            fname = "cmd_%s" % argv[1]
            if not hasattr(self, fname):
                raise BadArgument(argv[1])
            handler = getattr(self, fname)
            self.printer.out("Description: %s" % handler.__doc__)
            if hasattr(handler, "usage"):
                self.printer.out("Usage: %s" % handler.usage)
            if hasattr(handler, "details"):
                # was "Self.Printer.out", a NameError for any command
                # defining "details"
                self.printer.out("Details: %s" % handler.details)
        else:
            raise BadArgument()
    cmd_help.usage = "help [command]"
    cmd_help.desc = "Print help about a command"

    def cmd_alias(self, argv):
        '''Show or create alias'''
        if len(argv) == 1:
            # no argument: dump every alias
            for n, v in self.cli.alias.items():
                self.printer.out("%s=%s" % (n, v))
        elif len(argv) == 2:
            # one argument: show that alias
            if argv[1] not in self.cli.alias:
                raise BadArgument(argv[1])
            self.printer.out("%s=%s" % (argv[1], self.cli.alias[argv[1]]))
        elif len(argv) == 3:
            # two arguments: define the alias and save it to the file named
            # by the "alias" setting
            self.cli.alias[argv[1]] = argv[2]
            self.cli.alias.save(self.cli.settings.get("alias", ""))
        else:
            raise BadArgument()
    cmd_alias.usage = "alias [name] [value]"
    cmd_alias.desc = "Show or create aliases"

    def cmd_unalias(self, argv):
        '''Remove an alias'''
        if len(argv) != 2:
            raise BadArgument()
        if argv[1] not in self.cli.alias:
            raise BadArgument("%s: No such alias" % argv[1])
        del self.cli.alias[argv[1]]
        self.cli.alias.save(self.cli.settings.get("alias", ""))
    cmd_unalias.usage = "unalias [name]"
    cmd_unalias.desc = "Remove an aliases"

    def cmd_rcmd(self, argv):
        '''Show remote commands'''
        for cmds in self.cli.rpc.list_commands():
            self.printer.out("%s" % cmds["name"])
    cmd_rcmd.usage = "rcmd"
    cmd_rcmd.desc = "Print remote command list"

    def cmd_history(self, argv):
        '''Show history of commands'''
        if not self.cli.history:
            raise cmdError("History is not available")
        for l in self.cli.history:
            self.printer.out(l)
    cmd_history.usage = "history"
    cmd_history.desc = "Show commands history"

    def cmd_list(self, argv):
        '''List objects'''
        # without argument the query defaults to "a"
        if len(argv) == 1:
            argv.append("a")
        # NOTE(review): the query words are concatenated without separator
        # here, while start/stop/... join them with a space -- confirm which
        # form the server expects before unifying.
        items = self.cli.rpc.list("".join(argv[1:]))
        for item in items:
            pprint.pprint(item)
    cmd_list.usage = "list [tql]"

    def _startstopsdestroypauseresume(self, argv):
        '''
        Shared body of the start, stop, pause, resume and destroy commands.

        Lists the objects selected by the tql query built from argv[1:],
        prints their ids and count, asks for confirmation (twice when more
        than 5 objects are selected), then calls the remote command named
        argv[0] with the query.

        Raises BadArgument when no query is given and cmdWarning when the
        user does not confirm.
        '''
        # arg stuff
        if len(argv) == 1:
            raise BadArgument()
        tql = " ".join(argv[1:])
        # print tql list result
        items = self.cli.rpc.list(tql)
        if len(items) == 0:
            self.printer.out("No selected object")
            return
        self.printer.out("Your request give the following result:")
        for item in items:
            self.printer.out("%s" % item["id"])
        self.printer.out("Count: %s" % len(items))
        if self.printer.ask("Are you sure to %s? (yes/NO) " % argv[0]) != "yes":
            raise cmdWarning("Aborted")
        # large selections need a second, more explicit confirmation
        if len(items) > 5:
            if self.printer.ask("You request is on more than 5 objets. Are you really sure to %s its? (Yes, I am) " % argv[0]) != "Yes, I am":
                raise cmdWarning("Aborted")
        self.cli.rpc[argv[0]](tql)

    def cmd_start(self, argv):
        '''Start objects'''
        self._startstopsdestroypauseresume(argv)
    cmd_start.usage = "start [tql]"

    def cmd_stop(self, argv):
        '''Stop objects'''
        self._startstopsdestroypauseresume(argv)
    cmd_stop.usage = "stop [tql]"

    def cmd_pause(self, argv):
        '''Pause objects'''
        self._startstopsdestroypauseresume(argv)
    cmd_pause.usage = "pause [tql]"

    def cmd_resume(self, argv):
        '''Resume objects'''
        self._startstopsdestroypauseresume(argv)
    cmd_resume.usage = "resume [tql]"

    def cmd_destroy(self, argv):
        '''Force objects to stop'''
        self._startstopsdestroypauseresume(argv)
    cmd_destroy.usage = "destroy [tql]"
class Alias(dict):
    '''
    Alias wrapper: a dict of alias name -> value that can be loaded from
    and saved to the "alias" section of an ini-style file.
    '''

    def load(self, filename):
        '''
        Load aliases from filename.

        When the file is readable and has an "alias" section, the current
        content is replaced by that section. Otherwise nothing changes.
        '''
        if not os.access(filename, os.R_OK):
            return
        fparser = ConfigParser.SafeConfigParser()
        fparser.read(filename)
        if fparser.has_section("alias"):
            self.clear()
            self.update(fparser.items("alias"))

    def save(self, filename):
        '''
        Save aliases to filename.

        The "alias" section of the file is rewritten from this dict; other
        sections are kept. Nothing is done when the file is missing or is
        not both readable and writable.
        '''
        # "os.R_OK or os.W_OK" evaluates to os.R_OK alone; the flags must be
        # combined with "|" for write permission to be checked as well
        if not os.access(filename, os.R_OK | os.W_OK):
            return
        fparser = ConfigParser.SafeConfigParser()
        fparser.read(filename)
        # drop the stored section so removed aliases do not survive
        fparser.remove_section("alias")
        fparser.add_section("alias")
        for n, v in self.items():
            fparser.set("alias", n, v)
        # context manager so the file is flushed and closed deterministically
        with open(filename, "w") as fd:
            fparser.write(fd)