#!/usr/bin/env python #coding=utf8 ''' CloudControl CLI main module ''' import cccli from cccli.exception import * from cccli.printer import Printer, color from cccli.commands import Commands, Alias from cccli.handler import CliHandler from cccli.tagdisplay import TagDisplay from sjrpc.core.exceptions import * from sjrpc.client import SimpleRpcClient from sjrpc.utils import RpcHandler, pure import os, os.path import sys import re import subprocess import platform import shlex import StringIO class Cli(object): def __init__(self, settings): self.settings = settings self.rpc = None self.alias = Alias() self.tagdisplay = TagDisplay() self.prompt = "" def start(self, line=""): '''Start a CLI''' # line stuff if line: sys.stdin = StringIO.StringIO(line) # start printer self.printer = Printer() # set interactive mode if sys.stderr.isatty() and sys.stdin.isatty(): self.printer.debug("Interactive mode") self.prompt = "%s%s%s%s>%s%s%s "%(color["["], color["light"], color["]"], self.settings["login"], color["["], color["reset"], color["]"]) self.printer.set_interactive() # load history self.printer.history.read(self.settings.get("history", "")) self.printer.history.maxsize(self.settings.get("hsize", None)) self.printer.debug("Loaded history: %s"%len(self.printer.history)) # enable completion self.printer.completion.set_completer(self._command_completer) # load commands self.commands = Commands(self) self.printer.debug("Loaded commands: %d"%len(self.commands)) # load alias self.alias.load(self.settings.get("alias", "")) self.printer.debug("Loaded aliases: %d"%len(self.alias)) # load tagdisplay self.tagdisplay.load(self.settings.get("tag", "")) # connecting self._connect() # auth self._auth() # parsing self._parse() # save history self.printer.history.write(self.settings.get("history", "")) def _connect(self): '''Connect to a cloud control server''' self.printer.debug("Connecting...") try: self.rpc = SimpleRpcClient.from_addr(self.settings["server"], self.settings["port"], enable_ssl=True, default_handler=CliHandler(), on_disconnect="quit", timeout=self.settings["timeout"] ) self.rpc.start(daemonize=True) except Exception as e: s = "Connection failure!" if not str(e) else "Connection failure: %s"%str(e) raise cliError(s) self.printer.debug("Connected.") def _auth(self): '''Handle server authentification''' self.printer.debug("Authenticating...") try: self.rpc.call("authentify", self.settings["login"], self.settings["pass"]) except Exception as e: s = "Authentication failure!" if not str(e) else "Authentication failure: %s"%str(e) raise cliError(s) self.printer.debug("Authenticated.") def _parse(self): '''Parse a line''' while True: try: try: argv = shlex.split(self.printer.getline(self.prompt), comments=True) except ValueError as e: self.printer.error("Lexer: %s"%str(e)) continue if len(argv) == 0: continue # alias subsitution if argv[0] in self.alias: oldargv = argv[1:] argv = shlex.split(self.alias[argv[0]]) argv.extend(oldargv) self._exec_command(argv) except KeyboardInterrupt: self.printer.out("") continue except EOFError: break except SystemExit: break self.printer.interactive("tcho!") def _exec_command(self, argv): '''Execute command''' try: # handle ! in command name if argv[0][0] == "!": argv[0] = argv[0][1:] if not len(argv[0]): return p = subprocess.Popen(argv, close_fds=True, shell=True) p.wait() ret = p.returncode return # handle ? in command name if argv[0][0] == "?": if len(argv[0]) > 1: argv.insert(1, argv[0][1:]) argv[0] = "help" # execute command self.printer.debug("argv: %s"%argv) self.commands(argv) except cmdBadArgument as e: if str(e): self.printer.error("Bad argument: %s."%str(e)) else: self.printer.error("Bad argument.") usage = self.commands.usage(argv[0]) if usage != "": self.printer.out(usage) except cmdBadName: self.printer.error("No command: %s."%argv[0]) except cmdWarning as e: self.printer.warn("%s: %s"%(argv[0], str(e))) except cmdError as e: self.printer.error("%s: %s"%(argv[0], str(e))) except EOFError: self.printer.out("") except Exception as e: if cccli.debug: raise self.printer.error("%s: %s"%(type(e), str(e))) self.printer.warn("This is a not expected error, please report it!") def _command_completer(self, texte): '''Return the list of completion''' comp = self.printer.completion stripped = comp.get_buf()[:comp.get_begin() + 1].lstrip() if texte == "" and stripped != "": return () if len(texte) > 0 and texte[0] == "!": return () if len(texte) > 0 and texte[0] == "?": return [ "?%s"%c for c in list(self.commands) + self.alias.keys() if c.startswith(texte[1:]) ] return [ c for c in list(self.commands) + self.alias.keys() if c.startswith(texte) ]