diff --git a/ccserver/ccserver.py b/ccserver/ccserver.py index a24184ff0d02fa8db110f4e675f987959f22ade8..9c29ff05ba1b93ff2e27ba1afd2e1e8e1617e323 100644 --- a/ccserver/ccserver.py +++ b/ccserver/ccserver.py @@ -50,15 +50,16 @@ class CCServer(object): default_handler=WelcomeHandler(self), on_disconnect='on_disconnect') - def iterrole(self, role): + def iter_connected_role(self, role=None): ''' - Generator to iter on each connected client with specified role. + Generator to iter on each connected client with specified role. If role + is None, return all connected clients. :param role: role to filter ''' for login, client in self._connected.iteritems(): - if client.connection.get_handler().__class__.role_name == role: + if role is None or client.role == role: yield client def register(self, login, role, connection): @@ -105,3 +106,46 @@ class CCServer(object): logging.info('Running manager mainloop') self.manager.run() + + def get_connection(self, login): + ''' + Get the connection of a connecter account login. + + :param login: login of the connection to get + :return: :class:`RpcConnection` instance of the peer connection + ''' + + return self._connected[login] + + def resolve_tags(self, login, requested_tags=None): + ''' + Try to resolve all provided tags for the specified account. + ''' + + tags = {} + conf = self.conf.show(login) + tags['a'] = login + tags['id'] = login + tags['role'] = conf['role'] + + if login in self._connected: + client = self._connected[login] + try: + tags.update(client.connection.call('get_tags', + tuple(requested_tags))) + except Exception as err: + logging.error('Error while calling get_tags on ' + '%s: %s' % (client.login, err)) + else: + tags['status'] = 'online' + else: + tags['status'] = 'offline' + + # Apply all user specified tags: + tags.update(conf['tags']) + + # Also add user specified tags in a special field for vm inheritance + # (only useful for hypervisors) + tags['__static_user'] = conf['tags'] + + return tags diff --git a/ccserver/handlers.py b/ccserver/handlers.py index 00ab02b4645083fa2294763885b1514abe0821e8..6a20459d8c633973feab67d372ca70f73afb1dbd 100644 --- a/ccserver/handlers.py +++ b/ccserver/handlers.py @@ -62,48 +62,66 @@ class ClientHandler(OnlineCCHandler): role_name = 'client' def _list(self, query): - - # Contains all the objects to be filted: - objects = [] - # The query object: + objects = [] query = TqlQuery(query) - # Get the hypervisors that match the query: - hypervisors = list(self._server.iterrole('hypervisor')) - if query.has_condition('hv'): - hypervisors = query.filter(hypervisors, tag_name='hv', - key=lambda o,n: o.get_tags([n]).get(n)) - logging.debug('Querying %d hypervisors: %s' % (len(hypervisors), - hypervisors)) - - # Try to get vm for each matched hypervisor: - async_calls = {} - tags = tuple(query.tags) - for hy in hypervisors: - async_calls[hy.connection.async_call('list_vm', tags=tags)] = hy - - logging.debug('Waiting for the response of hypervisors...') - responses = self._server.manager.wait(frozenset(async_calls), timeout=5) - logging.debug('Responses received.') - - # Make the hypervisor object -> tags mapping: - hv_tags = {} - for hy in hypervisors: - tags = hy.get_tags(tags=tuple(query.tags)) - hv_tags[hy] = tags - objects.append(tags) - - for resp in responses: - if resp['error'] is None: - hy = async_calls[resp['id']] - for vm in resp['return']: - vm_tags = hv_tags[hy].copy() - vm_tags.update(vm) - vm_tags['role'] = 'vm' - objects.append(vm_tags) - # Filter the objects with the query, and return it: - return query.filter(objects) + for condition in query.iterconditions(): + tag_name = condition.name + + if tag_name in ('a', 'hv', 'h', 'id'): + # Append all accounts: + for login in self._server.conf.list_accounts(): + tags = self._server.resolve_tags(login, query.tags) + objects.append(tags) + + if tag_name in ('vm', 'h', 'id'): + hvs = [o for o in objects if o['role'] == 'hypervisor' + and o['status'] == 'online'] + if not hvs: + for hy in self._server.iter_connected_role('hypervisor'): + tags = self._server.resolve_tags(hy.login, query.tags) + hvs.append(tags) + + # Query the selected hypervisors: + logging.debug('Querying %d hypervisors: ' + '%s' % (len(hvs), hvs)) + async_calls = {} + tags = tuple(query.tags) + for hy_tags in hvs: + hy = self._server.get_connection(hy_tags['a']) + cid = hy.connection.async_call('list_vm', tags=tags) + async_calls[cid] = hy_tags + + logging.debug('Waiting for the response of hypervisors...') + calls = frozenset(async_calls) + responses = self._server.manager.wait(calls, timeout=5) + logging.debug('Responses received from hypervisors.') + + for resp in responses: + if resp['error'] is None: + hy = async_calls[resp['id']] + for vm_tags in resp['return']: + vm_tags['role'] = 'vm' + vm_tags['id'] = '%s.%s' % (hy['a'], vm_tags['vm']) + vm_tags.update(hy['__static_user']) + objects.append(vm_tags) + + # Filter the tag + objects = [o for o in objects if condition.check(o.get(tag_name))] + + # Before to return, filter with the asked tags for each objects: + tags = query.tags + tags.add('id') + tags.add('role') + final_objects = {} + for obj in objects: + for key in obj.keys(): + if key not in tags: + del obj[key] + final_objects[obj['id']] = obj + + return final_objects.values() @pure @listed @@ -116,11 +134,15 @@ class ClientHandler(OnlineCCHandler): return self._list(query) def _vm_action(self, query, method, *args, **kwargs): - query += '&vm' vms = self._list(query) - hypervisors = list(self._server.iterrole('hypervisor')) + hypervisors = list(self._server.iter_connected_role('hypervisor')) for hv in hypervisors: - vm_to_start = [vm['vm'] for vm in vms if vm['hv'] == hv.login] + vm_to_start = [] + for vm in vms: + if vm['role'] != 'vm': + pass + elif vm['id'].split('.')[0] == hv.login: + vm_to_start.append(vm['vm']) if vm_to_start: hv.connection.call(method, vm_to_start, *args, **kwargs) @@ -146,16 +168,7 @@ class ClientHandler(OnlineCCHandler): @pure @listed - def execute(self, query, command): - query += '&hv' - query = TqlQuery(query) - hypervisors = query.filter(list(self._server.iterrole('hypervisor')), - key=lambda o,n: o.get_tags([n]).get(n)) - outputs = [] - for hv in hypervisors: - outputs.append(hv.connection.call('execute_command', command)) - return outputs class AuthenticationError(Exception): pass diff --git a/ccserver/tql.py b/ccserver/tql.py index ec5c146b86128ac82da8fbd97e8441f330d659ee..319e1e6a97e05a2cf20061d4a07e37f6e1f718d1 100644 --- a/ccserver/tql.py +++ b/ccserver/tql.py @@ -3,6 +3,7 @@ import re from fnmatch import fnmatch +from itertools import izip MULTIPLICATOR_TABLE = {'B': 1, 'b': 1, 'o': 1, 'K': 100, 'k': 100, @@ -55,6 +56,9 @@ class TqlCondition(object): def __repr__(self): return '' % (self.name, self.value) + def check(self, value): + return self.operator(value, self.value) ^ self.invert + class TqlQuery(object): ''' Parse a query written with TQL (Tag Query Language) and allow to filter a @@ -71,12 +75,16 @@ class TqlQuery(object): '<=': 'lte', '~': 'regex'} - REGEX = re.compile(r'^(?P[a-z-A-Z0-9_-]+)' - r'((?P[!$<>~:=]{1,3})' - r'(?P[^&]+))?$') + RE_COND = re.compile(r'^(?P[a-z-A-Z0-9_-]+)' + r'((?P[!$<>~:=]{1,3})' + r'(?P[^&]+))?$') + RE_TAG = re.compile(r'^(?P[a-z-A-Z0-9_-]+)') + RE_SEPARATOR = re.compile(r'(&|\$)') + def __init__(self, query): - self._conditions = set() + self._conditions = [] + self._to_show = set() self._parse(query) def _parse(self, query): @@ -86,77 +94,62 @@ class TqlQuery(object): :param query: the query to parse ''' - conditions = query.split('&') - - for cond in conditions: - m = TqlQuery.REGEX.match(cond) - if not m: - raise TqlParsingError(("Error while parsing, invalid '" - "condition: '%s'") % cond) - - # Retrieve each parts: - name = m.group('name') - operator = m.group('operator') - - if operator: - value = m.group('value') - else: # Apply the default operator if not specified - operator = ':' - value = '*' - - # "not" operator stuff: - if operator.startswith('!') and len(operator) > 1: - operator = operator.lstrip('!') - invert = True - else: - invert = False - - # Retrieve the operator callable: - op_name = TqlQuery.OPERATORS.get(operator) - if op_name is None: - raise TqlParsingError("Error while parsing, invalid operator: " - "'%s'" % operator) - op_func = getattr(self, 'op_%s' % op_name, lambda x, y: True) - - # Create the condition and add it to the condition set: - cond = TqlCondition(name, value, op_func, invert) - self._conditions.add(cond) - - def _match_single(self, obj, tag_name=None, key=lambda o,n: o.get(n)): - ''' - Return True if provided object match all the conditions. - - If tag_name is provided, the matching will be made only for the - specified tag. - ''' - - if tag_name is None: - conditions = self._conditions - else: - conditions = [cond for cond in self._conditions - if cond.name == tag_name] - - res = (c.operator(key(obj, c.name), c.value) ^ c.invert - for c in conditions) - - if res: - return all(res) - else: - return False - - def filter(self, objects, tag_name=None, key=lambda o,n: o.get(n)): - ''' - Filters objects which don't match with the query. - - If tag_name is provided, the matching will be made only for the - specified tag. - - :param objects: list of objects to check - :param tag_name: the name of the matching tag - :return: filtered list of objects - ''' - - return [o for o in objects if self._match_single(o, tag_name, key)] + # Separate the query into tokens: + tokens = TqlQuery.RE_SEPARATOR.split(query) + + # Insert the first separator at the start of the token list: + tokens.insert(0, '&') + + for sep, tok in izip(tokens[0::2], tokens[1::2]): + + if not tok: + continue # Empty condition, we assume that is not an error + + if sep == '&': + # Condition separator, the token is actually a condition: + m = TqlQuery.RE_COND.match(tok) + if not m: + raise TqlParsingError('Error while parsing, invalid ' + 'condition: %s' % repr(tok)) + + # Retrieve each parts: + name = m.group('name') + operator = m.group('operator') + + if operator: + value = m.group('value') + else: # Apply the default operator if not specified + operator = ':' + value = '*' + + # "not" operator stuff: + if operator.startswith('!') and len(operator) > 1: + operator = operator.lstrip('!') + invert = True + else: + invert = False + + # Retrieve the operator callable: + op_name = TqlQuery.OPERATORS.get(operator) + if op_name is None: + raise TqlParsingError('Error while parsing, invalid ' + 'operator: %s' % repr(operator)) + op_func = getattr(self, 'op_%s' % op_name, lambda x, y: True) + + # Create the condition and add it to the condition list: + cond = TqlCondition(name, value, op_func, invert) + self._conditions.append(cond) + + # The tag will be also show: + self._to_show.add(name) + + elif sep == '$': + # "To show" separator, the token is a tag name: + if TqlQuery.RE_TAG.match(tok) is not None: + self._to_show.add(tok) + else: + raise TqlParsingError('Error while parsing, invalid tag ' + 'name %s' % repr(tok)) def has_condition(self, tag_name): ''' @@ -169,13 +162,16 @@ class TqlQuery(object): else: return False - + def iterconditions(self): + for cond in self._conditions: + yield cond + def _get_tags(self): ''' Get the set of all tags involved in this query. ''' - return set((c.name for c in self._conditions)) + return self._to_show tags = property(_get_tags)