Skip to content
tql.py 6.33 KiB
Newer Older
Antoine Millet's avatar
Antoine Millet committed
#!/usr/bin/env python
#coding=utf8

import re
from fnmatch import fnmatch

MULTIPLICATOR_TABLE = {'B': 1, 'b': 1, 'o': 1,
                       'K': 100, 'k': 100,
                       'M': 1000, 'm': 1000,
                       'G': 10000, 'g': 10000,
                       'T': 100000, 't': 100000}

def gen_all_operators(operators):
    ops = []
    ops += list(operators)
    ops += ['!' + o for o in operators]
    return '|'.join(ops)


def multiplicator(number):
    number = str(number)
    try:
        if len(number) > 1 and number[-1] in MULTIPLICATOR_TABLE:
            integer = int(number[:-1]) * MULTIPLICATOR_TABLE[number[-1]]
        else:
            integer = int(number)
    except ValueError:
        return 0

    return integer
    

class TqlParsingError(Exception):
    pass


class TqlCondition(object):
    '''
    Represent a single condition for a tag (eg: value of mem eq 1000G).

    :param name: the name of the tag
    :param value: the matching value
    :param operator: the operator function
    '''
    
    def __init__(self, name, value, operator, invert=False):
        self.name = name
        self.value = value
        self.operator = operator
        self.invert = invert

    def __hash__(self):
        return hash((self.name, self.value, self.operator, self.invert))

    def __repr__(self):
        return '<TqlCondition object for %s value %s>' % (self.name, self.value)

class TqlQuery(object):
    '''
    Parse a query written with TQL (Tag Query Language) and allow to filter a
    liste of tagged objects.

    :param query: the query to parse
    '''

    OPERATORS = {':': 'glob',
                 '=': 'equal',
                 '>': 'gt',
                 '<': 'lt',
                 '>=': 'gte',
                 '<=': 'lte',
                 '~': 'regex'}

    REGEX = re.compile(r'^(?P<name>[a-z-A-Z0-9_-]+)'
                       r'((?P<operator>[!$<>~:=]{1,3})'
                       r'(?P<value>[^&]+))?$')

    def __init__(self, query):
        self._conditions = set()
        self._parse(query)

    def _parse(self, query):
        '''
        Parse the TQG query and fill the condition set.

        :param query: the query to parse
        '''

        conditions = query.split('&')

        for cond in conditions:
            m = TqlQuery.REGEX.match(cond)
            if not m:
                raise TqlParsingError(("Error while parsing, invalid '"
                                       "condition: '%s'") % cond)

            # Retrieve each parts:
            name = m.group('name')
            operator = m.group('operator')

            if operator:
                value = m.group('value')
            else: # Apply the default operator if not specified
                operator = ':'
                value = '*'

            # "not" operator stuff:
            if operator.startswith('!') and len(operator) > 1:
                operator = operator.lstrip('!')
                invert = True
            else:
                invert = False

            # Retrieve the operator callable:
            op_name = TqlQuery.OPERATORS.get(operator)
            if op_name is None:
                raise TqlParsingError("Error while parsing, invalid operator: "
                                      "'%s'" % operator)
            op_func = getattr(self, 'op_%s' % op_name, lambda x, y: True)

            # Create the condition and add it to the condition set:
            cond = TqlCondition(name, value, op_func, invert)
            self._conditions.add(cond)

    def _match_single(self, obj, tag_name=None, key=lambda o,n: o.get(n)):
        '''
        Return True if provided object match all the conditions.

        If tag_name is provided, the matching will be made only for the
        specified tag.
        '''

        if tag_name is None:
            conditions = self._conditions
        else:
            conditions = [cond for cond in self._conditions
                          if cond.name == tag_name]

        res = (c.operator(key(obj, c.name), c.value) ^ c.invert
                   for c in conditions)

        if res:
            return all(res)
        else:
            return False
        
    def filter(self, objects, tag_name=None, key=lambda o,n: o.get(n)):
        '''
        Filters objects which don't match with the query.

        If tag_name is provided, the matching will be made only for the
        specified tag.

        :param objects: list of objects to check
        :param tag_name: the name of the matching tag
        :return: filtered list of objects
        '''
        
        return [o for o in objects if self._match_single(o, tag_name, key)]

    def has_condition(self, tag_name):
        '''
        Return True if query has made a condition for the specified tag name.
        '''
        
        for cond in self._conditions:
            if cond.name == tag_name:
                return True
        else:
            return False

    
    def _get_tags(self):
        '''
        Get the set of all tags involved in this query.
        '''

        return set((c.name for c in self._conditions))

    tags = property(_get_tags)

    def op_glob(self, value, pattern):
        if value is None:
            return False
        return fnmatch(str(value), str(pattern))

    def op_lt(self, lvalue, rvalue):
        lvalue = multiplicator(lvalue)
        rvalue = multiplicator(rvalue)
        return lvalue < rvalue
        
    def op_lte(self, lvalue, rvalue):
        lvalue = multiplicator(lvalue)
        rvalue = multiplicator(rvalue)
        return lvalue <= rvalue
        
    def op_gt(self, lvalue, rvalue):
        lvalue = multiplicator(lvalue)
        rvalue = multiplicator(rvalue)
        return lvalue > rvalue
        
    def op_gte(self, lvalue, rvalue):
        lvalue = multiplicator(lvalue)
        rvalue = multiplicator(rvalue)
        return lvalue >= rvalue

    def op_regex(self, value, pattern):
        if value is None:
            return False
        try:
            return re.match(pattern, value) is not None
        except re.error:
            raise TqlParsingError('Error in your regex pattern: %s' % pattern)

    def op_equal(self, lvalue, rvalue):
        if lvalue is None:
            return False
        if lvalue.isdigit() and rvalue.isdigit(): # Integer comparison:
            lvalue = multiplicator(lvalue)
            rvalue = multiplicator(rvalue)

        return lvalue == rvalue