#!/usr/bin/env python
#coding=utf8

import logging
import socket
from sjrpc.server import SimpleSslRpcServer
from handlers import WelcomeHandler
from conf import CCConf
from client import CCClient
from exceptions import AlreadyRegistered, NotConnectedAccountError
from tql import TqlQuery, TqlCondition, TqlLimit

def conv_node_tags(tags):
    '''
    Flatten node tags from the new format to the old one (used for
    release 8).

    :param tags: mapping of tag name to a mapping of tag attributes
    :return: dict of tag name to the ``'value'`` attribute of that tag
        (``None`` when the attribute is absent)
    '''

    return dict((name, attrs.get('value'))
                for name, attrs in tags.iteritems())

def conv_vm_tags(tags):
    '''
    Flatten per-vm tags from the new format to the old one (used for
    release 8).

    :param tags: mapping of vm name to the new-format tags of that vm
    :return: list with one old-format tag dict per vm, each carrying the vm
        name under the ``'vm'`` key
    '''

    return [dict(conv_node_tags(vm_tags), vm=vm_name)
            for vm_name, vm_tags in tags.iteritems()]

class CCServer(object):
    '''
    CloudControl server main class.

    Creates the listening socket and the RPC connection manager, and keeps
    the registry of connected accounts.

    :param conf_dir: the directory that store the client configuration
    :param certfile: the path to the ssl certificate
    :param keyfile: the path to the ssl key
    :param address: the interface to bind
    :param port: the port to bind
    '''

    LISTEN_BACKLOG = 5

    # These tags are reserved and cannot be set by a user:
    RESERVED_TAGS = ('id', 'a', 'r', 'close', 'con', 'ip')

    def __init__(self, conf_dir, certfile=None, keyfile=None,
                 address='0.0.0.0', port=1984):

        # Dict containing all connected accounts, the key is the login of
        # the account and the value the :class:`CCClient` wrapping the
        # connection of the peer:
        self._connected = {}

        # The interface object to the configuration directory:
        self.conf = CCConf(conf_dir)

        # Create the server socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((address, port))
        sock.listen(CCServer.LISTEN_BACKLOG)

        if certfile:
            logging.info('SSL Certificate: %s', certfile)
        if keyfile:
            # Log the key path itself (the certificate path used to be
            # logged here by mistake).
            logging.info('SSL Key: %s', keyfile)

        logging.info('Started to listen on %s port %s', address, port)

        # Create the connection manager:
        self.manager = SimpleSslRpcServer(sock, certfile=certfile,
                                          keyfile=keyfile,
                                          default_handler=WelcomeHandler(self),
                                          on_disconnect='on_disconnect')

    def iter_connected_role(self, role=None):
        '''
        Generator to iter on each connected client with specified role. If role
        is None, return all connected clients.

        :param role: role to filter
        '''

        # Iterate over the values only: the login key is not needed here.
        # On Python 2 ``values()`` is a snapshot list, so a client
        # registering or leaving while the generator is suspended does not
        # break the iteration.
        for client in self._connected.values():
            if role is None or client.role == role:
                yield client

    def register(self, login, role, connection):
        '''
        Register a new connected account on the server.

        :param login: login of the account
        :param role: role of the account
        :param connection: connection to register
        :raises AlreadyRegistered: when a client is already registered with
            this login
        '''

        if login in self._connected:
            raise AlreadyRegistered('A client is already connected with this '
                                    'account.')
        else:
            self._connected[login] = CCClient(login, role, self, connection)

    def unregister(self, connection):
        '''
        Unregister an already connected account.

        Does nothing when the connection does not belong to any registered
        client.

        :param connection: the connection of the client to unregister
        '''

        client = self.search_client_by_connection(connection)
        if client is not None:
            self._connected.pop(client.login, None)

    def search_client_by_connection(self, connection):
        '''
        Search a connected client by its connection. If no client is found,
        return None.

        :param connection: the connection of the client to search
        :return: the found client or None
        '''

        for client in self._connected.values():
            # Identity comparison: we look for this exact connection object.
            if client.connection is connection:
                return client
        return None

    def run(self):
        '''
        Run the server mainloop.
        '''

        logging.info('Running manager mainloop')
        self.manager.run()

    def get_connection(self, login):
        '''
        Get the registered client of a connected account login.

        :param login: login of the client to get
        :return: the object stored by :meth:`register` for this login (a
            :class:`CCClient`, whose ``connection`` attribute is the peer
            connection)
        :raises KeyError: when the account is not connected
        '''

        return self._connected[login]

    def resolve_tags(self, login, requested_tags=None):
        '''
        Try to resolve all provided tags for the specified account.

        :param login: login of the account to resolve tags for
        :param requested_tags: iterable of tag names to ask to the remote
            node, or None to send no restriction
        :return: dict of resolved tags; it always contains ``a``, ``id``,
            ``role``, ``con`` and ``__static_user``
        '''

        tags = {}
        conf = self.conf.show(login)
        tags['a'] = login
        tags['id'] = login
        tags['role'] = conf['role']

        if login in self._connected:
            client = self._connected[login]
            tags['con'] = client.get_uptime()
            tags['ip'] = client.get_ip()
            rtags = None if requested_tags is None else tuple(requested_tags)
            # Best effort: a failing node must not prevent the resolution of
            # the tags known by the server, so errors are only logged.
            try:
                remote_tags = client.connection.call('node_tags', rtags)
                tags.update(conv_node_tags(remote_tags))
            except Exception as err:
                logging.error('Error while calling node_tags on %s: %s',
                              client.login, err)
        else:
            tags['con'] = 'offline'

        # Apply all user specified tags:
        tags.update(conf['tags'])

        # Also add user specified tags in a special field for vm inheritance
        # (only useful for hypervisors)
        tags['__static_user'] = conf['tags']

        return tags

    def kill(self, login):
        '''
        Disconnect from the server the client identified by provided login.

        :param login: the login of the user to disconnect
        :raises NotConnectedAccountError: when provided account is not
            connected (or if account doesn't exists).
        '''

        client = self._connected.get(login)
        if client is None:
            raise NotConnectedAccountError('The account %s is not '
                                           'connected' % login)
        client.shutdown()

    def list(self, query):
        '''
        List objects on server.

        :param query: the TQL to use to select objects to list
        :return: list of tag dicts, one per distinct object ``id``
        '''

        objects = []
        query = TqlQuery(query)

        for condition in query.iterconditions():
            if isinstance(condition, TqlCondition):
                tag_name = condition.name

                if tag_name in ('a', 'hv', 'h', 'id'):
                    # Append all accounts:
                    for login in self.conf.list_accounts():
                        objects.append(self.resolve_tags(login,
                                                         query.req_tags))

                if tag_name in ('vm', 'h', 'id'):
                    hvs = [o for o in objects if o['role'] == 'hypervisor'
                           and o['con'] != 'offline']
                    if not hvs:
                        # NOTE(review): ``query.tags`` is used here while
                        # the accounts above are resolved with
                        # ``query.req_tags`` -- confirm this is intended.
                        for hy in self.iter_connected_role('hypervisor'):
                            hvs.append(self.resolve_tags(hy.login,
                                                         query.tags))

                    # Query the selected hypervisors:
                    logging.debug('Querying %d hypervisors: '
                                  '%s' % (len(hvs), hvs))
                    # Maps the id of each pending call to the tags of the
                    # hypervisor it was sent to:
                    async_calls = {}
                    for hy_tags in hvs:
                        client = self.get_connection(hy_tags['a'])
                        cid = client.connection.async_call(
                            'vm_tags', tags=query.req_tags)
                        async_calls[cid] = hy_tags

                    logging.debug('Waiting for the response of hypervisors...')
                    calls = frozenset(async_calls)
                    responses = self.manager.wait(calls, timeout=5)
                    logging.debug('Responses received from hypervisors.')

                    for resp in responses:
                        if resp['error'] is None and resp['return']:
                            # Retrieve the hypervisor which answered.
                            # Assumes each response carries the id of its
                            # call under the 'id' key -- TODO confirm
                            # against the sjrpc response format.
                            hy_tags = async_calls[resp['id']]
                            for vm_tags in conv_vm_tags(resp['return']):
                                logging.debug('VM tags received: %s', vm_tags)
                                vm_tags['role'] = 'vm'
                                vm_tags['id'] = '%s.%s' % (hy_tags['a'],
                                                           vm_tags['vm'])
                                # VMs inherit the user specified tags of
                                # their hypervisor:
                                vm_tags.update(hy_tags['__static_user'])
                                objects.append(vm_tags)

                # Filter the tag
                objects = [o for o in objects
                           if condition.check(o.get(tag_name))]

            elif isinstance(condition, TqlLimit):
                objects = objects[:condition.number]

        # Before to return, filter with the asked tags for each objects. The
        # set is a private copy so the query's own req_tags is never
        # modified; 'id' is always kept because it is the deduplication key.
        wanted = query.req_tags
        if wanted is not None:
            wanted = set(wanted)
            wanted.add('id')

        uniq_objects = {}
        for obj in objects:
            # Iterate over a copy of the keys since entries are deleted:
            for key in list(obj):
                unwanted = wanted is not None and key not in wanted
                if unwanted or key.startswith('__'):
                    del obj[key]
            uniq_objects[obj['id']] = obj

        return list(uniq_objects.values())