#!/usr/bin/env python
#coding=utf8

'''
CloudControl CLI Printer module

Provides the terminal escape sequences used by the CLI (``color``), the
``Printer`` class used for every read/write on the terminal, and the
``History`` class wrapping the readline history.
'''

import sys
import os
import termios
import fcntl
import struct

import cccli
from cccli.exception import *

# Terminal escape sequences, indexed by a short name.
color = {
    # regular
    "red": "\033[0;31m",
    "green": "\033[0;32m",
    "yellow": "\033[0;33m",
    "blue": "\033[0;34m",
    "purple": "\033[0;35m",
    "cyan": "\033[0;36m",
    "white": "\033[0;37m",
    # lighted
    "lred": "\033[1;31m",
    "lgreen": "\033[1;32m",
    "lyellow": "\033[1;33m",
    "lblue": "\033[1;34m",
    "lpurple": "\033[1;35m",
    "lcyan": "\033[1;36m",
    "lwhite": "\033[1;37m",
    # underline
    "ured": "\033[4;31m",
    "ugreen": "\033[4;32m",
    "uyellow": "\033[4;33m",
    "ublue": "\033[4;34m",
    "upurple": "\033[4;35m",
    "ucyan": "\033[4;36m",
    # others
    "light": "\033[1m",
    "reset": "\033[m",
    # NOTE(review): presumably the readline "start/end of non-printing
    # characters" prompt markers -- confirm against the prompt builder.
    "[": "\001",
    "]": "\002",
}


class Printer(object):
    '''Print relative class

    Writes messages on file descriptors and, once the interactive mode is
    enabled, reads user input through readline (history and completion).
    '''

    def __init__(self, interactive=False, historyfile=None, historysize=None):
        '''Create a printer.

        interactive: when true, call setinteractive() immediately
        historyfile: path given to History.read() when going interactive
        historysize: size given to History.maxsize() when going interactive
        '''
        self.readline = None
        self.history = History()
        self.historyfile = historyfile
        self.historysize = historysize
        if interactive:
            self.setinteractive()

    def isinteractive(self):
        '''Return if printer is in interactive mode'''
        return self.readline is not None

    def setinteractive(self):
        '''Switch to interactive mode: load readline, history and completion.

        Does nothing if the printer is already interactive.
        Raise cliError if the readline module cannot be imported.
        '''
        if self.readline is not None:
            return
        try:
            import readline
        except Exception:
            raise cliError("Unable to start readline")
        self.readline = readline
        # enable history
        self.history.readline = readline
        # load history
        self.history.read(self.historyfile)
        self.history.maxsize(self.historysize)
        # start prompt completer
        self.readline.set_completer(self.completer)
        self.readline.parse_and_bind("tab: complete")

    def out(self, message="", fd=sys.stdout, nl=os.linesep, flush=True):
        '''Print a message in fd ended by nl'''
        fd.write("%s%s"%(message, nl))
        if flush:
            fd.flush()

    def err(self, message, fd=sys.stderr, nl=os.linesep):
        '''Print a raw message on the error output'''
        self.out(message, fd, nl)

    def fatal(self, message, quit=True, fd=sys.stderr, nl=os.linesep):
        '''Print a colored "Fatal" message, then kill the program if quit is true'''
        self.out("%sFatal%s: %s%s"%(color["lred"],color["red"],message,
                                    color["reset"]), fd, nl)
        if quit:
            # signal 15 is SIGTERM; pid 0 targets every process of the
            # current process group (see os.kill / kill(2))
            os.kill(0, 15)

    def error(self, message, fd=sys.stderr, nl=os.linesep):
        '''Print a colored "Error" message'''
        self.out("%sError%s: %s%s"%(color["lred"],color["red"],message,
                                    color["reset"]), fd, nl)

    def warn(self, message, fd=sys.stderr, nl=os.linesep):
        '''Print a colored "Warning" message'''
        self.out("%sWarning%s: %s%s"%(color["lyellow"],color["yellow"],message,
                                      color["reset"]), fd, nl)

    def debug(self, message, fd=sys.stderr, nl=os.linesep):
        '''Print a message only when cccli.debug is set'''
        if cccli.debug:
            self.out(message, fd, nl)

    def getline(self, prompt, history=True):
        '''Read a line from stdin

        prompt: text displayed before reading
        history: when false, the line read is not kept in history

        EOFError and KeyboardInterrupt are propagated untouched, any other
        error is converted into a cliError.
        '''
        # Remember the newest entry: readline does not append a line equal
        # to it, so it is needed to know whether this read added something.
        previous = self.history.last()
        try:
            line = raw_input(prompt)
        except EOFError:
            raise
        except KeyboardInterrupt:
            raise
        except Exception as e:
            raise cliError(str(e))
        # Only drop the newest entry when this read really appended it,
        # otherwise an unrelated older entry would be deleted.
        if (not history and line and line != previous
                and self.history.last() == line):
            self.history.removelast()
        return line

    def getpass(self, prompt):
        '''Ask for a password. No echo. Not in history

        Raise cliError in non-interactive mode.
        '''
        if self.readline is None:
            raise cliError("Unable to ask a password in non-interactive mode")
        fd = sys.stdin.fileno()
        old = termios.tcgetattr(fd)
        new = termios.tcgetattr(fd)
        # index 3 holds the local flags: switch terminal echo off
        new[3] = new[3] & ~termios.ECHO
        # Work on an empty history (same approach as ask()): the password is
        # never stored and previous commands cannot be recalled in the prompt.
        saved = list(self.history)
        self.history.clear()
        try:
            try:
                termios.tcsetattr(fd, termios.TCSADRAIN, new)
                passwd = raw_input(prompt)
            finally:
                # always give the echo back, even on Ctrl-C / Ctrl-D
                termios.tcsetattr(fd, termios.TCSADRAIN, old)
        finally:
            self.history.load(saved)
        return passwd

    def ask(self, prompt):
        '''Used to ask a question. Default answer not saved to history

        Raise cliError in non-interactive mode.
        '''
        if self.readline is None:
            raise cliError("Unable to ask question in non-interactive mode")
        # The question is asked on an empty history which is restored after
        saved = list(self.history)
        self.history.clear()
        try:
            answer = self.getline(prompt, history=False)
        finally:
            self.history.load(saved)
        return answer

    def get_term_size(self):
        '''Return terminal size as a (rows, cols) tuple

        The size comes from a TIOCGWINSZ ioctl on stdin.
        Raise cliError in non-interactive mode.
        '''
        if self.readline is None:
            raise cliError("Unable to get term size in non-interactive mode")
        req = struct.pack("HHHH", 0, 0, 0, 0)
        resp = fcntl.ioctl(sys.stdin.fileno(), termios.TIOCGWINSZ, req)
        # the last two fields (pixel sizes) are not used
        rows, cols = struct.unpack("HHHH", resp)[:2]
        return rows, cols

    def completer(self, texte, state):
        '''Readline completer: propose command names starting with texte

        texte: word being completed
        state: index of the requested candidate
        Return the candidate, or None when there is no (more) candidate.
        '''
        r = self.readline
        # Nothing is proposed for an empty word unless it is the first word
        # of the line.
        if (state == 0 and texte == ""
                and r.get_line_buffer()[:r.get_begidx() + 1].lstrip() != ""):
            return None
        candidates = [ c for c in Command.list() if c.startswith(texte) ]
        if state < len(candidates):
            return candidates[state]
        return None


class History(object):
    '''History class

    Thin wrapper over the history functions of the readline module stored
    in self.readline. While self.readline is None every method fetched by
    name is replaced by a function doing nothing and returning None (see
    __getattribute__).
    '''

    def __init__(self):
        self.readline = None

    def __nonzero__(self):
        '''An history is true once it is bound to a readline module'''
        return self.readline is not None

    def __getattribute__(self, name):
        '''Disable every attribute except "readline" while history is off'''
        r = object.__getattribute__(self, "readline")
        if name == "readline":
            return r
        if r is None:
            return lambda *a,**k: None
        return object.__getattribute__(self, name)

    # iter() and len() look special methods up on the type, which bypasses
    # __getattribute__: these two methods check self.readline themselves.

    def __iter__(self):
        '''Yield history lines, oldest first'''
        if self.readline is None:
            return
        # readline history items are numbered from 1
        for i in range(1, self.readline.get_current_history_length() + 1):
            yield self.readline.get_history_item(i)

    def __len__(self):
        '''Return the number of lines in history (0 when disabled)'''
        if self.readline is None:
            return 0
        return self.readline.get_current_history_length()

    def load(self, items):
        '''Load history from a list'''
        self.clear()
        for l in items:
            self.readline.add_history(l)

    def read(self, path):
        '''Load history from a file'''
        self.clear()
        try:
            self.readline.read_history_file(path)
        except IOError:
            # best effort: an unreadable file gives an empty history
            pass

    def write(self, path):
        '''Save history into path'''
        try:
            self.readline.write_history_file(path)
        except IOError:
            # best effort: saving history must not break the program
            pass

    def maxsize(self, size=None):
        '''Set or return max history size'''
        if size is not None:
            self.readline.set_history_length(size)
        return self.readline.get_history_length()

    def last(self):
        '''Return the most recent history line, None if history is empty'''
        length = self.readline.get_current_history_length()
        if length < 1:
            return None
        return self.readline.get_history_item(length)

    def removelast(self):
        '''Remove last history line'''
        # remove_history_item() counts from 0, unlike get_history_item()
        self.readline.remove_history_item(
            self.readline.get_current_history_length() - 1)

    def clear(self):
        '''Clear history'''
        self.readline.clear_history()


# This import needs to stay at the end of the file: printer is loaded by
# command and command is loaded by printer (circular import).
# TODO: rework the import design to remove this cycle.
from cccli.command import Command