Loading ccserver/ccserver.py +94 −121 Original line number Diff line number Diff line Loading @@ -3,34 +3,16 @@ import logging import socket from copy import copy from sjrpc.server import SimpleSslRpcServer from handlers import WelcomeHandler from conf import CCConf from client import CCClient from exceptions import AlreadyRegistered, NotConnectedAccountError from tql import TqlQuery, TqlCondition, TqlLimit from orderedset import OrderedSet from tql import TqlParser, TqlObject from objectsdb import ObjectsDB def conv_node_tags(tags): ''' Convert tag from new format to old format (used for release 8). ''' d = {} for k, v in tags.iteritems(): d[k] = v.get('value') return d def conv_vm_tags(tags): ''' Convert tag from new format to old format (used for release 8). ''' l = [] for k, v in tags.iteritems(): t = conv_node_tags(v) t['vm'] = k l.append(t) return l class CCServer(object): ''' Loading Loading @@ -71,12 +53,36 @@ class CCServer(object): logging.info('Started to listen on %s port %s' % (address, port)) self.objects = ObjectsDB(self) # Create the connection manager: self.manager = SimpleSslRpcServer(sock, certfile=certfile, keyfile=keyfile, default_handler=WelcomeHandler(self), on_disconnect='on_disconnect') # Register accounts on the database: self._update_accounts() def _update_accounts(self): ''' Update the database with accounts. ''' db_accounts = self.objects.get_ids() accounts = set(self.conf.list_accounts()) to_register = accounts - db_accounts to_unregister = db_accounts - accounts for login in to_register: conf = self.conf.show(login) obj = TqlObject(id=login, r=conf['role'], a=login) self.objects.register(obj) for login in to_unregister: self.objects.unregister(login) def iter_connected_role(self, role=None): ''' Generator to iter on each connected client with specified role. If role Loading Loading @@ -114,6 +120,32 @@ class CCServer(object): client = self.search_client_by_connection(connection) if client.login in self._connected: del self._connected[client.login] self.objects.unregister_children(client.login) def sub_register(self, parent, name, role): ''' Register a new node supervised by a parent. :param parent: the parent login of the subnode :param login: the name of the subnode :param role: the role of the subnode ''' obj_parent = self.objects.get_by_id(parent) oid = '%s.%s' % (parent, name) obj = TqlObject(id=oid, r=role, __parent=obj_parent) self.objects.register(obj) def sub_unregister(self, parent, name): ''' Unregister a node supervised by a parent. :param parent: the parent of the subnode :param login: the name of the subnode ''' oid = '%s.%s' % (parent, name) self.objects.unregister(oid) def search_client_by_connection(self, connection): ''' Loading Loading @@ -148,39 +180,6 @@ class CCServer(object): return self._connected[login] def resolve_tags(self, login, requested_tags=None): ''' Try to resolve all provided tags for the specified account. ''' tags = {} conf = self.conf.show(login) tags['a'] = login tags['id'] = login tags['role'] = conf['role'] if login in self._connected: client = self._connected[login] tags['con'] = client.get_uptime() tags['ip'] = client.get_ip() rtags = None if requested_tags is None else tuple(requested_tags) try: tags.update(conv_node_tags(client.connection.call('node_tags', rtags))) except Exception as err: logging.error('Error while calling get_tags on ' '%s: %s' % (client.login, err)) else: tags['con'] = 'offline' # Apply all user specified tags: tags.update(conf['tags']) # Also add user specified tags in a special field for vm inheritance # (only useful for hypervisors) tags['__static_user'] = conf['tags'] return tags def kill(self, login): ''' Disconnect from the server the client identified by provided login. Loading @@ -196,74 +195,48 @@ class CCServer(object): 'connected' % login) client.shutdown() def list(self, query): def list(self, query, show=set()): ''' List objects on server. List objects on the server. :param query: the TQL to use to select objects to list :param query: the TQL to use to selection objects on list. ''' objects = [] query = TqlQuery(query) for condition in query.iterconditions(): if isinstance(condition, TqlCondition): tag_name = condition.name if tag_name in ('a', 'hv', 'h', 'id'): # Append all accounts: for login in self.conf.list_accounts(): tags = self.resolve_tags(login, query.req_tags) objects.append(tags) if tag_name in ('vm', 'h', 'id'): hvs = [o for o in objects if o['role'] == 'hypervisor' and o['con'] != 'offline'] if not hvs: for hy in self.iter_connected_role('hypervisor'): tags = self.resolve_tags(hy.login, query.tags) hvs.append(tags) # Query the selected hypervisors: logging.debug('Querying %d hypervisors: ' '%s' % (len(hvs), hvs)) async_calls = {} tags = tuple(query.tags) for hy_tags in hvs: hy = self.get_connection(hy_tags['a']) cid = hy.connection.async_call('vm_tags', tags=query.req_tags) async_calls[cid] = hy_tags logging.debug('Waiting for the response of hypervisors...') calls = frozenset(async_calls) responses = self.manager.wait(calls, timeout=5) logging.debug('Responses received from hypervisors.') for resp in responses: if resp['error'] is None and resp['return']: hy = async_calls[resp['id']] for vm_tags in conv_vm_tags(resp['return']): print vm_tags vm_tags['role'] = 'vm' vm_tags['id'] = '%s.%s' % (hy['a'], vm_tags['vm']) vm_tags.update(hy['__static_user']) objects.append(vm_tags) # Filter the tag objects = [o for o in objects if condition.check(o.get(tag_name))] elif isinstance(condition, TqlLimit): objects = objects[:condition.number] # Before to return, filter with the asked tags for each objects: tags = query.req_tags uniq_objects = {} if tags is not None: tags += ('id', ) self._update_accounts() parser = TqlParser(query) ast, to_show, to_get = parser.parse() to_show += show to_check = set() # Calculate the tags to get/check/show: if '*' in to_show: to_get = None deny = set() for tag in copy(to_show): if tag == '*': to_show = None deny.clear() elif tag.startswith('-'): tag = tag[1:] if to_show is None: deny.add(tag) elif tag in to_show: to_show.remove(tag) if to_get is not None: to_get -= set(self.RESERVED_TAGS) to_check = to_get & deny to_get -= to_check objects = OrderedSet(self.objects.all(to_get, to_check)) if ast is not None: objects, _ = ast.eval(objects, objects) objects_dicts = [] for obj in objects: for key in obj.keys(): if tags is not None and key not in tags or key.startswith('__'): del obj[key] uniq_objects[obj['id']] = obj objects_dicts.append(obj.to_dict(to_show, deny=deny)) return uniq_objects.values() return objects_dicts Loading
ccserver/ccserver.py +94 −121 Original line number Diff line number Diff line Loading @@ -3,34 +3,16 @@ import logging import socket from copy import copy from sjrpc.server import SimpleSslRpcServer from handlers import WelcomeHandler from conf import CCConf from client import CCClient from exceptions import AlreadyRegistered, NotConnectedAccountError from tql import TqlQuery, TqlCondition, TqlLimit from orderedset import OrderedSet from tql import TqlParser, TqlObject from objectsdb import ObjectsDB def conv_node_tags(tags): ''' Convert tag from new format to old format (used for release 8). ''' d = {} for k, v in tags.iteritems(): d[k] = v.get('value') return d def conv_vm_tags(tags): ''' Convert tag from new format to old format (used for release 8). ''' l = [] for k, v in tags.iteritems(): t = conv_node_tags(v) t['vm'] = k l.append(t) return l class CCServer(object): ''' Loading Loading @@ -71,12 +53,36 @@ class CCServer(object): logging.info('Started to listen on %s port %s' % (address, port)) self.objects = ObjectsDB(self) # Create the connection manager: self.manager = SimpleSslRpcServer(sock, certfile=certfile, keyfile=keyfile, default_handler=WelcomeHandler(self), on_disconnect='on_disconnect') # Register accounts on the database: self._update_accounts() def _update_accounts(self): ''' Update the database with accounts. ''' db_accounts = self.objects.get_ids() accounts = set(self.conf.list_accounts()) to_register = accounts - db_accounts to_unregister = db_accounts - accounts for login in to_register: conf = self.conf.show(login) obj = TqlObject(id=login, r=conf['role'], a=login) self.objects.register(obj) for login in to_unregister: self.objects.unregister(login) def iter_connected_role(self, role=None): ''' Generator to iter on each connected client with specified role. If role Loading Loading @@ -114,6 +120,32 @@ class CCServer(object): client = self.search_client_by_connection(connection) if client.login in self._connected: del self._connected[client.login] self.objects.unregister_children(client.login) def sub_register(self, parent, name, role): ''' Register a new node supervised by a parent. :param parent: the parent login of the subnode :param login: the name of the subnode :param role: the role of the subnode ''' obj_parent = self.objects.get_by_id(parent) oid = '%s.%s' % (parent, name) obj = TqlObject(id=oid, r=role, __parent=obj_parent) self.objects.register(obj) def sub_unregister(self, parent, name): ''' Unregister a node supervised by a parent. :param parent: the parent of the subnode :param login: the name of the subnode ''' oid = '%s.%s' % (parent, name) self.objects.unregister(oid) def search_client_by_connection(self, connection): ''' Loading Loading @@ -148,39 +180,6 @@ class CCServer(object): return self._connected[login] def resolve_tags(self, login, requested_tags=None): ''' Try to resolve all provided tags for the specified account. ''' tags = {} conf = self.conf.show(login) tags['a'] = login tags['id'] = login tags['role'] = conf['role'] if login in self._connected: client = self._connected[login] tags['con'] = client.get_uptime() tags['ip'] = client.get_ip() rtags = None if requested_tags is None else tuple(requested_tags) try: tags.update(conv_node_tags(client.connection.call('node_tags', rtags))) except Exception as err: logging.error('Error while calling get_tags on ' '%s: %s' % (client.login, err)) else: tags['con'] = 'offline' # Apply all user specified tags: tags.update(conf['tags']) # Also add user specified tags in a special field for vm inheritance # (only useful for hypervisors) tags['__static_user'] = conf['tags'] return tags def kill(self, login): ''' Disconnect from the server the client identified by provided login. Loading @@ -196,74 +195,48 @@ class CCServer(object): 'connected' % login) client.shutdown() def list(self, query): def list(self, query, show=set()): ''' List objects on server. List objects on the server. :param query: the TQL to use to select objects to list :param query: the TQL to use to selection objects on list. ''' objects = [] query = TqlQuery(query) for condition in query.iterconditions(): if isinstance(condition, TqlCondition): tag_name = condition.name if tag_name in ('a', 'hv', 'h', 'id'): # Append all accounts: for login in self.conf.list_accounts(): tags = self.resolve_tags(login, query.req_tags) objects.append(tags) if tag_name in ('vm', 'h', 'id'): hvs = [o for o in objects if o['role'] == 'hypervisor' and o['con'] != 'offline'] if not hvs: for hy in self.iter_connected_role('hypervisor'): tags = self.resolve_tags(hy.login, query.tags) hvs.append(tags) # Query the selected hypervisors: logging.debug('Querying %d hypervisors: ' '%s' % (len(hvs), hvs)) async_calls = {} tags = tuple(query.tags) for hy_tags in hvs: hy = self.get_connection(hy_tags['a']) cid = hy.connection.async_call('vm_tags', tags=query.req_tags) async_calls[cid] = hy_tags logging.debug('Waiting for the response of hypervisors...') calls = frozenset(async_calls) responses = self.manager.wait(calls, timeout=5) logging.debug('Responses received from hypervisors.') for resp in responses: if resp['error'] is None and resp['return']: hy = async_calls[resp['id']] for vm_tags in conv_vm_tags(resp['return']): print vm_tags vm_tags['role'] = 'vm' vm_tags['id'] = '%s.%s' % (hy['a'], vm_tags['vm']) vm_tags.update(hy['__static_user']) objects.append(vm_tags) # Filter the tag objects = [o for o in objects if condition.check(o.get(tag_name))] elif isinstance(condition, TqlLimit): objects = objects[:condition.number] # Before to return, filter with the asked tags for each objects: tags = query.req_tags uniq_objects = {} if tags is not None: tags += ('id', ) self._update_accounts() parser = TqlParser(query) ast, to_show, to_get = parser.parse() to_show += show to_check = set() # Calculate the tags to get/check/show: if '*' in to_show: to_get = None deny = set() for tag in copy(to_show): if tag == '*': to_show = None deny.clear() elif tag.startswith('-'): tag = tag[1:] if to_show is None: deny.add(tag) elif tag in to_show: to_show.remove(tag) if to_get is not None: to_get -= set(self.RESERVED_TAGS) to_check = to_get & deny to_get -= to_check objects = OrderedSet(self.objects.all(to_get, to_check)) if ast is not None: objects, _ = ast.eval(objects, objects) objects_dicts = [] for obj in objects: for key in obj.keys(): if tags is not None and key not in tags or key.startswith('__'): del obj[key] uniq_objects[obj['id']] = obj objects_dicts.append(obj.to_dict(to_show, deny=deny)) return uniq_objects.values() return objects_dicts