printer.py 7.58 KiB
#!/usr/bin/env python
#coding=utf8
'''
CloudControl CLI Printer module
'''
import sys
import os
import termios
import fcntl
import struct
import cccli
from cccli.exception import *
# Terminal escape sequences keyed by a short name; the Printer methods
# interpolate them around messages (e.g. color["lred"] ... color["reset"]).
color = {
    # regular: attribute 0 combined with foreground codes 30-37
    "black": "\033[0;30m",
    "red": "\033[0;31m",
    "green": "\033[0;32m",
    "yellow": "\033[0;33m",
    "blue": "\033[0;34m",
    "purple": "\033[0;35m",
    "cyan": "\033[0;36m",
    "white": "\033[0;37m",
    # lighted: same foreground codes combined with attribute 1
    "lgrey": "\033[1;30m",
    "lred": "\033[1;31m",
    "lgreen": "\033[1;32m",
    "lyellow": "\033[1;33m",
    "lblue": "\033[1;34m",
    "lpurple": "\033[1;35m",
    "lcyan": "\033[1;36m",
    "lwhite": "\033[1;37m",
    # underline: foreground codes combined with attribute 4
    "ured": "\033[4;31m",
    "ugreen": "\033[4;32m",
    "uyellow": "\033[4;33m",
    "ublue": "\033[4;34m",
    "upurple": "\033[4;35m",
    "ucyan": "\033[4;36m",
    # others
    "light": "\033[1m",    # attribute 1 alone, no colour change
    "reset": "\033[m",     # sequence with no parameters
    # NOTE(review): \001 and \002 are presumably the readline markers that
    # bracket non-printing characters inside a prompt -- confirm with callers.
    "[": "\001",
    "]": "\002",
}
class Printer(object):
    '''Print relative class

    Groups the helpers used to write messages to the terminal and to read
    user input. A new printer is non-interactive (``readline`` is None);
    set_interactive() loads the readline module and hands it to the history
    and completion helpers. getpass(), ask() and get_term_size() raise
    cliError until that has been done.
    '''

    def __init__(self):
        # readline module once interactive mode is on, None before that
        self.readline = None
        self.history = History()
        self.completion = Completion()

    def set_interactive(self):
        '''Set interactive mode

        Imports readline and shares it with the history and completion
        helpers. Does nothing when interactive mode is already enabled.
        Raises cliError when readline cannot be imported.
        '''
        if self.readline is not None:
            return
        try:
            import readline
        except Exception:
            raise cliError("Unable to start readline")
        self.readline = readline
        # enable history
        self.history.readline = readline
        # enable completion
        self.completion.readline = readline

    def out(self, message="", fd=sys.stdout, nl=os.linesep, flush=True):
        '''Print a message in fd ended by nl

        fd is flushed after the write unless flush is false.
        '''
        fd.write("%s%s"%(message, nl))
        if flush:
            fd.flush()

    def err(self, message, fd=sys.stderr, nl=os.linesep):
        '''Print a message like out(), on stderr by default'''
        self.out(message, fd, nl)

    def fatal(self, message, quit=True, fd=sys.stderr, nl=os.linesep):
        '''Print a red "Fatal" message, then terminate unless quit is false'''
        self.out("%sFatal%s: %s%s"%(color["lred"], color["red"], message, color["reset"]),
                 fd,
                 nl)
        if quit:
            # signal 15 (SIGTERM on POSIX) sent to pid 0, which addresses
            # the whole process group of the caller, not only this process
            os.kill(0, 15)

    def error(self, message, fd=sys.stderr, nl=os.linesep):
        '''Print a red "Error" message'''
        self.out("%sError%s: %s%s"%(color["lred"], color["red"], message, color["reset"]),
                 fd,
                 nl)

    def warn(self, message, fd=sys.stderr, nl=os.linesep):
        '''Print a yellow "Warning" message'''
        self.out("%sWarning%s: %s%s"%(color["lyellow"], color["yellow"], message, color["reset"]),
                 fd,
                 nl)

    def debug(self, message, fd=sys.stderr, nl=os.linesep):
        '''Print a grey message, only when cccli.debug is true'''
        if cccli.debug:
            self.out("%s%s%s"%(color["lgrey"], message, color["reset"]), fd, nl)

    def interactive(self, message, fd=sys.stderr, nl=os.linesep):
        '''Print only in interactive mode'''
        if self.readline is not None:
            self.out(message, fd, nl)

    def getline(self, prompt, history=True):
        '''Read a line from stdin

        When history is false and the line is not empty, the last history
        entry is removed again. EOFError and KeyboardInterrupt propagate
        unchanged; any other exception is re-raised as cliError.
        '''
        try:
            s = raw_input(prompt)
        except (EOFError, KeyboardInterrupt):
            raise
        except Exception as e:
            raise cliError(str(e))
        if not history and s:
            self.history.removelast()
        return s

    def getpass(self, prompt):
        '''Ask for a password. No echo. Not in history

        Terminal echo is switched off while the password is typed and the
        previous terminal attributes are always restored. The history is
        snapshotted before the prompt and reloaded afterwards, because a
        line read through readline is recorded in the history otherwise.
        Raises cliError in non-interactive mode.
        '''
        if self.readline is None:
            raise cliError("Unable to ask a password in non-interactive mode")
        fd = sys.stdin.fileno()
        old = termios.tcgetattr(fd)
        new = termios.tcgetattr(fd)
        # index 3 holds the local flags; clearing ECHO hides typed characters
        new[3] = new[3] & ~termios.ECHO
        saved = list(self.history)
        try:
            termios.tcsetattr(fd, termios.TCSADRAIN, new)
            passwd = raw_input(prompt)
        finally:
            try:
                termios.tcsetattr(fd, termios.TCSADRAIN, old)
            finally:
                # same snapshot/restore scheme as ask(): drops the password
                self.history.load(saved)
        return passwd

    def ask(self, prompt):
        '''Used to ask a question. Default answer not saved to history

        The history is emptied while the question is displayed and put back
        afterwards, whatever happens. Raises cliError in non-interactive
        mode.
        '''
        if self.readline is None:
            raise cliError("Unable to ask question in non-interactive mode")
        h = list(self.history)
        self.history.clear()
        try:
            r = self.getline(prompt, history=False)
        finally:
            self.history.load(h)
        return r

    def get_term_size(self):
        '''Return terminal size

        Queries stdin with the TIOCGWINSZ ioctl and returns the first two
        fields of the answer as (rows, cols). Raises cliError in
        non-interactive mode.
        '''
        if self.readline is None:
            raise cliError("Unable to get term size in non-interactive mode")
        req = struct.pack("HHHH", 0, 0, 0, 0)
        resp = fcntl.ioctl(sys.stdin.fileno(), termios.TIOCGWINSZ, req)
        # the last two unsigned shorts of the answer are not used here
        rows, cols = struct.unpack("HHHH", resp)[:2]
        return rows, cols
class History(object):
    '''Front-end to the readline history functions

    ``readline`` holds the readline module, or None while it has not been
    provided. In the latter case every other attribute lookup gives back a
    callable that accepts any arguments and returns None.
    '''

    def __init__(self):
        self.readline = None

    def __nonzero__(self):
        '''True once a readline module has been attached'''
        return not (self.readline is None)

    def __getattribute__(self, name):
        '''Resolve attributes, degrading to a do-nothing callable without readline'''
        backend = object.__getattribute__(self, "readline")
        if name == "readline":
            return backend
        if backend is not None:
            return object.__getattribute__(self, name)
        return lambda *a, **k: None

    def __iter__(self):
        '''Yield the history lines, oldest first (nothing without readline)'''
        if self.readline is not None:
            total = self.readline.get_current_history_length()
            position = 1
            # items are fetched with indexes 1..total
            while position <= total:
                yield self.readline.get_history_item(position)
                position += 1

    def __len__(self):
        '''Number of lines in the history, 0 without readline'''
        if self.readline is not None:
            return self.readline.get_current_history_length()
        return 0

    def load(self, items):
        '''Replace the history content by the lines of items'''
        self.clear()
        for entry in items:
            self.readline.add_history(entry)

    def read(self, path):
        '''Empty the history, then fill it from the file at path

        An IOError while reading the file is ignored.
        '''
        self.clear()
        try:
            self.readline.read_history_file(path)
        except IOError:
            return

    def write(self, path):
        '''Store the history in the file at path, ignoring IOError'''
        try:
            self.readline.write_history_file(path)
        except IOError:
            return

    def maxsize(self, size=None):
        '''Return the max history size, setting it first if size is given'''
        if size is None:
            return self.readline.get_history_length()
        self.readline.set_history_length(size)
        return self.readline.get_history_length()

    def removelast(self):
        '''Drop the most recent history line'''
        remove = self.readline.remove_history_item
        remove(self.readline.get_current_history_length() - 1)

    def clear(self):
        '''Empty the history'''
        self.readline.clear_history()
class Completion(object):
    '''Handle completion functions

    ``readline`` holds the readline module, or None while it has not been
    provided. In the latter case every other attribute lookup gives back a
    callable that accepts any arguments and returns None, so calls such as
    set_completer() have no effect at all until readline is attached.
    '''

    def __init__(self):
        self.readline = None
        # user function producing the candidates for a given text
        self.compfunc = None
        # candidates computed for the completion currently in progress
        self.complist = list()

    def __getattribute__(self, name):
        '''Resolve attributes, degrading to a do-nothing callable without readline'''
        r = object.__getattribute__(self, "readline")
        if name == "readline":
            return r
        if r is None:
            return lambda *a, **k: None
        return object.__getattribute__(self, name)

    def get_buf(self):
        '''Return current readline buffer'''
        return self.readline.get_line_buffer()

    def get_begin(self):
        '''Get the beginning index of the readline tab-completion scope'''
        return self.readline.get_begidx()

    def get_end(self):
        '''Get the ending index of the readline tab-completion scope'''
        return self.readline.get_endidx()

    def _completer(self, text, state):
        '''Readline real completer

        On state 0 the candidate list is rebuilt from the custom function
        (when one is set); a function returning None counts as "no
        candidate". Returns the candidate at index state, or None once the
        list is exhausted, in which case the list is also emptied.
        '''
        if state == 0 and self.compfunc is not None:
            self.complist = list(self.compfunc(text) or ())
        try:
            return self.complist[state]
        except IndexError:
            self.complist = list()
            return None

    def set_completer(self, func):
        '''Set completer custom function which return a list of possibilities

        Also binds the tab key to completion and restricts the word
        delimiters to space and tab.
        '''
        self.compfunc = func
        self.readline.set_completer(self._completer)
        self.readline.parse_and_bind("tab: complete")
        self.readline.set_completer_delims(" \t")