Skip to content
Snippets Groups Projects
cli.py 6.61 KiB
Newer Older
#!/usr/bin/env python
#coding=utf8

'''
Seblu's avatar
Seblu committed
CloudControl CLI main module
Seblu's avatar
Seblu committed

import cccli
Seblu's avatar
Seblu committed
from cccli.exception import *
Seblu's avatar
Seblu committed
from cccli.printer import Printer, color
Seblu's avatar
Seblu committed
from cccli.commands import Commands, Aliases
from cccli.handler import CliHandler
from cccli.tagdisplay import TagDisplay
from sjrpc.core.exceptions import *
from sjrpc.client import SimpleRpcClient
Seblu's avatar
Seblu committed
from sjrpc.utils import RpcHandler, pure
import os, os.path
import sys
import re
import subprocess
import platform
import shlex
import StringIO
Seblu's avatar
Seblu committed
    def __init__(self, settings):
        self.settings = settings
Seblu's avatar
Seblu committed
        self.rpc = None
Seblu's avatar
Seblu committed
        self.aliases = Aliases()
        self.tagdisplay = TagDisplay()
Seblu's avatar
Seblu committed
        self.prompt = ""
Seblu's avatar
Seblu committed
    def start(self, line=""):
Seblu's avatar
Seblu committed
        '''Start a CLI'''
Seblu's avatar
Seblu committed
        # line stuff
Seblu's avatar
Seblu committed
        if line:
Seblu's avatar
Seblu committed
            sys.stdin = StringIO.StringIO(line)
        # start printer
        self.printer = Printer()
Seblu's avatar
Seblu committed
        # set interactive mode
        if sys.stderr.isatty() and sys.stdin.isatty():
            self.printer.debug("Interactive mode")
Seblu's avatar
Seblu committed
            self.prompt = "%s%s%s%s>%s%s%s "%(color["["],
                                              color["light"],
                                              color["]"],
                                              self.settings["login"],
                                              color["["],
                                              color["reset"],
                                              color["]"])
            self.printer.set_interactive()
            # load history
Seblu's avatar
Seblu committed
            self.printer.history.read(self.settings.get("history", ""))
            self.printer.history.maxsize(self.settings.get("hsize", None))
            self.printer.debug("Loaded history: %s"%len(self.printer.history))
            # enable completion
            self.printer.completion.set_completer(self._command_completer)
Seblu's avatar
Seblu committed
        # load commands
        self.commands = Commands(self)
        self.printer.debug("Loaded commands: %d"%len(self.commands))
        # load alias
Seblu's avatar
Seblu committed
        self.aliases.load(self.settings.get("alias", None))
        self.printer.debug("Loaded aliases: %d"%len(self.aliases))
Seblu's avatar
Seblu committed
        self.tagdisplay.load(self.settings.get("tagdisplay", ""))
        # connecting
        self._connect()
        # auth
        self._auth()
Seblu's avatar
Seblu committed
        self._parse()
        # save history
        self.printer.history.write(self.settings.get("history", None))
Seblu's avatar
Seblu committed

Seblu's avatar
Seblu committed
    def _connect(self):
        '''Connect to a cloud control server'''
Seblu's avatar
Seblu committed
        self.printer.debug("Connecting...")
Seblu's avatar
Seblu committed
        try:
            self.rpc = SimpleRpcClient.from_addr(self.settings["server"],
Seblu's avatar
Seblu committed
                                             self.settings["port"],
                                             enable_ssl=True,
                                             default_handler=CliHandler(),
                                             on_disconnect="quit",
                                             timeout=self.settings["timeout"]
Seblu's avatar
Seblu committed
                                        )
            self.rpc.start(daemonize=True)
        except Exception as e:
            s = "Connection failure!" if not str(e) else "Connection failure: %s"%str(e)
            raise cliError(s)
Seblu's avatar
Seblu committed
        self.printer.debug("Connected.")
Seblu's avatar
Seblu committed

Seblu's avatar
Seblu committed
    def _auth(self):
Seblu's avatar
Seblu committed
        '''Handle server authentification'''
Seblu's avatar
Seblu committed
        self.printer.debug("Authenticating...")
            self.rpc.call("authentify", self.settings["login"], self.settings["pass"])
        except Exception as e:
            s = "Authentication failure!" if not str(e) else "Authentication failure: %s"%str(e)
            raise cliError(s)
        self.printer.debug("Authenticated.")
Seblu's avatar
Seblu committed
    def _parse(self):
        '''Parse a line'''
Seblu's avatar
Seblu committed
        while True:
            try:
Seblu's avatar
Seblu committed
                try:
Seblu's avatar
Seblu committed
                    argv = shlex.split(self.printer.getline(self.prompt), comments=True)
Seblu's avatar
Seblu committed
                except ValueError as e:
                    self.printer.error("Lexer: %s"%str(e))
                    continue
                if len(argv) == 0:
                    continue
                # alias subsitution
Seblu's avatar
Seblu committed
                if argv[0] in self.aliases:
Seblu's avatar
Seblu committed
                    oldargv = argv[1:]
Seblu's avatar
Seblu committed
                    argv = shlex.split(self.aliases[argv[0]])
Seblu's avatar
Seblu committed
                    argv.extend(oldargv)
                self._exec_command(argv)
            except KeyboardInterrupt:
                self.printer.out("")
                continue
Seblu's avatar
Seblu committed
            except EOFError:
                break
            except SystemExit:
                break
        self.printer.interactive("tcho!")
Seblu's avatar
Seblu committed
    def _exec_command(self, argv):
        '''Execute command'''
        try:
            # handle ! in command name
            if argv[0][0] == "!":
Seblu's avatar
Seblu committed
                argv[0] = argv[0][1:]
                if not len(argv[0]):
                    return
                p = subprocess.Popen(argv, close_fds=True, shell=True)
Seblu's avatar
Seblu committed
                p.wait()
                ret = p.returncode
                return
            # handle ? in command name
            if argv[0][0] == "?":
                if len(argv[0]) > 1:
                    argv.insert(1, argv[0][1:])
                argv[0] = "help"
            # execute command
            self.printer.debug("argv: %s"%argv)
Seblu's avatar
Seblu committed
            self.commands(argv)
Seblu's avatar
Seblu committed
        except cmdBadArgument as e:
Seblu's avatar
Seblu committed
            if str(e):
Seblu's avatar
Seblu committed
                self.printer.error("Bad argument: %s."%str(e))
Seblu's avatar
Seblu committed
            else:
Seblu's avatar
Seblu committed
                self.printer.error("Bad argument.")
Seblu's avatar
Seblu committed
            usage = self.commands.usage(argv[0])
Seblu's avatar
Seblu committed
            if usage != "":
Seblu's avatar
Seblu committed
                self.printer.out(usage)
Seblu's avatar
Seblu committed
        except cmdBadName:
            self.printer.error("No command: %s."%argv[0])
        except cmdWarning as e:
            self.printer.warn("%s: %s"%(argv[0], str(e)))
        except cmdError as e:
            self.printer.error("%s: %s"%(argv[0], str(e)))
Seblu's avatar
Seblu committed
        except EOFError:
            self.printer.out("")
Seblu's avatar
Seblu committed
        except Exception as e:
            if cccli.debug: raise
Seblu's avatar
Seblu committed
            self.printer.error("%s: %s"%(type(e), str(e)))
            self.printer.warn("This is a not expected error, please report it!")
    def _command_completer(self, texte):
        '''Return the list of completion'''
        comp = self.printer.completion
        stripped = comp.get_buf()[:comp.get_begin() + 1].lstrip()
        if texte == "" and stripped != "":
            return ()
        if len(texte) > 0 and texte[0] == "!":
            return ()
        if len(texte) > 0 and texte[0] == "?":
Seblu's avatar
Seblu committed
            return [ "?%s"%c for c in  list(self.commands) + self.aliases.keys() if c.startswith(texte[1:]) ]
        return [ c for c in  list(self.commands) + self.aliases.keys() if c.startswith(texte) ]