Skip to content
Snippets Groups Projects
cli.py 6.76 KiB
Newer Older
#!/usr/bin/env python
#coding=utf8

'''
CloudControl CLI main module
'''
import cccli
Seblu's avatar
Seblu committed
from cccli.exception import *
Seblu's avatar
Seblu committed
from cccli.printer import Printer, color
Seblu's avatar
Seblu committed
from cccli.command import Commands, Aliases
from cccli.handler import CliHandler
from cccli.tagdisplay import TagDisplay
from sjrpc.core.exceptions import *
from sjrpc.client import SimpleRpcClient
Seblu's avatar
Seblu committed
from sjrpc.utils import RpcHandler, pure
import os, os.path
import sys
import re
import subprocess
import platform
import shlex
import StringIO
Seblu's avatar
Seblu committed
    def __init__(self, settings):
        '''Keep the settings and create the helper objects.

        settings -- mapping of configuration values; stored as-is and read
                    later by start(), _connect() and _auth().
        '''
        # configuration, consulted lazily by the other methods
        self.settings = settings
        # RPC client; stays None until _connect() assigns it
        self.rpc = None
        # alias table and tag display rules, both loaded in start()
        self.aliases = Aliases()
        self.tagdisplay = TagDisplay()
        # stays empty unless start() detects an interactive session
        self.prompt = ""
Seblu's avatar
Seblu committed
    def start(self, line=""):
        '''Run a whole CLI session.

        line -- optional text to execute; when non-empty it replaces
                sys.stdin so the read loop consumes it.

        Steps, in order: create the printer, set up interactive mode when
        stderr and stdin are both terminals, connect, authenticate, load
        commands, aliases and tag display settings, run the parse loop and
        finally write the history.
        '''
        # feed the given text through stdin
        if line:
            sys.stdin = StringIO.StringIO(line)
        printer = self.printer = Printer()
        settings = self.settings
        # interactive only when both stderr and stdin are terminals
        if sys.stderr.isatty() and sys.stdin.isatty():
            printer.debug("Interactive mode")
            prompt_parts = (color["["], color["light"], color["]"],
                            settings["login"],
                            color["["], color["reset"], color["]"])
            self.prompt = "%s%s%s%s>%s%s%s " % prompt_parts
            printer.set_interactive()
            # restore the history
            printer.history.read(settings.get("history", ""))
            printer.history.maxsize(settings.get("hsize", None))
            printer.debug("Loaded history: %s" % len(printer.history))
            # switch completion on
            printer.completion.set_completer(self._command_completer)
        # commands can only be loaded once connected and authenticated
        self._connect()
        self._auth()
        self.commands = Commands(self)
        printer.debug("Remote functions: %d" % len(self.commands.functions))
        printer.debug("Loaded commands: %d" % len(self.commands))
        # aliases and tag display settings
        self.aliases.load(settings.get("alias", None))
        printer.debug("Loaded aliases: %d" % len(self.aliases))
        self.tagdisplay.load(settings.get("tagdisplay", ""))
        # read and execute lines until the input ends
        self._parse_loop()
        # save the history
        printer.history.write(settings.get("history", None))
Seblu's avatar
Seblu committed

Seblu's avatar
Seblu committed
    def _connect(self):
        '''Open the RPC connection to the cloud control server.

        The client is stored in self.rpc and started with daemonize=True.
        Raises cliError when the client cannot be created or started.
        '''
        self.printer.debug("Connecting...")
        try:
            settings = self.settings
            self.rpc = SimpleRpcClient.from_addr(
                settings["server"],
                settings["port"],
                enable_ssl=True,
                default_handler=CliHandler(),
                on_disconnect="quit",
                timeout=settings["cmdtimeout"],
                conn_timeout=settings["contimeout"])
            self.rpc.start(daemonize=True)
        except Exception as e:
            # append the reason only when the exception carries a message
            reason = str(e)
            if reason:
                message = "Connection failure: %s" % reason
            else:
                message = "Connection failure!"
            raise cliError(message)
        self.printer.debug("Connected.")
Seblu's avatar
Seblu committed

Seblu's avatar
Seblu committed
    def _auth(self):
Seblu's avatar
Seblu committed
        '''Handle server authentification'''
Seblu's avatar
Seblu committed
        self.printer.debug("Authenticating...")
            self.rpc.call("authentify", self.settings["login"], self.settings["pass"])
        except Exception as e:
            s = "Authentication failure!" if not str(e) else "Authentication failure: %s"%str(e)
            raise cliError(s)
        self.printer.debug("Authenticated.")
Seblu's avatar
Seblu committed
    def _parse_loop(self):
        '''Parse lines'''
Seblu's avatar
Seblu committed
        while True:
            try:
Seblu's avatar
Seblu committed
                line = self.printer.getline(self.prompt)
                self._parse(line)
            except KeyboardInterrupt:
                self.printer.out("")
Seblu's avatar
Seblu committed
            except EOFError:
                break
            except SystemExit:
                break
        self.printer.interactive("tcho!")
Seblu's avatar
Seblu committed
    def _parse(self, line):
        '''Parse a string'''
        try:
            argv = shlex.split(line, comments=True)
        except ValueError as e:
            self.printer.error("Lexer: %s"%str(e))
            return
        if len(argv) == 0:
            return
        # alias subsitution
        argv = self.aliases.substitute(argv)
        # command execution
        self._exec_command(argv)

Seblu's avatar
Seblu committed
    def _exec_command(self, argv):
        '''Execute command'''
        try:
            # handle ! in command name
            if argv[0][0] == "!":
Seblu's avatar
Seblu committed
                argv[0] = argv[0][1:]
                if not len(argv[0]):
                    return
                p = subprocess.Popen(argv, close_fds=True, shell=True)
Seblu's avatar
Seblu committed
                p.wait()
                ret = p.returncode
                return
            # handle ? in command name
            if argv[0][0] == "?":
                if len(argv[0]) > 1:
                    argv.insert(1, argv[0][1:])
                argv[0] = "help"
            # execute command
            self.printer.debug("argv: %s"%argv)
Seblu's avatar
Seblu committed
            self.commands(argv)
Seblu's avatar
Seblu committed
        except cmdBadArgument as e:
Seblu's avatar
Seblu committed
            if str(e):
Seblu's avatar
Seblu committed
                self.printer.error("Bad argument: %s."%str(e))
Seblu's avatar
Seblu committed
            else:
Seblu's avatar
Seblu committed
                self.printer.error("Bad argument.")
Seblu's avatar
Seblu committed
            usage = self.commands.usage(argv[0])
Seblu's avatar
Seblu committed
            if usage != "":
Seblu's avatar
Seblu committed
                self.printer.out(usage)
Seblu's avatar
Seblu committed
        except cmdBadName:
            self.printer.error("No command: %s."%argv[0])
        except cmdWarning as e:
            self.printer.warn("%s: %s"%(argv[0], str(e)))
        except cmdError as e:
            self.printer.error("%s: %s"%(argv[0], str(e)))
Seblu's avatar
Seblu committed
        except EOFError:
            self.printer.out("")
Seblu's avatar
Seblu committed
        except Exception as e:
            if cccli.debug: raise
Seblu's avatar
Seblu committed
            self.printer.error("%s: %s"%(type(e), str(e)))
            self.printer.warn("This is a not expected error, please report it!")
    def _command_completer(self, texte):
        '''Return the list of completion'''
        comp = self.printer.completion
        stripped = comp.get_buf()[:comp.get_begin() + 1].lstrip()
        if texte == "" and stripped != "":
            return ()
        if len(texte) > 0 and texte[0] == "!":
            return ()
        if len(texte) > 0 and texte[0] == "?":
Seblu's avatar
Seblu committed
            return [ "?%s"%c for c in  list(self.commands) + self.aliases.keys() if c.startswith(texte[1:]) ]
        return [ c for c in  list(self.commands) + self.aliases.keys() if c.startswith(texte) ]