#!/usr/bin/env python
#coding=utf8

'''
CloudControl CLI Printer module
'''

import sys
import os
import termios
import fcntl
import struct

import cccli
from cccli.exception import *

# ANSI escape sequences, indexed by a short name.
color = {
    # regular
    "red": "\033[0;31m",
    "green": "\033[0;32m",
    "yellow": "\033[0;33m",
    "blue": "\033[0;34m",
    "purple": "\033[0;35m",
    "cyan": "\033[0;36m",
    "white": "\033[0;37m",
    # lighted
    "lred": "\033[1;31m",
    "lgreen": "\033[1;32m",
    "lyellow": "\033[1;33m",
    "lblue": "\033[1;34m",
    "lpurple": "\033[1;35m",
    "lcyan": "\033[1;36m",
    "lwhite": "\033[1;37m",
    # underline
    "ured": "\033[4;31m",
    "ugreen": "\033[4;32m",
    "uyellow": "\033[4;33m",
    "ublue": "\033[4;34m",
    "upurple": "\033[4;35m",
    "ucyan": "\033[4;36m",
    # others
    "light": "\033[1m",
    "reset": "\033[m",
    # NOTE(review): \001 and \002 look like the readline markers that wrap
    # non-printing characters in a prompt -- confirm against the callers.
    "[": "\001",
    "]": "\002",
}


def _read_line(prompt):
    '''Read one line from stdin, showing prompt.

    Uses raw_input when it exists (Python 2) and falls back to input
    (Python 3). The name is resolved on every call rather than once at
    import time, exactly like a direct raw_input() call would be.
    '''
    try:
        reader = raw_input
    except NameError:
        reader = input
    return reader(prompt)


class Printer(object):
    '''Print relative class

    Wraps writes to stdout/stderr and, once set_interactive() has been
    called, line input through readline with history and completion.
    '''

    def __init__(self):
        # readline module; stays None until set_interactive() is called
        self.readline = None
        self.history = History()
        self.completion = Completion()

    def set_interactive(self):
        '''Set interactive mode

        Imports readline and hands it to the history and completion
        helpers. Does nothing when interactive mode is already set.
        Raises cliError if readline cannot be imported.
        '''
        if self.readline is not None:
            return
        try:
            import readline
        except Exception:
            raise cliError("Unable to start readline")
        self.readline = readline
        # enable history
        self.history.readline = readline
        # enable completion
        self.completion.readline = readline

    def out(self, message="", fd=sys.stdout, nl=os.linesep, flush=True):
        '''Print a message in fd ended by nl

        fd is flushed after the write unless flush is false.
        '''
        fd.write("%s%s"%(message, nl))
        if flush:
            fd.flush()

    def err(self, message, fd=sys.stderr, nl=os.linesep):
        '''Print a message in fd (stderr by default) ended by nl'''
        self.out(message, fd, nl)

    def fatal(self, message, quit=True, fd=sys.stderr, nl=os.linesep):
        '''Print a colored "Fatal" message and, if quit is true, terminate'''
        self.out("%sFatal%s: %s%s"%(color["lred"],color["red"],message,
                 color["reset"]), fd, nl)
        if quit:
            # Signal 15 is SIGTERM; pid 0 addresses every process in the
            # current process group, this one included.
            os.kill(0, 15)

    def error(self, message, fd=sys.stderr, nl=os.linesep):
        '''Print a colored "Error" message'''
        self.out("%sError%s: %s%s"%(color["lred"],color["red"],message,color["reset"]), fd, nl)

    def warn(self, message, fd=sys.stderr, nl=os.linesep):
        '''Print a colored "Warning" message'''
        self.out("%sWarning%s: %s%s"%(color["lyellow"],color["yellow"],message,color["reset"]), fd, nl)

    def debug(self, message, fd=sys.stderr, nl=os.linesep):
        '''Print a message only when cccli.debug is true'''
        if cccli.debug:
            self.out(message, fd, nl)

    def interactive(self, message, fd=sys.stderr, nl=os.linesep):
        '''Print only in interactive mode'''
        if self.readline is not None:
            self.out(message, fd, nl)

    def getline(self, prompt, history=True):
        '''Read a line from stdin

        When history is false, a non-empty answer is removed from the
        history again. EOFError and KeyboardInterrupt are passed through
        unchanged; any other exception is re-raised as cliError.
        '''
        try:
            s = _read_line(prompt)
        except EOFError:
            raise
        except KeyboardInterrupt:
            raise
        except Exception as e:
            raise cliError(str(e))
        if not history and s:
            self.history.removelast()
        return s

    def getpass(self, prompt):
        '''Ask for a password. No echo. Not in history

        Raises cliError in non-interactive mode.
        '''
        if self.readline is None:
            raise cliError("Unable to ask a password in non-interactive mode")
        fd = sys.stdin.fileno()
        old = termios.tcgetattr(fd)
        new = termios.tcgetattr(fd)
        # index 3 holds the local mode flags; clear ECHO to hide typing
        new[3] = new[3] & ~termios.ECHO
        try:
            termios.tcsetattr(fd, termios.TCSADRAIN, new)
            passwd = _read_line(prompt)
        finally:
            # always restore the saved terminal attributes
            termios.tcsetattr(fd, termios.TCSADRAIN, old)
        return passwd

    def ask(self, prompt):
        '''Used to ask a question. Default answer not saved to history

        The history is emptied while the question is asked and restored
        afterwards, even if reading the answer fails.
        Raises cliError in non-interactive mode.
        '''
        if self.readline is None:
            raise cliError("Unable to ask question in non-interactive mode")
        h = list(self.history)
        self.history.clear()
        try:
            r = self.getline(prompt, history=False)
        finally:
            self.history.load(h)
        return r

    def get_term_size(self):
        '''Return terminal size as a (rows, cols) tuple

        Raises cliError in non-interactive mode.
        '''
        if self.readline is None:
            raise cliError("Unable to get term size in non-interactive mode")
        req = struct.pack("HHHH", 0, 0, 0, 0)
        resp = fcntl.ioctl(sys.stdin.fileno(), termios.TIOCGWINSZ, req)
        # the last two fields of the reply are not used here
        rows, cols = struct.unpack("HHHH", resp)[:2]
        return rows, cols


class History(object):
    '''History class

    Thin wrapper around the readline history. While the readline
    attribute is None, every regular method is a no-op returning None.
    '''

    def __init__(self):
        self.readline = None

    def __nonzero__(self):
        '''True once a readline module has been attached'''
        return self.readline is not None

    # Python 3 spelling of __nonzero__; without it truthiness would fall
    # back to __len__ and an empty but active history would be false.
    __bool__ = __nonzero__

    def __getattribute__(self, name):
        r = object.__getattribute__(self, "readline")
        if name == "readline":
            return r
        if r is None:
            # no readline attached: hand back a callable that does nothing
            return lambda *a,**k: None
        return object.__getattribute__(self, name)

    def __iter__(self):
        '''Yield history items, oldest first'''
        if self.readline is None:
            return
        # readline history items are numbered from 1
        for i in range(1, self.readline.get_current_history_length() + 1):
            yield self.readline.get_history_item(i)

    def __len__(self):
        '''Return the number of history items (0 without readline)'''
        if self.readline is None:
            return 0
        return self.readline.get_current_history_length()

    def load(self, items):
        '''Load history from a list'''
        self.clear()
        for l in items:
            self.readline.add_history(l)

    def read(self, path):
        '''Load history from a file'''
        self.clear()
        try:
            self.readline.read_history_file(path)
        except IOError:
            # best effort: an unreadable history file is ignored
            pass

    def write(self, path):
        '''Save history into path'''
        try:
            self.readline.write_history_file(path)
        except IOError:
            # best effort: an unwritable history file is ignored
            pass

    def maxsize(self, size=None):
        '''Set or return max history size'''
        if size is not None:
            self.readline.set_history_length(size)
        return self.readline.get_history_length()

    def removelast(self):
        '''Remove last history line'''
        # remove_history_item() positions are zero-based
        last = self.readline.get_current_history_length() - 1
        if last >= 0:
            # skip an empty history: a negative position is rejected
            self.readline.remove_history_item(last)

    def clear(self):
        '''Clear history'''
        self.readline.clear_history()


class Completion(object):
    '''Handle completion functions

    While the readline attribute is None, every method is a no-op
    returning None.
    '''

    def __init__(self):
        self.readline = None

    def __getattribute__(self, name):
        r = object.__getattribute__(self, "readline")
        if name == "readline":
            return r
        if r is None:
            # no readline attached: hand back a callable that does nothing
            return lambda *a,**k: None
        return object.__getattribute__(self, name)

    def get_buf(self):
        '''Return current readline buffer'''
        return self.readline.get_line_buffer()

    def get_begin(self):
        '''Get the beginning index of the readline tab-completion scope'''
        return self.readline.get_begidx()

    def get_end(self):
        '''Get the ending index of the readline tab-completion scope'''
        return self.readline.get_endidx()

    def set_completer(self, func):
        '''Set completer function'''
        self.readline.set_completer(func)
        self.readline.parse_and_bind("tab: complete")