Newer
Older
#!/usr/bin/env python
#coding=utf8
'''
CloudControl CLI class
'''
import os, os.path
import sys
import socket
import ssl
import threading
import re
from cccli import printer, command, version, debug
from cccli.clierror import *
from sjrpc.core.exceptions import *
from sjrpc.client import SimpleRpcClient
from sjrpc.utils import RpcHandler, ConnectionProxy, pure
# Store the settings and the default prompt, then -- in the lines visible here --
# run a whole session: load history and aliases, connect, authenticate, run the
# shell and finally save the history.
# NOTE(review): indentation is lost in this view and some lines appear to be
# missing: `line`, `self.history` and `self.alias` are used below but are never
# bound in the visible code, so part of what follows may belong to another
# method. Confirm against the complete file.
def __init__(self, settings):
# interactive only when both stderr and stdin are terminals
self.isinteractive = sys.stderr.isatty() and sys.stdin.isatty()
self.settings = settings
self.prompt = "> "
# a non-empty `line` switches interactive mode off
if line:
self.isinteractive = False
# start readline and load history
if self.isinteractive:
import readline
# history file path comes from the "history" setting (default: empty string)
self.history.load(self.settings.get("history", ""))
# maximum history size comes from the "hsize" setting (default: None)
self.history.maxsize(self.settings.get("hsize", None))
# load aliases from the path in the "alias" setting (default: empty string)
self.alias.load(self.settings.get("alias", ""))
printer.debug("Alias: %s"%self.alias)
# connect to the server
self._connect()
# authenticate
self._auth()
# run the shell
self._shell()
# write the history back to the path it was loaded from
self.history.save(self.settings.get("history", ""))
# Build the RPC client from the "server" and "port" settings.
# NOTE(review): the SimpleRpcClient.from_addr(...) call is cut off in this view;
# its remaining arguments, its closing parenthesis and whatever the method does
# with `rpcc` afterwards are not visible.
def _connect(self):
printer.debug("Connecting...")
rpcc = SimpleRpcClient.from_addr(self.settings["server"],
self.settings["port"],
on_disconnect="quit",
def _auth(self):
    '''Authenticate on the RPC connection.

    Sends the "login" and "pass" settings to self.rpc.authentify and reports
    the outcome: a debug message on success, printer.fatal on failure.
    '''
    printer.debug("Authenticating...")
    authenticated = self.rpc.authentify(self.settings["login"],
                                        self.settings["pass"])
    if not authenticated:
        printer.fatal("Autentification failed!")
    else:
        printer.debug("Authenticated.")
# NOTE(review): tail of a read/parse loop. The method header, the loop statement,
# the `try:` and the code that produces `line` are missing from this view.
self._parse_line(line)
# end of input: print an empty line and leave the loop
except EOFError:
printer.out("")
break
# exit request: leave the loop
except SystemExit:
break
# keyboard interrupt: print an empty line (no `break` in the visible code)
except KeyboardInterrupt:
printer.out("")
# goodbye message -- presumably printed once the loop is over; confirm
printer.out("Tcho!")
def _parse_line(self, line):
'''Parse a line (more than one command)'''
# commands are separated by ";" and handled one after the other
for cmd in line.split(";"):
# skip blank commands and commands whose first character is "#"
if (cmd.strip() == "" or cmd[0] == "#"):
continue
# "!" prefix: run the rest of the command through the system shell and wait
# for it to finish
# NOTE(review): `subprocess` is not among the imports visible at the top of
# this view, and `ret` is not used in the visible lines
elif (cmd[0] == "!"):
p = subprocess.Popen(cmd[1:], close_fds=True, shell=True)
p.wait()
ret = p.returncode
# NOTE(review): the body of the "?" branch is missing from this view
elif (cmd[0] == "?"):
# anything else is a regular command
else:
self._parse_command(cmd)
# Lex one command, substitute an alias for its first word, execute it, and
# report failures through printer.error.
# NOTE(review): indentation is lost and at least one `except` line and one
# handler body are missing from this view (see the notes below).
def _parse_command(self, cmd):
try:
# lex command
argv = self._lex_argv(cmd)
# alias subs: only the first word is looked up, and the alias value replaces
# it as a single argv entry
if argv[0] in self.alias:
argv[0] = self.alias[argv[0]]
# execute command
# NOTE(review): `Command` is not explicitly imported in the visible import
# lines -- it may come from one of the star imports; confirm
Command(argv, self).call()
except BadArgument, e:
printer.error("Bad argument.")
# show the usage text of the command when it is not empty
usage = Command.usage(argv[0])
if usage != "":
printer.out("usage: %s."%usage)
except BadCommand, e:
# use the exception's own message when it has one, otherwise name the command
if str(e):
printer.error("command: %s."%str(e))
else:
printer.error("No command: %s."%argv[0])
# NOTE(review): the `except` line this statement belongs to is missing here
printer.error("sjRPC: %s"%str(e))
# NOTE(review): the body of this handler is not visible in this view
except Exception, e:
# NOTE(review): fragment -- the enclosing class and method headers, and the code
# that creates `fparser` and `filename`, are missing from this view. The first
# three lines read like the end of an alias "load", the rest like an alias
# "save"; confirm against the complete file.
# when the parser has an "alias" section, replace the current content with it
if fparser.has_section("alias"):
self.clear()
self.update(fparser.items("alias"))
# rebuild the "alias" section from scratch with the current items
fparser.remove_section("alias")
fparser.add_section("alias")
for n,v in self.items():
fparser.set("alias", n, v)
# write the parser content to `filename`
# NOTE(review): the file object opened here is never explicitly closed in the
# visible code
fparser.write(open(filename, "w"))
class History(object):
'''History class'''
def __init__(self, cli):
# keep the owning cli object; the other methods reach readline through it
self.cli = cli
# NOTE(review): the next four lines read like the body of an attribute-access
# hook (they use `name` and object.__getattribute__), but its `def` line and
# probably some conditions are missing from this view. `r` is not used in the
# visible lines.
r = object.__getattribute__(self, "cli")
if name == "cli":
# a callable that accepts any arguments and returns None
return lambda *a,**k: None
return object.__getattribute__(self, name)
def __iter__(self):
    '''Iterate over the history entries.

    Entries are fetched one at a time with readline's get_history_item,
    from index 1 up to, but not including, index len(self).
    '''
    stop = len(self)
    index = 1
    while index < stop:
        yield self.cli.readline.get_history_item(index)
        index += 1
# NOTE(review): orphaned line -- it reads like the body of a length method whose
# `def` line is missing from this view. It returns readline's current history
# length.
return self.cli.readline.get_current_history_length()
def load(self, path):
    '''Read the readline history from the file at path.

    An IOError raised while reading is ignored, so the history is simply
    left as it was.
    '''
    try:
        self.cli.readline.read_history_file(path)
    except IOError:
        # best effort: an unreadable history file is not an error
        return
def save(self, path):
    '''Write the readline history to the file at path.

    An IOError raised while writing is ignored.
    '''
    try:
        self.cli.readline.write_history_file(path)
    except IOError:
        # best effort: failing to store the history is not an error
        return
def maxsize(self, size=None):
    '''Return the maximum history size, changing it first when size is given.

    size -- new maximum handed to readline's set_history_length; None
            (the default) leaves the current maximum untouched.
    '''
    if size is None:
        return self.cli.readline.get_history_length()
    self.cli.readline.set_history_length(size)
    return self.cli.readline.get_history_length()
def removelast(self):
    '''Remove last history line

    Does nothing when the history is empty.
    '''
    count = self.cli.readline.get_current_history_length()
    if not count:
        # nothing to remove
        return
    # remove_history_item() takes a zero-based position while the history
    # length is a count, so the last entry lives at position count - 1
    # (passing the count itself points one past the end).
    self.cli.readline.remove_history_item(count - 1)
def clear(self):
    '''Drop every entry from the readline history.'''
    backend = self.cli.readline
    backend.clear_history()
class CliHandler(RpcHandler):
'''Handle RPC incoming request'''
@pure
def get_tags(self, tags=()):
if "version" in tags: