Skip to content
Snippets Groups Projects
printer.py 7.92 KiB
#!/usr/bin/env python
#coding=utf8

'''
CloudControl CLI Printer module
'''

import cccli
from cccli.exception import *

import sys
import os
import termios
import fcntl
import struct
import signal

color = {
    # regular
    "black": "\033[0;30m",
    "red": "\033[0;31m",
    "green": "\033[0;32m",
    "yellow": "\033[0;33m",
    "blue": "\033[0;34m",
    "purple": "\033[0;35m",
    "cyan": "\033[0;36m",
    "white": "\033[0;37m",
    # lighted
    "lgrey": "\033[1;30m",
    "lred": "\033[1;31m",
    "lgreen": "\033[1;32m",
    "lyellow": "\033[1;33m",
    "lblue": "\033[1;34m",
    "lpurple": "\033[1;35m",
    "lcyan": "\033[1;36m",
    "lwhite": "\033[1;37m",
    # underline
    "ured": "\033[4;31m",
    "ugreen": "\033[4;32m",
    "uyellow": "\033[4;33m",
    "ublue": "\033[4;34m",
    "upurple": "\033[4;35m",
    "ucyan": "\033[4;36m",
    # others
    "light": "\033[1m",
    "reset": "\033[m",
    "[": "\001",
    "]": "\002",
    }


class Printer(object):
    '''Print relative class'''
    def __init__(self):
        self.readline = None
        self.history = History()
        self.completion = Completion()

    def set_interactive(self):
        '''Set interactive mode'''
        if self.readline is not None:
            return
        try:
            import readline
        except Exception as e:
            raise cliError("Unable to start readline")
        self.readline = readline
        # enable history
        self.history.readline = readline
        # enable completion
        self.completion.readline = readline

    def out(self, message="", fd=sys.stdout, nl=os.linesep, flush=True):
        '''Print a message in fd ended by nl'''
        fd.write("%s%s"%(message, nl))
        if flush:
            fd.flush()

    def err(self, message, fd=sys.stderr, nl=os.linesep):
        self.out(message, fd, nl)

    def fatal(self, message, quit=True, fd=sys.stderr, nl=os.linesep):
        self.out("%sFatal%s: %s%s"%(color["lred"],color["red"],message, color["reset"]),
            fd,
            nl)
        if quit:
            os.kill(0, signal.SIGKILL)

    def error(self, message, fd=sys.stderr, nl=os.linesep):
        self.out("%sError%s: %s%s"%(color["lred"],color["red"],message,color["reset"]),
                 fd,
                 nl)

    def warn(self, message, fd=sys.stderr, nl=os.linesep):
        self.out("%sWarning%s: %s%s"%(color["lyellow"],color["yellow"],message,color["reset"]),
                 fd,
                 nl)

    def debug(self, message, fd=sys.stderr, nl=os.linesep):
        if cccli.debug:
            self.out("%s%s%s"%(color["lgrey"],message,color["reset"]), fd, nl)

    def isinteractive(self):
        return self.readline is not None

    def interactive(self, message, fd=sys.stderr, nl=os.linesep):
        '''Print only in interactive mode'''
        if self.readline is not None:
            self.out(message, fd, nl)

    def getline(self, prompt):
        '''Read a line from stdin'''
        try:
            s = raw_input(prompt)
        except EOFError:
            raise
        except KeyboardInterrupt:
            raise
        except Exception as e:
            raise cliError(str(e))
        return s

    def getpass(self, prompt):
        '''Ask for a password. No echo. Not in history'''
        if self.readline is None:
            raise cliError("Unable to ask a password in non-interactive mode")
        fd = sys.stdin.fileno()
        old = termios.tcgetattr(fd)
        new = termios.tcgetattr(fd)
        new[3] = new[3] & ~termios.ECHO
        h = list(self.history)
        self.history.clear()
        try:
            termios.tcsetattr(fd, termios.TCSADRAIN, new)
            passwd = self.getline(prompt)
        finally:
            termios.tcsetattr(fd, termios.TCSADRAIN, old)
            self.history.load(h)
        sys.stdout.write(os.linesep)
        sys.stdout.flush()
        return passwd

    def ask(self, prompt):
        '''Used to ask a question. Default answer not saved to history'''
        if self.readline is None:
            raise cliError("Unable to ask question in non-interactive mode")
        h = list(self.history)
        self.history.clear()
        r = ""
        try:
            r = self.getline(prompt)
        except EOFError:
            pass
        except KeyboardInterrupt:
            pass
        finally:
            self.history.load(h)
        return r

    def get_term_size(self):
        '''Return terminal size'''
        if self.readline is None:
            raise cliError("Unable to get term size in non-interactive mode")
        req = struct.pack("HHHH", 0, 0, 0, 0)
        resp = fcntl.ioctl(sys.stdin.fileno(), termios.TIOCGWINSZ, req)
        rows, cols, px_x, px_y = struct.unpack("HHHH", resp)
        return rows, cols


class History(object):
    '''History class'''
    def __init__(self):
        self.readline = None

    def __nonzero__(self):
        return self.readline is not None

    def __getattribute__(self, name):
         r = object.__getattribute__(self, "readline")
         if name == "readline":
             return r
         if r is None:
             return lambda *a,**k: None
         return object.__getattribute__(self, name)

    def __iter__(self):
        if self.readline is None:
            return
        for i in range(1, self.readline.get_current_history_length() + 1):
            yield self.readline.get_history_item(i)

    def __len__(self):
        if self.readline is None:
            return 0
        return self.readline.get_current_history_length()

    def load(self, items):
        '''Load history from a list'''
        self.clear()
        for l in items:
            self.readline.add_history(l)

    def read(self, path):
        '''Load history from a file'''
        self.clear()
        if path is None:
            return
        try:
            self.readline.read_history_file(path)
        except IOError:
            pass

    def write(self, path):
        '''Save history into path'''
        if path is None:
            return
        try:
            self.readline.write_history_file(path)
        except IOError:
            pass

    def maxsize(self, size=None):
        '''Set or return max history size'''
        if size is not None:
            self.readline.set_history_length(size)
        return self.readline.get_history_length()

    def removelast(self):
        '''Remove last history line'''
        self.readline.remove_history_item(self.readline.get_current_history_length() - 1)

    def clear(self):
        '''Clear history'''
        self.readline.clear_history()


class Completion(object):
    '''Handle completion functions'''
    def __init__(self):
        self.readline = None
        self.compfunc = None
        self.complist = list()

    def __getattribute__(self, name):
         r = object.__getattribute__(self, "readline")
         if name == "readline":
             return r
         if r is None:
             return lambda *a,**k: None
         return object.__getattribute__(self, name)

    def get_buf(self):
        '''Return current readline buffer'''
        return self.readline.get_line_buffer()

    def get_begin(self):
        '''Get the beginning index of the readline tab-completion scope'''
        return self.readline.get_begidx()

    def get_end(self):
        '''Get the ending index of the readline tab-completion scope'''
        return self.readline.get_begidx()

    def _completer(self, text, state):
        '''Readline real completer'''
        if state == 0:
            if self.compfunc is not None:
                self.complist = list(self.compfunc(text))
        try:
            return self.complist[state]
        except IndexError:
            self.complist = list()
            return None

    def set_completer(self, func):
        '''Set completer custom function which return a list of possibilities'''
        self.compfunc = func
        self.readline.set_completer(self._completer)
        self.readline.parse_and_bind("tab: complete")
        self.readline.set_completer_delims(" \t")