#!/usr/bin/env python
#coding=utf8

import inspect
import logging

from sjrpc.utils import RpcHandler, pure

from tql import TqlQuery
from conf import CCConf


def listed(func):
    '''
    Mark a handler method as a command advertised by
    :meth:`CCHandler.list_commands`.

    :param func: the callable to mark
    :return: the same callable, with its ``__listed__`` attribute set to True
    '''
    func.__listed__ = True
    return func


class CCHandler(RpcHandler):
    '''
    Base class for handlers of CloudControl server.

    :cvar role_name: the name of the role bound to the handler
    :type role_name: str, or None if not applicable
    '''

    role_name = None

    def __init__(self, server):
        '''
        :param server: the server instance this handler works for
        '''
        self._server = server

    def __getitem__(self, name):
        '''
        Log the lookup of *name*, then delegate it to the parent class.
        '''
        logging.debug('Called method %s.%s', self.__class__.__name__, name)
        return super(CCHandler, self).__getitem__(name)

    @pure
    def list_commands(self):
        '''
        Describe every attribute of this handler marked with :func:`listed`.

        :return: a list of dicts with the keys ``name`` and ``description``
        '''
        cmd_list = []

        for attr_name in dir(self):
            attr = getattr(self, attr_name, None)
            if not getattr(attr, '__listed__', False):
                continue
            cmd_list.append({
                'name': attr.__name__,
                # inspect.getdoc() already cleans the indentation, and returns
                # None for an undocumented callable: fall back to an empty
                # description instead of crashing.
                'description': inspect.getdoc(attr) or '',
            })

        return cmd_list


class HypervisorHandler(CCHandler):
    '''
    Handler bound to the 'hypervisor' role.
    '''

    role_name = 'hypervisor'


class ClientHandler(CCHandler):
    '''
    Handler bound to the 'client' role.
    '''

    role_name = 'client'

    # The docstrings of @listed methods are returned to the peers by
    # list_commands(), so they are kept short and user oriented; maintainer
    # notes live in comments instead.

    @pure
    @listed
    def list_vm(self):
        '''
        List all VMs on nodes.
        '''
        # Returns a tuple holding every item of the 'return' field of the
        # responses that came back without error.
        found_vm = []

        # Send the asynchronous request to all nodes:
        # NOTE(review): this iterates the 'node' role and calls async_call()
        # on the object itself, while list() below uses the 'hypervisor' role
        # and ``hy.connection.async_call`` -- confirm which one is current.
        async_ids = set()
        for node in self._server.iterrole('node'):
            async_ids.add(node.async_call('list_vm'))

        # Wait for the response:
        responses = self._server.manager.wait(frozenset(async_ids), timeout=5)

        # Process the responses, skipping the ones which report an error:
        for resp in responses:
            if resp['error'] is None:
                found_vm.extend(resp['return'])

        return tuple(found_vm)

    @pure
    @listed
    def list(self, query):
        '''
        List all objects registered on this instance.
        '''
        # ``query`` is the raw text handed to TqlQuery; the value returned is
        # whatever TqlQuery.filter() produces for the collected objects.
        logging.debug('Executed list function with query %s', query)

        # Contains all the objects to be filtered:
        objects = []

        # The query object:
        query = TqlQuery(query)
        requested_tags = tuple(query.tags)

        # Get the hypervisors that match the query:
        hypervisors = list(self._server.iterrole('hypervisor'))
        if query.has_condition('hv'):
            hypervisors = query.filter(hypervisors, tag_name='hv',
                                       key=lambda o, n: o.get_tags([n]).get(n))

        logging.debug('Querying %d hypervisors: %s',
                      len(hypervisors), hypervisors)

        # Try to get vm for each matched hypervisor, remembering which
        # hypervisor each asynchronous call was sent to:
        async_calls = {}
        for hy in hypervisors:
            call_id = hy.connection.async_call('list_vm', tags=requested_tags)
            async_calls[call_id] = hy

        logging.debug('Waiting for the response of hypervisors...')
        responses = self._server.manager.wait(frozenset(async_calls),
                                              timeout=5)
        logging.debug('Responses received.')

        # Make the hypervisor object -> tags mapping; each hypervisor is
        # itself one of the listed objects:
        hv_tags = {}
        for hy in hypervisors:
            hy_tags = hy.get_tags(tags=requested_tags)
            hv_tags[hy] = hy_tags
            objects.append(hy_tags)

        # Each vm starts from a copy of the tags of its hypervisor, then its
        # own tags are applied on top of them:
        for resp in responses:
            if resp['error'] is not None:
                continue
            hy = async_calls[resp['id']]
            for vm in resp['return']:
                vm_tags = hv_tags[hy].copy()
                vm_tags.update(vm)
                vm_tags['role'] = 'vm'
                objects.append(vm_tags)

        # Filter the objects with the query, and return it:
        return query.filter(objects)


class AuthenticationError(Exception):
    '''
    Raised by :meth:`WelcomeHandler.authentify` when a login is refused.
    '''


class WelcomeHandler(CCHandler):
    '''
    Default handler used on client connections of the server.

    :cvar ROLES: role name/handler mapping
    '''

    ROLES = {
        'client': ClientHandler,
        'hypervisor': HypervisorHandler,
    }

    @listed
    def authentify(self, connection, login, password):
        '''
        Authenticate the client.
        '''
        # On success the peer is registered on the server, the handler of
        # ``connection`` is replaced by the one matching the role, and the
        # role name is returned. Every refusal raises AuthenticationError.
        try:
            role = self._server.conf.authentify(login, password)
        except CCConf.UnknownAccount:
            logging.info('New authentication from %s: failure', login)
            # NOTE(review): this message tells the peer that the login does
            # not exist, which allows account enumeration.
            raise AuthenticationError('Authentication failure (Unknown login)')

        if role is None:
            logging.info('New authentication from %s: failure', login)
            raise AuthenticationError('Authentication failure')

        # Resolve the handler *before* registering: a role without handler
        # used to raise a TypeError once the peer was already registered.
        handler_class = WelcomeHandler.ROLES.get(role)
        if handler_class is None:
            logging.info('New authentication from %s: failure', login)
            raise AuthenticationError('Authentication failure (Unknown role)')

        # If authentication is a success, ask tags to the server:
        self._server.register(login, role, connection)
        connection.set_handler(handler_class(self._server))
        logging.info('New authentication from %s: success', login)
        return role