cli.py 6.72 KiB
#!/usr/bin/env python
#coding=utf8
'''
CloudControl CLI class
'''
import os, os.path
import sys
import socket
import ssl
import threading
import subprocess
import ConfigParser
import re
from cccli import printer, command, version, debug
from cccli.command import Command
from cccli.clierror import *
from sjrpc.core.exceptions import *
from sjrpc.client import SimpleRpcClient
from sjrpc.utils import RpcHandler, ConnectionProxy, pure
class Cli(object):
    '''
    CloudControl command line client.

    Connects to the server through sjRPC, authenticates, then either runs
    the command line handed to start() or reads commands from a prompt.
    '''

    def __init__(self, settings):
        '''
        Keep the settings and create the alias and history helpers.

        settings is read through item access and .get(); the keys used by
        this class are "server", "port", "timeout", "login", "pass",
        "history", "hsize" and "alias".
        '''
        self.isinteractive = sys.stderr.isatty() and sys.stdin.isatty()
        self.settings = settings
        self.prompt = "> "
        self.rpc = None
        # Always define readline: History reads this attribute on every
        # access, and start() only imports the module in interactive mode.
        self.readline = None
        self.alias = Alias()
        self.history = History(self)

    def start(self, line=""):
        '''
        Start a CLI.

        A non-empty line is executed once and forces non-interactive mode;
        otherwise the interactive shell is run.
        '''
        # a command line given by the caller means we are not interactive
        if line:
            self.isinteractive = False
        # start readline and load history
        if self.isinteractive:
            import readline
            self.readline = readline
            self.history.load(self.settings.get("history", ""))
            self.history.maxsize(self.settings.get("hsize", None))
        # load alias
        self.alias.load(self.settings.get("alias", ""))
        printer.debug("Alias: %s" % self.alias)
        # connection
        self._connect()
        # authentication
        self._auth()
        # run the given line or the interactive shell
        if line:
            self._parse_line(line)
        else:
            self._shell()
        self.history.save(self.settings.get("history", ""))

    def _connect(self):
        '''Open the SSL sjRPC connection and store its proxy in self.rpc'''
        printer.debug("Connecting...")
        rpcc = SimpleRpcClient.from_addr(self.settings["server"],
                                         self.settings["port"],
                                         enable_ssl=True,
                                         default_handler=CliHandler(),
                                         on_disconnect="quit",
                                         timeout=self.settings["timeout"]
                                        )
        rpcc.start(daemonize=True)
        self.rpc = ConnectionProxy(rpcc)

    def _auth(self):
        '''Authenticate with the configured login and password'''
        printer.debug("Authenticating...")
        if self.rpc.authentify(self.settings["login"], self.settings["pass"]):
            printer.debug("Authenticated.")
        else:
            printer.fatal("Autentification failed!")

    def _shell(self):
        '''Shell parser: read and run lines until EOF or SystemExit'''
        while True:
            try:
                line = raw_input(self.prompt)
                self._parse_line(line)
            except EOFError:
                printer.out("")
                break
            except SystemExit:
                break
            except KeyboardInterrupt:
                # Ctrl-C only drops the current line, the shell goes on
                printer.out("")
        printer.out("Tcho!")

    def _parse_line(self, line):
        '''
        Parse a line (more than one command).

        Commands are separated by ";". Empty commands and commands starting
        with "#" are skipped, "!" runs the rest through the system shell
        and "?" shows the help.
        '''
        for cmd in line.split(";"):
            # strip first, so that the marker tests below also work for
            # commands written after "; "
            cmd = cmd.strip()
            if not cmd or cmd.startswith("#"):
                continue
            if cmd.startswith("!"):
                subprocess.call(cmd[1:], close_fds=True, shell=True)
            elif cmd.startswith("?"):
                Command(["help"], self).call()
            else:
                self._parse_command(cmd)

    def _parse_command(self, cmd):
        '''Lex one command, substitute its alias and execute it'''
        # fallback so the error handlers below always have a name to report
        argv = [cmd]
        try:
            # lex command
            argv = self._lex_argv(cmd)
            if not argv:
                # nothing but whitespace
                return
            # alias subs
            if argv[0] in self.alias:
                argv[0] = self.alias[argv[0]]
            # execute command
            Command(argv, self).call()
        except BadArgument as e:
            if str(e):
                printer.error("Bad argument: %s." % str(e))
            else:
                printer.error("Bad argument.")
            usage = Command.usage(argv[0])
            if usage != "":
                printer.out("usage: %s." % usage)
        except BadCommand as e:
            if str(e):
                printer.error("command: %s." % str(e))
            else:
                printer.error("No command: %s." % argv[0])
        except RpcError as e:
            if debug:
                raise
            printer.error("sjRPC: %s" % str(e))
        except Exception as e:
            if debug:
                raise
            printer.error("%s: %s." % (argv[0], str(e)))

    def _lex_argv(self, string):
        '''
        Lex command argument.

        Splits on runs of whitespace, so repeated, leading or trailing
        blanks never produce empty arguments.
        '''
        return string.split()
class Alias(dict):
    '''
    Alias wrapper.

    A dict of alias name -> substituted value, loaded from and saved to
    the "alias" section of an INI style file.
    '''

    def load(self, filename):
        '''
        Load alias from file.

        Nothing happens when the file is not readable. When the file has an
        "alias" section, the current content is replaced by that section.
        '''
        if not os.access(filename, os.R_OK):
            return
        fparser = ConfigParser.SafeConfigParser()
        fparser.read(filename)
        if fparser.has_section("alias"):
            self.clear()
            self.update(fparser.items("alias"))

    def save(self, filename):
        '''
        Save alias on file.

        Nothing happens unless the file already exists and is both readable
        and writable. Other sections of the file are kept; the "alias"
        section is rewritten from the current content.
        '''
        # both permissions are required: the mode is a bit mask, so the
        # flags have to be combined with "|"
        if not os.access(filename, os.R_OK | os.W_OK):
            return
        fparser = ConfigParser.SafeConfigParser()
        fparser.read(filename)
        fparser.remove_section("alias")
        fparser.add_section("alias")
        for name, value in self.items():
            fparser.set("alias", name, value)
        # close the file deterministically once written
        with open(filename, "w") as fdesc:
            fparser.write(fdesc)
class History(object):
    '''
    History class.

    Thin wrapper around the readline module held by a Cli. While the Cli
    has no readline (attribute missing or None), every regular attribute
    lookup except "cli" yields a callable that ignores its arguments and
    returns None, so callers can use the history unconditionally.
    '''

    def __init__(self, cli):
        self.cli = cli

    def __nonzero__(self):
        '''True when a readline module is available'''
        return getattr(self.cli, "readline", None) is not None

    # Python 3 name of the truth-value hook
    __bool__ = __nonzero__

    def __getattribute__(self, name):
        cli = object.__getattribute__(self, "cli")
        if name == "cli":
            return cli
        if getattr(cli, "readline", None) is None:
            # no readline: hand out a do-nothing callable
            return lambda *a, **k: None
        return object.__getattribute__(self, name)

    def __iter__(self):
        '''Iterate over history lines, oldest first'''
        # readline history items are numbered from 1 to the current length
        for index in range(1, len(self) + 1):
            yield self.cli.readline.get_history_item(index)

    def __len__(self):
        '''Number of history lines, 0 without readline'''
        # len() does not go through __getattribute__, so the missing
        # readline case has to be handled here
        readline = getattr(self.cli, "readline", None)
        if readline is None:
            return 0
        return readline.get_current_history_length()

    def load(self, path):
        '''Load history from a file, ignoring I/O errors'''
        try:
            self.cli.readline.read_history_file(path)
        except IOError:
            pass

    def save(self, path):
        '''Save history into path, ignoring I/O errors'''
        try:
            self.cli.readline.write_history_file(path)
        except IOError:
            pass

    def maxsize(self, size=None):
        '''Set or return max history size'''
        readline = self.cli.readline
        if size is not None:
            readline.set_history_length(size)
        return readline.get_history_length()

    def removelast(self):
        '''Remove last history line, if there is one'''
        readline = self.cli.readline
        length = readline.get_current_history_length()
        if length > 0:
            # remove_history_item() positions start at 0
            readline.remove_history_item(length - 1)

    def clear(self):
        '''Clear history'''
        self.cli.readline.clear_history()
class CliHandler(RpcHandler):
    '''RPC handler answering the requests sent by the server'''

    @pure
    def get_tags(self, tags=()):
        '''Return the client version when the "version" tag is requested'''
        if "version" not in tags:
            return None
        return {"version": version}

    @pure
    def quit(self, rpc=None):
        '''Report the loss of the server connection as a fatal error'''
        printer.fatal("Disconnected from server!")