#!/usr/bin/env python
#coding=utf8

'''
Parser for TQL (Tag Query Language) queries.

A query is a list of conditions separated by ``&`` (eg: ``mem>=10G&name:web*``)
optionally followed by ``$tag`` tokens naming extra tags to show.
'''

import re
from fnmatch import fnmatch

try:
    from itertools import izip
except ImportError:
    # Python 3 has no izip, the builtin zip is already lazy.
    izip = zip


# Unit suffix -> factor applied by multiplicator().
# NOTE(review): factors are not powers of 1000/1024 (K=100, M=1000, ...);
# kept exactly as found, confirm this scale is intended.
MULTIPLICATOR_TABLE = {'B': 1, 'b': 1, 'o': 1,
                       'K': 100, 'k': 100,
                       'M': 1000, 'm': 1000,
                       'G': 10000, 'g': 10000,
                       'T': 100000, 't': 100000}


def gen_all_operators(operators):
    '''
    Build a ``|``-joined string of every operator and its negated form.

    :param operators: iterable of operator strings
    :return: the operators followed by their ``!``-prefixed variants, joined
        with ``|``
    '''
    ops = list(operators)
    ops += ['!' + o for o in operators]
    return '|'.join(ops)


def multiplicator(number):
    '''
    Convert a number with an optional unit suffix to an integer.

    :param number: the value to convert (converted to a string first)
    :return: the integer value scaled by the suffix factor found in
        MULTIPLICATOR_TABLE, or 0 if the value is not a valid integer
    '''
    number = str(number)
    try:
        if len(number) > 1 and number[-1] in MULTIPLICATOR_TABLE:
            return int(number[:-1]) * MULTIPLICATOR_TABLE[number[-1]]
        return int(number)
    except ValueError:
        # Unparsable values are deliberately mapped to 0 instead of failing.
        return 0


class TqlParsingError(Exception):
    ''' Raised when a TQL query (or one of its patterns) is invalid. '''
    pass


class TqlCondition(object):

    '''
    Represent a single condition for a tag (eg: value of mem eq 1000G).

    :param name: the name of the tag
    :param value: the matching value
    :param operator: the operator function
    :param invert: if True, the result of the operator is negated
    '''

    def __init__(self, name, value, operator, invert=False):
        self.name = name
        self.value = value
        self.operator = operator
        self.invert = invert

    def __hash__(self):
        return hash((self.name, self.value, self.operator, self.invert))

    def __repr__(self):
        # The previous format string was empty, which made repr() raise a
        # TypeError ("not all arguments converted").
        return '<TqlCondition %s %s>' % (self.name, self.value)

    def check(self, value):
        '''
        Check a tag value against this condition.

        :param value: the tag value to test
        :return: the operator result, negated when the condition is inverted
        '''
        return self.operator(value, self.value) ^ self.invert


class TqlQuery(object):

    '''
    Parse a query written with TQL (Tag Query Language) and allow to filter
    a list of tagged objects.

    :param query: the query to parse
    '''

    # Operator symbol -> suffix of the "op_<suffix>" method implementing it.
    OPERATORS = {':': 'glob',
                 '=': 'equal',
                 '>': 'gt',
                 '<': 'lt',
                 '>=': 'gte',
                 '<=': 'lte',
                 '~': 'regex'}

    # A condition is a tag name optionally followed by an operator and a
    # value. The group names are the ones read back by _parse_condition().
    RE_COND = re.compile(r'^(?P<name>[a-z-A-Z0-9_-]+)'
                         r'((?P<operator>[!$<>~:=]{1,3})'
                         r'(?P<value>[^&]+))?$')
    # NOTE(review): this pattern is not anchored at the end, so only the
    # beginning of a "$" token is validated -- confirm this is intended.
    RE_TAG = re.compile(r'^(?P<name>[a-z-A-Z0-9_-]+)')
    RE_SEPARATOR = re.compile(r'(&|\$)')

    def __init__(self, query):
        self._conditions = []
        self._to_show = set()
        self._parse(query)

    def _parse(self, query):
        '''
        Parse the TQL query and fill the condition list and the set of tags
        to show.

        :param query: the query to parse
        :raise TqlParsingError: on invalid condition, operator or tag name
        '''

        # Separate the query into tokens (separators are kept by the split):
        tokens = TqlQuery.RE_SEPARATOR.split(query)
        # Insert the first separator at the start of the token list so the
        # list is a sequence of (separator, token) pairs:
        tokens.insert(0, '&')

        for sep, tok in izip(tokens[0::2], tokens[1::2]):
            if not tok:
                continue  # Empty condition, we assume that is not an error

            if sep == '&':
                # Condition separator, the token is actually a condition:
                cond = self._parse_condition(tok)
                self._conditions.append(cond)
                # The tag will be also shown:
                self._to_show.add(cond.name)
            elif sep == '$':
                # "To show" separator, the token is a tag name:
                if TqlQuery.RE_TAG.match(tok) is None:
                    raise TqlParsingError('Error while parsing, invalid tag '
                                          'name %s' % repr(tok))
                self._to_show.add(tok)

    def _parse_condition(self, tok):
        ''' Build the TqlCondition described by a single condition token. '''
        m = TqlQuery.RE_COND.match(tok)
        if not m:
            raise TqlParsingError('Error while parsing, invalid '
                                  'condition: %s' % repr(tok))

        # Retrieve each parts:
        name = m.group('name')
        operator = m.group('operator')
        if operator:
            value = m.group('value')
        else:
            # Apply the default operator if not specified
            operator = ':'
            value = '*'

        # "not" operator stuff:
        invert = operator.startswith('!') and len(operator) > 1
        if invert:
            operator = operator.lstrip('!')

        # Retrieve the operator callable:
        op_name = TqlQuery.OPERATORS.get(operator)
        if op_name is None:
            raise TqlParsingError('Error while parsing, invalid '
                                  'operator: %s' % repr(operator))
        op_func = getattr(self, 'op_%s' % op_name, lambda x, y: True)

        return TqlCondition(name, value, op_func, invert)

    def has_condition(self, tag_name):
        '''
        Return True if query has made a condition for the specified tag name.
        '''
        return any(cond.name == tag_name for cond in self._conditions)

    def iterconditions(self):
        ''' Iterate over the parsed conditions, in query order. '''
        for cond in self._conditions:
            yield cond

    def _get_tags(self):
        '''
        Get the set of all tags involved in this query.
        '''
        return self._to_show
    tags = property(_get_tags)

    def op_glob(self, value, pattern):
        ''' Glob match of the value (None never matches). '''
        if value is None:
            return False
        return fnmatch(str(value), str(pattern))

    def op_lt(self, lvalue, rvalue):
        ''' "Less than" comparison, after unit conversion of both sides. '''
        return multiplicator(lvalue) < multiplicator(rvalue)

    def op_lte(self, lvalue, rvalue):
        ''' "Less than or equal" comparison, after unit conversion. '''
        return multiplicator(lvalue) <= multiplicator(rvalue)

    def op_gt(self, lvalue, rvalue):
        ''' "Greater than" comparison, after unit conversion. '''
        return multiplicator(lvalue) > multiplicator(rvalue)

    def op_gte(self, lvalue, rvalue):
        ''' "Greater than or equal" comparison, after unit conversion. '''
        return multiplicator(lvalue) >= multiplicator(rvalue)

    def op_regex(self, value, pattern):
        '''
        Regex match anchored at the start of the value (None never matches).

        :raise TqlParsingError: if the pattern is not a valid regex
        '''
        if value is None:
            return False
        try:
            return re.match(pattern, value) is not None
        except re.error:
            raise TqlParsingError('Error in your regex pattern: %s' % pattern)

    def op_equal(self, lvalue, rvalue):
        '''
        Equality test (None never matches). When both sides are made only of
        digits they are compared as integers, otherwise as they are.
        '''
        if lvalue is None:
            return False
        if lvalue.isdigit() and rvalue.isdigit():
            # Integer comparison:
            lvalue = multiplicator(lvalue)
            rvalue = multiplicator(rvalue)
        return lvalue == rvalue