#!/usr/bin/env python #coding=utf8 ''' CloudControl CLI main module ''' import os, os.path import sys import re import subprocess import shlex import StringIO import cccli from cccli.exception import * from cccli.printer import Printer, color from cccli.command import Command, Alias from sjrpc.client import SimpleRpcClient from sjrpc.utils import RpcHandler, pure from sjrpc.core.exceptions import * class Cli(object): def __init__(self, settings): self.settings = settings self.rpc = None self.alias = Alias() self.prompt = "" def start(self, line=""): '''Start a CLI''' # line stuff if line: sys.stdin = StringIO.StringIO(line) # start printer self.printer = Printer() # set interactive mode if sys.stderr.isatty() and sys.stdin.isatty(): self.printer.debug("Interactive mode") self.prompt = "%s%s%s%s>%s%s%s "%(color["["], color["light"], color["]"], self.settings["login"], color["["], color["reset"], color["]"]) self.printer.set_interactive() # load history self.printer.history.load(self.settings.get("history", "")) self.printer.history.maxsize(self.settings.get("hsize", None)) self.printer.debug("Loaded history: %s"%len(self.printer.history)) # enable completion self.printer.completion.set_completer(self._command_completer) # set prompt # load alias self.alias.load(self.settings.get("alias", "")) self.printer.debug("Alias: %s"%self.alias) # connecting self._connect() # auth self._auth() # parsing self._parse() # save history self.printer.history.write(self.settings.get("history", "")) def _connect(self): '''Connect to a cloud control server''' self.printer.debug("Connecting...") try: self.rpc = SimpleRpcClient.from_addr(self.settings["server"], self.settings["port"], enable_ssl=True, default_handler=CliHandler(), on_disconnect="quit", timeout=self.settings["timeout"] ) self.rpc.start(daemonize=True) except Exception as e: s = "Connection failure!" if not str(e) else "Connection failure: %s"%str(e) raise cliError(s) self.printer.debug("Connected.") def _auth(self): '''Handle server authentification''' self.printer.debug("Authenticating...") try: self.rpc.call("authentify", self.settings["login"], self.settings["pass"]) except Exception as e: s = "Authentication failure!" if not str(e) else "Authentication failure: %s"%str(e) raise cliError(s) self.printer.debug("Authenticated.") def _parse(self): '''Parse a line''' while True: try: try: argv = shlex.split(self.printer.getline(self.prompt), comments=True) except ValueError as e: self.printer.error("Lexer: %s"%str(e)) continue if len(argv) == 0: continue # alias subsitution if argv[0] in self.alias: oldargv = argv[1:] argv = shlex.split(self.alias[argv[0]]) argv.extend(oldargv) self._exec_command(argv) except KeyboardInterrupt: self.printer.out("") continue except EOFError: break except SystemExit: break self.printer.interactive("tcho!") def _exec_command(self, argv): '''Execute command''' self.printer.debug("argv: %s"%argv) try: if (argv[0][0] == "!"): argv[0] = argv[0][1:] if not len(argv[0]): return p = subprocess.Popen(argv, close_fds=True, shell=True) p.wait() ret = p.returncode elif (argv[0] == "?"): Command(["help"], self).call() else: Command(argv, self).call() except cmdBadArgument as e: if str(e): self.printer.error("Bad argument: %s."%str(e)) else: self.printer.error("Bad argument.") usage = Command.usage(argv[0]) if usage != "": self.printer.out("usage: %s."%usage) except cmdBadName: self.printer.error("No command: %s."%argv[0]) except cmdWarning as e: self.printer.warn("%s: %s"%(argv[0], str(e))) except cmdError as e: self.printer.error("%s: %s"%(argv[0], str(e))) except EOFError: self.printer.out("") except Exception as e: if cccli.debug: raise self.printer.error("%s: %s"%(type(e), str(e))) self.printer.warn("This is a not expected error, please report it!") def _command_completer(self, texte): '''Return the list of completion''' comp = self.printer.completion stripped = comp.get_buf()[:comp.get_begin() + 1].lstrip() if texte == "" and stripped != "": return None return [ c for c in Command.list() if c.startswith(texte) ] class CliHandler(RpcHandler): '''Handle RPC incoming request''' @pure def get_tags(self, tags=()): if "version" in tags: return { "version": cccli.version } @pure def quit(self, rpc=None): Printer().fatal("Disconnected from server!")