#!/usr/bin/env python #coding=utf8 import re from fnmatch import fnmatch MULTIPLICATOR_TABLE = {'B': 1, 'b': 1, 'o': 1, 'K': 100, 'k': 100, 'M': 1000, 'm': 1000, 'G': 10000, 'g': 10000, 'T': 100000, 't': 100000} def gen_all_operators(operators): ops = [] ops += list(operators) ops += ['!' + o for o in operators] return '|'.join(ops) def multiplicator(number): number = str(number) try: if len(number) > 1 and number[-1] in MULTIPLICATOR_TABLE: integer = int(number[:-1]) * MULTIPLICATOR_TABLE[number[-1]] else: integer = int(number) except ValueError: return 0 return integer class TqlParsingError(Exception): pass class TqlCondition(object): ''' Represent a single condition for a tag (eg: value of mem eq 1000G). :param name: the name of the tag :param value: the matching value :param operator: the operator function ''' def __init__(self, name, value, operator, invert=False): self.name = name self.value = value self.operator = operator self.invert = invert def __hash__(self): return hash((self.name, self.value, self.operator, self.invert)) def __repr__(self): return '' % (self.name, self.value) class TqlQuery(object): ''' Parse a query written with TQL (Tag Query Language) and allow to filter a liste of tagged objects. :param query: the query to parse ''' OPERATORS = {':': 'glob', '=': 'equal', '>': 'gt', '<': 'lt', '>=': 'gte', '<=': 'lte', '~': 'regex'} REGEX = re.compile(r'^(?P[a-z-A-Z0-9_-]+)' r'((?P[!$<>~:=]{1,3})' r'(?P[^&]+))?$') def __init__(self, query): self._conditions = set() self._parse(query) def _parse(self, query): ''' Parse the TQG query and fill the condition set. :param query: the query to parse ''' conditions = query.split('&') for cond in conditions: m = TqlQuery.REGEX.match(cond) if not m: raise TqlParsingError(("Error while parsing, invalid '" "condition: '%s'") % cond) # Retrieve each parts: name = m.group('name') operator = m.group('operator') if operator: value = m.group('value') else: # Apply the default operator if not specified operator = ':' value = '*' # "not" operator stuff: if operator.startswith('!') and len(operator) > 1: operator = operator.lstrip('!') invert = True else: invert = False # Retrieve the operator callable: op_name = TqlQuery.OPERATORS.get(operator) if op_name is None: raise TqlParsingError("Error while parsing, invalid operator: " "'%s'" % operator) op_func = getattr(self, 'op_%s' % op_name, lambda x, y: True) # Create the condition and add it to the condition set: cond = TqlCondition(name, value, op_func, invert) self._conditions.add(cond) def _match_single(self, obj, tag_name=None, key=lambda o,n: o.get(n)): ''' Return True if provided object match all the conditions. If tag_name is provided, the matching will be made only for the specified tag. ''' if tag_name is None: conditions = self._conditions else: conditions = [cond for cond in self._conditions if cond.name == tag_name] res = (c.operator(key(obj, c.name), c.value) ^ c.invert for c in conditions) if res: return all(res) else: return False def filter(self, objects, tag_name=None, key=lambda o,n: o.get(n)): ''' Filters objects which don't match with the query. If tag_name is provided, the matching will be made only for the specified tag. :param objects: list of objects to check :param tag_name: the name of the matching tag :return: filtered list of objects ''' return [o for o in objects if self._match_single(o, tag_name, key)] def has_condition(self, tag_name): ''' Return True if query has made a condition for the specified tag name. ''' for cond in self._conditions: if cond.name == tag_name: return True else: return False def _get_tags(self): ''' Get the set of all tags involved in this query. ''' return set((c.name for c in self._conditions)) tags = property(_get_tags) def op_glob(self, value, pattern): if value is None: return False return fnmatch(str(value), str(pattern)) def op_lt(self, lvalue, rvalue): lvalue = multiplicator(lvalue) rvalue = multiplicator(rvalue) return lvalue < rvalue def op_lte(self, lvalue, rvalue): lvalue = multiplicator(lvalue) rvalue = multiplicator(rvalue) return lvalue <= rvalue def op_gt(self, lvalue, rvalue): lvalue = multiplicator(lvalue) rvalue = multiplicator(rvalue) return lvalue > rvalue def op_gte(self, lvalue, rvalue): lvalue = multiplicator(lvalue) rvalue = multiplicator(rvalue) return lvalue >= rvalue def op_regex(self, value, pattern): if value is None: return False try: return re.match(pattern, value) is not None except re.error: raise TqlParsingError('Error in your regex pattern: %s' % pattern) def op_equal(self, lvalue, rvalue): if lvalue is None: return False if lvalue.isdigit() and rvalue.isdigit(): # Integer comparison: lvalue = multiplicator(lvalue) rvalue = multiplicator(rvalue) return lvalue == rvalue