cli.py 6.64 KiB
#!/usr/bin/env python
#coding=utf8
'''
CloudControl CLI main module
'''
import cccli
from cccli.exception import *
from cccli.printer import Printer, color
from cccli.command import Commands, Aliases
from cccli.handler import CliHandler
from cccli.tagdisplay import TagDisplay
from sjrpc.core.exceptions import *
from sjrpc.client import SimpleRpcClient
from sjrpc.utils import RpcHandler, pure
import os, os.path
import sys
import re
import subprocess
import platform
import shlex
import StringIO
class Cli(object):
def __init__(self, settings):
self.settings = settings
self.rpc = None
self.aliases = Aliases()
self.tagdisplay = TagDisplay()
self.prompt = ""
def start(self, line=""):
'''Start a CLI'''
# line stuff
if line:
sys.stdin = StringIO.StringIO(line)
# start printer
self.printer = Printer()
# set interactive mode
if sys.stderr.isatty() and sys.stdin.isatty():
self.printer.debug("Interactive mode")
self.prompt = "%s%s%s%s>%s%s%s "%(color["["],
color["light"],
color["]"],
self.settings["login"],
color["["],
color["reset"],
color["]"])
self.printer.set_interactive()
# load history
self.printer.history.read(self.settings.get("history", ""))
self.printer.history.maxsize(self.settings.get("hsize", None))
self.printer.debug("Loaded history: %s"%len(self.printer.history))
# enable completion
self.printer.completion.set_completer(self._command_completer)
# connecting
self._connect()
# auth
self._auth()
# load commands (need to be done after connection)
self.commands = Commands(self)
self.printer.debug("Remote functions: %d"%len(self.commands.functions))
self.printer.debug("Loaded commands: %d"%len(self.commands))
# load alias
self.aliases.load(self.settings.get("alias", None))
self.printer.debug("Loaded aliases: %d"%len(self.aliases))
# load tagdisplay
self.tagdisplay.load(self.settings.get("tagdisplay", ""))
# parsing
self._parse_loop()
# save history
self.printer.history.write(self.settings.get("history", None))
def _connect(self):
'''Connect to a cloud control server'''
self.printer.debug("Connecting...")
try:
self.rpc = SimpleRpcClient.from_addr(self.settings["server"],
self.settings["port"],
enable_ssl=True,
default_handler=CliHandler(),
on_disconnect="quit",
timeout=self.settings["timeout"]
)
self.rpc.start(daemonize=True)
except Exception as e:
s = "Connection failure!" if not str(e) else "Connection failure: %s"%str(e)
raise cliError(s)
self.printer.debug("Connected.")
def _auth(self):
'''Handle server authentification'''
self.printer.debug("Authenticating...")
try:
self.rpc.call("authentify", self.settings["login"], self.settings["pass"])
except Exception as e:
s = "Authentication failure!" if not str(e) else "Authentication failure: %s"%str(e)
raise cliError(s)
self.printer.debug("Authenticated.")
def _parse_loop(self):
'''Parse lines'''
while True:
try:
line = self.printer.getline(self.prompt)
self._parse(line)
except KeyboardInterrupt:
self.printer.out("")
except EOFError:
break
except SystemExit:
break
self.printer.interactive("tcho!")
def _parse(self, line):
'''Parse a string'''
try:
argv = shlex.split(line, comments=True)
except ValueError as e:
self.printer.error("Lexer: %s"%str(e))
return
if len(argv) == 0:
return
# alias subsitution
argv = self.aliases.substitute(argv)
# command execution
self._exec_command(argv)
def _exec_command(self, argv):
'''Execute command'''
try:
# handle ! in command name
if argv[0][0] == "!":
argv[0] = argv[0][1:]
if not len(argv[0]):
return
p = subprocess.Popen(argv, close_fds=True, shell=True)
p.wait()
ret = p.returncode
return
# handle ? in command name
if argv[0][0] == "?":
if len(argv[0]) > 1:
argv.insert(1, argv[0][1:])
argv[0] = "help"
# execute command
self.printer.debug("argv: %s"%argv)
self.commands(argv)
except cmdExit:
pass
except cmdBadArgument as e:
if str(e):
self.printer.error("Bad argument: %s."%str(e))
else:
self.printer.error("Bad argument.")
usage = self.commands.usage(argv[0])
if usage != "":
self.printer.out(usage)
except cmdBadName:
self.printer.error("No command: %s."%argv[0])
except cmdWarning as e:
self.printer.warn("%s: %s"%(argv[0], str(e)))
except cmdError as e:
self.printer.error("%s: %s"%(argv[0], str(e)))
except EOFError:
self.printer.out("")
except Exception as e:
if cccli.debug: raise
self.printer.error("%s: %s"%(type(e), str(e)))
self.printer.warn("This is a not expected error, please report it!")
def _command_completer(self, texte):
'''Return the list of completion'''
comp = self.printer.completion
stripped = comp.get_buf()[:comp.get_begin() + 1].lstrip()
if texte == "" and stripped != "":
return ()
if len(texte) > 0 and texte[0] == "!":
return ()
if len(texte) > 0 and texte[0] == "?":
return [ "?%s"%c for c in list(self.commands) + self.aliases.keys() if c.startswith(texte[1:]) ]
return [ c for c in list(self.commands) + self.aliases.keys() if c.startswith(texte) ]