Newer
Older
#!/usr/bin/env python
#coding=utf8
'''
from cccli.tagdisplay import TagDisplay
from sjrpc.core.exceptions import *
from sjrpc.client import SimpleRpcClient
import os, os.path
import sys
import re
import subprocess
import platform
import shlex
import StringIO
self.tagdisplay = TagDisplay()
# start printer
self.printer = Printer()
if sys.stderr.isatty() and sys.stdin.isatty():
self.printer.debug("Interactive mode")
self.prompt = "%s%s%s%s>%s%s%s "%(color["["],
color["light"],
color["]"],
self.settings["login"],
color["["],
color["reset"],
color["]"])
self.printer.set_interactive()
# load history
self.printer.history.read(self.settings.get("history", ""))
self.printer.history.maxsize(self.settings.get("hsize", None))
self.printer.debug("Loaded history: %s"%len(self.printer.history))
# enable completion
self.printer.completion.set_completer(self._command_completer)
# load commands
self.commands = Commands(self)
self.printer.debug("Loaded commands: %d"%len(self.commands))
self.aliases.load(self.settings.get("alias", None))
self.printer.debug("Loaded aliases: %d"%len(self.aliases))
# connecting
self._connect()
# auth
self._auth()
self.printer.history.write(self.settings.get("history", None))
'''Connect to a cloud control server'''
self.rpc = SimpleRpcClient.from_addr(self.settings["server"],
self.settings["port"],
enable_ssl=True,
default_handler=CliHandler(),
on_disconnect="quit",
timeout=self.settings["timeout"]
except Exception as e:
s = "Connection failure!" if not str(e) else "Connection failure: %s"%str(e)
raise cliError(s)
self.rpc.call("authentify", self.settings["login"], self.settings["pass"])
except Exception as e:
s = "Authentication failure!" if not str(e) else "Authentication failure: %s"%str(e)
raise cliError(s)
argv = shlex.split(self.printer.getline(self.prompt), comments=True)
except ValueError as e:
self.printer.error("Lexer: %s"%str(e))
continue
if len(argv) == 0:
continue
# alias substitution
except KeyboardInterrupt:
self.printer.out("")
continue
except EOFError:
break
except SystemExit:
break
# Run one already-tokenised command line. argv is a list of words; a leading
# "!" on the first word hands the line to the system shell, a leading "?"
# rewrites it into a "help" invocation.
# NOTE(review): this view of the method has lost its indentation and appears
# to omit lines (no dispatch call follows the debug trace below, and the
# clause owning the last two statements is not visible) -- confirm against
# the full file before changing any logic here.
def _exec_command(self, argv):
'''Execute command'''
try:
# handle ! in command name
if argv[0][0] == "!":
# drop the "!" marker; a lone "!" leaves nothing to run
argv[0] = argv[0][1:]
if not len(argv[0]):
return
# NOTE(review): per the subprocess documentation, on POSIX a list combined
# with shell=True makes only argv[0] the command string; the remaining items
# become arguments to the shell itself. The child is also never waited on
# (p is unused). Verify this is the intended behaviour.
p = subprocess.Popen(argv, close_fds=True, shell=True)
return
# handle ? in command name
if argv[0][0] == "?":
# "?cmd ..." becomes "help cmd ..."; a bare "?" becomes just "help"
if len(argv[0]) > 1:
argv.insert(1, argv[0][1:])
argv[0] = "help"
# execute command
self.printer.debug("argv: %s"%argv)
# cmdExit is swallowed without any message
except cmdExit:
pass
# unknown command name: reported as an error
except cmdBadName:
self.printer.error("No command: %s."%argv[0])
# command-level warning: printed prefixed with the command name
except cmdWarning as e:
self.printer.warn("%s: %s"%(argv[0], str(e)))
# command-level error: printed prefixed with the command name
except cmdError as e:
self.printer.error("%s: %s"%(argv[0], str(e)))
# NOTE(review): the next two lines report the exception type and ask for a
# bug report; presumably they belong to a generic except clause that is not
# visible in this view -- confirm.
self.printer.error("%s: %s"%(type(e), str(e)))
self.printer.warn("This is a not expected error, please report it!")
def _command_completer(self, texte):
    '''Return the completion candidates for the word being typed.

    texte is the word to complete. Candidates are the names obtained by
    iterating self.commands followed by the keys of self.aliases, filtered
    by prefix:

    - a word starting with "!" is never completed (empty tuple);
    - a word starting with "?" is completed against the names with the
      "?" stripped, and every candidate is returned with "?" prepended;
    - an empty word is completed only when the left-stripped slice of the
      completion buffer checked below is empty too; otherwise an empty
      tuple is returned.

    Returns a list of matching names, or an empty tuple when completion
    is suppressed.
    '''
    comp = self.printer.completion
    # Buffer content up to one character past get_begin(), without
    # leading blanks (presumably get_begin() is the start offset of the
    # word under completion -- confirm against the completion helper).
    stripped = comp.get_buf()[:comp.get_begin() + 1].lstrip()
    if texte == "" and stripped != "":
        return ()
    if texte.startswith("!"):
        return ()
    # keys() may be a view rather than a list (e.g. a Python 3 dict), and
    # a list cannot be concatenated with a view: materialise it first.
    names = list(self.commands) + list(self.aliases.keys())
    if texte.startswith("?"):
        word = texte[1:]
        return ["?%s" % c for c in names if c.startswith(word)]
    return [c for c in names if c.startswith(texte)]