From 88d9169331f02bec53befe6f1a6c680bf1b28ff4 Mon Sep 17 00:00:00 2001 From: Antoine Millet <antoine.millet@smartjog.com> Date: Wed, 23 May 2012 14:54:28 +0200 Subject: [PATCH] Refactored client and associated rpc handlers Created a class for each client role Moved handled in same module of the client role class Fixed handlers to be compatible with new object db. --- ccserver/ccserver.py | 272 ++++------ ccserver/clients/__init__.py | 246 +++++++++ ccserver/clients/bootstrap.py | 12 + ccserver/clients/cli.py | 680 +++++++++++++++++++++++++ ccserver/clients/client.py | 1 + ccserver/clients/host.py | 11 + ccserver/clients/hv.py | 184 +++++++ ccserver/clients/spv.py | 33 ++ ccserver/db.py | 60 +++ ccserver/handlers.py | 927 +--------------------------------- 10 files changed, 1354 insertions(+), 1072 deletions(-) create mode 100644 ccserver/clients/__init__.py create mode 100644 ccserver/clients/bootstrap.py create mode 100644 ccserver/clients/cli.py create mode 100644 ccserver/clients/client.py create mode 100644 ccserver/clients/host.py create mode 100644 ccserver/clients/hv.py create mode 100644 ccserver/clients/spv.py create mode 100644 ccserver/db.py diff --git a/ccserver/ccserver.py b/ccserver/ccserver.py index 6dd9be4..17f6a84 100644 --- a/ccserver/ccserver.py +++ b/ccserver/ccserver.py @@ -6,20 +6,39 @@ from __future__ import absolute_import import logging from fnmatch import fnmatch as glob -from functools import partial from sjrpc.server import SSLRpcServer +from sjrpc.utils import RpcHandler, pass_connection -from ccserver.handlers import WelcomeHandler from ccserver.conf import CCConf -from ccserver.client import CCClient -from ccserver.exceptions import AlreadyRegistered, NotConnectedAccountError +from ccserver.exceptions import AlreadyRegistered, NotConnectedAccountError, AuthenticationError, BadRoleError from ccserver.jobs import JobsManager +from ccserver.clients import Client -from cloudcontrol.common.tql.db.object import TqlObject -from cloudcontrol.common.tql.db.tag import StaticTag, CallbackTag +#from cloudcontrol.common.tql.db.object import TqlObject +from ccserver.db import SObject +from cloudcontrol.common.tql.db.tag import StaticTag from cloudcontrol.common.tql.db.db import TqlDatabase +# Import all enabled roles +import ccserver.clients.cli +import ccserver.clients.host +import ccserver.clients.hv +import ccserver.clients.bootstrap +import ccserver.clients.spv + + +class WelcomeHandler(RpcHandler): + """ Default handler used on client connections of the server. + """ + + def __init__(self, server): + self._server = server + + @pass_connection + def authentify(self, conn, login, password): + return self._server.authenticate(conn, login, password) + class CCServer(object): ''' @@ -32,7 +51,7 @@ class CCServer(object): :param port: the port to bind ''' - LISTEN_BACKLOG = 5 + LISTEN_BACKLOG = 50 # These tags are reserved and cannot be setted by an user: RESERVED_TAGS = ('id', 'a', 'r', 'close', 'con', 'ip', 'p') @@ -40,9 +59,8 @@ class CCServer(object): def __init__(self, conf_dir, maxcon, maxidle, certfile=None, keyfile=None, address='0.0.0.0', port=1984): - # Dict containing all connected accounts, the key is the login of - # the account and the value the :class:`RpcConnection` of the peer: - self._connected = {} + self._clients = {} # Clients connected to the server + # The interface object to the configuration directory: self.conf = CCConf(conf_dir) # Some settings: @@ -80,9 +98,12 @@ class CCServer(object): for login in to_register: conf = self.conf.show(login) - obj = TqlObject(login) - obj.register(StaticTag('r', conf['role'])) - obj.register(StaticTag('a', login)) + obj = SObject(login) + obj.register(StaticTag('r', conf['role']), override=True) + obj.register(StaticTag('a', login), override=True) + # Register static tags: + for tag, value in self.conf.show(login)['tags'].iteritems(): + obj.register(StaticTag(tag, value), override=True) self.db.register(obj) for login in to_unregister: @@ -96,10 +117,66 @@ class CCServer(object): :param role: role to filter ''' - for login, client in self._connected.items(): + for login, client in self._clients.items(): if role is None or client.role == role: yield client + def authenticate(self, conn, login, password): + """ Authenticate a client against provided login and password. + + If the authentication is a success, register the client on the server + and return the client role, else, raise an exception. + """ + + logmsg = 'Authentication error from %s: ' + with self.conf: + try: + role = self.conf.authentify(login, password) + except CCConf.UnknownAccount: + raise AuthenticationError('Unknown login') + else: + if 'close' in self.conf.show(login)['tags']: + logging.warning(logmsg + 'account closed (%s)', conn.getpeername(), login) + raise AuthenticationError('Account is closed') + + if role is None: + logging.warning(logmsg + 'bad login/password (%s)', conn.getpeername(), login) + raise AuthenticationError('Bad login/password') + else: + if role not in Client.roles: + logging.warning(logmsg + 'bad role in account config (%s)', conn.getpeername(), login) + raise BadRoleError('%r is not a legal role' % role) + + create_object = False + + # If authentication is a success, try to register the client: + if role == 'bootstrap': + # Set a bootstrap id for the object: + login = '%s.%s' % (login, conn.get_fd()) + # Real role of the node will be host: + role = 'host' + create_object = True + + # Try to register the client: + for _ in xrange(5): + try: + self.register(login, role, conn, create_object) + except AlreadyRegistered: + if role == 'cli': + try: + self.kill(login) + except NotConnectedAccountError: + pass + else: + break + else: + logging.warning(logmsg + 'already connected (%s)', conn.getpeername(), login) + raise AuthenticationError('Already connected') + + logging.info('Authentication success from %s with login %s', conn.getpeername(), login) + return role + + def register(self, login, role, connection, create_object=False): ''' Register a new connected account on the server. @@ -110,94 +187,26 @@ class CCServer(object): :param create_object: create the object on objectdb ''' - if login in self._connected: - raise AlreadyRegistered('A client is already connected with this ' - 'account.') - else: - self._connected[login] = CCClient(login, role, self, connection) - # Create the object on objectdb if required: if create_object: - obj = TqlObject(login) - obj.register(StaticTag('r', role)) - self.db.register(obj) + tql_object = SObject(login) + tql_object.register(StaticTag('r', role)) + self.db.register(tql_object) else: - obj = self.db.get(login) - assert obj is not None - - # Define server defined tags for the new node: - obj.register(CallbackTag('con', self._connected[login].get_uptime, ttl=0)) - obj.register(CallbackTag('ip', self._connected[login].get_ip)) - - def unregister(self, connection): - ''' - Unregister an already connected account. - - :param connection: the connection of the client to unregister - ''' - #XXX: not functional since new db!!! - client = self.search_client_by_connection(connection) + tql_object = self.db.get(login) + assert tql_object is not None - # Unregister objects from database if it have no account attached: - obj = self.db.get(client.login) - if obj is not None and 'a' not in obj: - self.db.unregister(obj) + # Register the client: + if login in self._clients: + raise AlreadyRegistered('A client is already connected with this account.') else: - # Unregister tags of connected account: - obj.unregister() - - if client.login in self._connected: - del self._connected[client.login] - #self.objects.unregister_children(client.login) - - def sub_register(self, parent, name, role): - ''' - Register a new node supervised by a parent. - - :param parent: the parent login of the subnode - :param login: the name of the subnode - :param role: the role of the subnode - ''' - - client = self.get_connection(parent) - child = '%s.%s' % (parent, name) - client.register_child(child) - - # Register the children in the tags database: - obj = TqlObject(child) - obj.register(StaticTag('r', role)) - obj.register(StaticTag('p', client)) - self.db.register(obj) - - def sub_unregister(self, parent, name): - ''' - Unregister a node supervised by a parent. - - :param parent: the parent of the subnode - :param login: the name of the subnode - ''' - - client = self.get_connection(parent) - child = '%s.%s' % (parent, name) - client.unregister_child(child) - - # Unregister the children from the tags database: - self.objects.unregister(child) - - def search_client_by_connection(self, connection): - ''' - Search a connected client by it connection. If no client is found, - return None. - - :param connection: the connection of the client to search - :return: the found client or None - ''' + client = Client.from_role(role, login, self, connection, tql_object) + self._clients[login] = client - for client in self._connected.values(): - if client.connection is connection: - return client - else: - return None + def unregister(self, client): + """ Unregister a client. + """ + del self._clients[client.login] def run(self): ''' @@ -214,15 +223,13 @@ class CCServer(object): logging.debug('Running rpc mainloop') self.rpc.run() - def get_connection(self, login): - ''' - Get the connection of a connecter account login. + def get_client(self, login): + """ Get a connected client by its login. :param login: login of the connection to get - :return: :class:`RpcConnection` instance of the peer connection - ''' - - return self._connected[login] + :return: the client instance + """ + return self._clients[login] def kill(self, login): ''' @@ -233,7 +240,7 @@ class CCServer(object): if account doesn't exists). ''' - client = self._connected.get(login) + client = self._clients.get(login) if client is None: raise NotConnectedAccountError('The account %s is not ' 'connected' % login) @@ -263,63 +270,6 @@ class CCServer(object): return right['target'] == 'allow' return False - def tags_register(self, login, name, ttl=None, value=None): - ''' - Register a new tag for a client. - - :param login: login of the client - :param name: name of the tag to register - :param ttl: TTL of the tag if applicable (None = no TTL, the tag will - never expire) - :param value: value of the tag - ''' - - obj = self.db.get(login) - client = self._connected.get(login) - callback = partial(client.get_remote_tags, name) - tag = CallbackTag(name, callback, ttl=ttl) - obj.register(tag) - - def tags_unregister(self, login, name): - ''' - Unregister a tag for the client. - - :param login: login of the client - :param name: name of the tag to unregister - ''' - - obj = self.db.get(login) - obj.unregister(name) - - def tags_drop(self, login, name): - ''' - Drop the cached value of a tag for the client. - - :param login: login of the client - :param name: name of the tag to drop - ''' - obj = self.db.get(login) - tag = obj.get(name) - if tag is not None: - tag.invalidate() - - def tags_update(self, login, name, value=None, ttl=None): - ''' - Update a tag. - - :param login: login of the client - :param name: name of the tag to update - :param value: new value of the tag - :param ttl: new ttl of the tag - ''' - obj = self.db.get(login) - tag = obj.get(name) - if tag is not None: - if value is not None: - tag.set_value(value) - if ttl is not None: - tag.ttl = ttl - def list(self, query, show=None): self._update_accounts() return self.db.query(query, show) diff --git a/ccserver/clients/__init__.py b/ccserver/clients/__init__.py new file mode 100644 index 0000000..c79dee2 --- /dev/null +++ b/ccserver/clients/__init__.py @@ -0,0 +1,246 @@ +""" Connected client management package. + +This package store classes representing each client's role and the associated +sjRPC handler. +""" + +import logging + +from functools import partial +from datetime import datetime + +from cloudcontrol.common.tql.db.tag import CallbackTag + +from ccserver.handlers import CCHandler, listed +from ccserver.exceptions import RightError + + +class RegisteredCCHandler(CCHandler): + + """ Basic handler for all registered clients. + """ + + def __getitem__(self, name): + self.client.top() + return super(RegisteredCCHandler, self).__getitem__(name) + + def on_disconnect(self, conn): + logging.info('Client %s disconnected', self.client.login) + self.client.shutdown() + + def check(self, method, tql=None): + """ Check if the client have access to this method. + """ + allow = self.client.server.check(self.client.login, method, tql) + if not allow: + raise RightError('You are not allowed to do this action.') + + # + # Tags registration handler functions: + # + + @listed + def tags_register(self, name, ttl=None, value=None): + """ Register a new tag on the calling node. + + :param name: name of the tag to register + :param ttl: ttl of the tag (or None if not applicable) + :param value: value to fill the tag (optionnal) + """ + self.client.tags_register(name, ttl, value) + + @listed + def tags_unregister(self, name): + """ Unregister a tag on the calling node. + + :param name: name of the tag to unregister + """ + self.client.tags_unregister(name) + + @listed + def tags_drop(self, name): + """ Drop the tag value of the specified tag on the calling node. + + :param name: name of the tag to drop + """ + self.client.tags_drop(name) + + @listed + def tags_update(self, name, value, ttl=None): + """ Update the value of the specified tag on the calling node. + + :param name: name of the tag to update + :param value: new tag value + :param ttl: new ttl value + """ + self.client.tags_update(name, value, ttl) + + +class Client(object): + + """ Base class for all types cc-server clients. + + :param login: login of the client + :param server: server instance + :param connection: rpc connection to the client + """ + + ROLE = None + RPC_HANDLER = RegisteredCCHandler + + roles = {} + + def __init__(self, login, server, connection, tql_object): + self._login = login + self._server = server + self._connection = connection + self._tql_object = tql_object + self._handler = self.RPC_HANDLER(self) + self._last_action = datetime.now() + self._connection_date = datetime.now() + + # Set the role's handler for the client: + self._connection.rpc.set_handler(self._handler) + + # Remote tags registered: + self._remote_tags = set() + + # Register the server defined client tags: + self._tql_object.register(CallbackTag('con', lambda: self.uptime, ttl=0)) + self._tql_object.register(CallbackTag('idle', lambda: self.idle, ttl=0)) + self._tql_object.register(CallbackTag('ip', lambda: self.ip)) + + @classmethod + def register_client_class(cls, class_): + """ Register a new client class. + """ + cls.roles[class_.ROLE] = class_ + + @classmethod + def from_role(cls, role, login, server, connection, tql_object): + return cls.roles[role](login, server, connection, tql_object) + + # + # Properties + # + + @property + def login(self): + """ Return the login of this client. + """ + return self._login + + @property + def server(self): + """ Return the cc-server binded to this client. + """ + return self._server + + @property + def conn(self): + """ Return the sjrpc connection to the client. + """ + return self._connection + + @property + def uptime(self): + """ Get the uptime of the client connection in seconds. + + :return: uptime of the client + """ + + dt = datetime.now() - self._connection_date + return dt.seconds + dt.days * 86400 + + @property + def idle(self): + """ Get the idle time of the client connection in seconds. + + :return: idle of the client + """ + dt = datetime.now() - self._last_action + return dt.seconds + dt.days * 86400 + + @property + def ip(self): + """ Get client remote ip address. + """ + peer = self.conn.getpeername() + return ':'.join(peer.split(':')[:-1]) + + def get_tags(self, tags): + """ Get tags on the remote node. + + :param tags: tags is the list of tags to fetch + """ + return self._connection.call('get_tags', tags) + + def shutdown(self): + """ Shutdown the connection to the client. + """ + # Unregister all remote tags: + for tag in self._remote_tags.copy(): + self.tags_unregister(tag) + + # Unrefister all server defined client tags: + self._tql_object.unregister('con') + self._tql_object.unregister('idle') + self._tql_object.unregister('ip') + + self._server.rpc.unregister(self.conn, shutdown=True) + self._server.unregister(self) + + def top(self): + """ Reset the "last action" date to now. + """ + self._last_action = datetime.now() + + def get_remote_tags(self, tag): + return self.conn.call('get_tags', (tag,))[tag] + + def tags_register(self, name, ttl=None, value=None): + """ Register a new remote tag for the client. + + :param name: name of the tag to register + :param ttl: TTL of the tag if applicable (None = no TTL, the tag will + never expire) + :param value: value of the tag + """ + + callback = partial(self.get_remote_tags, name) + tag = CallbackTag(name, callback, ttl=ttl) + self._tql_object.register(tag) + self._remote_tags.add(name) + + def tags_unregister(self, name): + """ + Unregister a remote tag for the client. + + :param name: name of the tag to unregister + """ + + self._tql_object.unregister(name) + self._remote_tags.discard(name) + + def tags_drop(self, name): + """ Drop the cached value of a remote tag for the client. + + :param name: name of the tag to drop + """ + tag = self._tql_object.get(name) + if tag is not None: + tag.invalidate() + + def tags_update(self, name, value=None, ttl=None): + """ Update a remote tag. + + :param name: name of the tag to update + :param value: new value of the tag + :param ttl: new ttl of the tag + """ + tag = self._tql_object.get(name) + if tag is not None: + if value is not None: + tag.set_value(value) + if ttl is not None: + tag.ttl = ttl diff --git a/ccserver/clients/bootstrap.py b/ccserver/clients/bootstrap.py new file mode 100644 index 0000000..89c942a --- /dev/null +++ b/ccserver/clients/bootstrap.py @@ -0,0 +1,12 @@ +from ccserver.clients import Client + + +class BootstrapClient(Client): + + """ A bootstrap client connected to the cc-server. + """ + + ROLE = 'bootstrap' + + +Client.register_client_class(BootstrapClient) diff --git a/ccserver/clients/cli.py b/ccserver/clients/cli.py new file mode 100644 index 0000000..7b77fe8 --- /dev/null +++ b/ccserver/clients/cli.py @@ -0,0 +1,680 @@ +import logging +from collections import defaultdict + +from sjrpc.core import RpcError + +from ccserver.orderedset import OrderedSet +from ccserver.conf import CCConf +from ccserver.exceptions import (AlreadyRegistered, AuthenticationError, + RightError, ReservedTagError, BadObjectError, + BadRoleError, NotConnectedAccountError, + CloneError) +from ccserver.election import Elector + +from ccserver.handlers import listed, Reporter +from ccserver.clients import Client, RegisteredCCHandler +from cloudcontrol.common.tql.db.tag import StaticTag + +MIGRATION_TYPES = {'cold': 'cold_migrate', + 'hot': 'hot_migrate',} + + +class CliHandler(RegisteredCCHandler): + """ Handler binded to 'cli' role. + + Summary of methods: + + ================ ================================ ============= + Method name Description Right(s) + ================ ================================ ============= + list list objects list + start start a vm start + stop stop a vm stop + destroy destroy a vm destroy + pause suspend a vm pause + resume resume a paused vm resume + passwd change password of accounts passwd + addaccount add a new account addaccount + copyaccount copy an account addaccount + addtag add a tag to accounts addtag + deltag remove a tag from accounts deltag + tags show tags of accounts tags + delaccount delete an account delaccount + close close an account close + declose declose an account declose + kill kill a connected account kill + rights show rights of accounts rights + addright add right rules to accounts addright + delright remove right rules from accounts delright + execute execute remote command on hosts execute + shutdown shutdown a connected client shutdown + jobs show jobs jobs + cancel cancel a running job cancel + jobspurge remove done jobs from jobs list jobspurge + ================ ================================ ============= + """ + + @listed + def list(self, query): + """ List all objects registered on this instance. + + :param query: the query to select objects to show + """ + + self.check('list', query) + logging.debug('Executed list function with query %s', query) + objects = self.server.list(query) + order = OrderedSet(['id']) + #if tags is not None: + # order |= OrderedSet(tags) + return {'objects': objects, 'order': list(order)} + + def _vm_action(self, query, method, *args, **kwargs): + """ Do an action on a virtual machine. + """ + errs = Reporter() + # Search all hypervisors of selected vms: + for vm in self.server.list(query, show=('r', 'h', 'p')): + if vm['r'] != 'vm': + errs.error(vm['id'], 'not a vm') + else: + hvclient = self.server.get_client(vm['p']) + if hvclient is None: + errs.error(vm['id'], 'offline hypervisor') + else: + try: + hvclient.vm_action(method, vm['h']) + except Exception as err: + errs.error(vm['id'], str(err)) + else: + errs.success(vm['id'], 'ok') + return errs.get_dict() + + @listed + def start(self, query): + """ Start a virtual machine. + """ + self.check('start', query) + return self._vm_action(query, 'vm_start') + + @listed + def stop(self, query): + """ Stop a virtual machine. + """ + self.check('stop', query) + return self._vm_action(query, 'vm_stop') + + @listed + def destroy(self, query): + """ Destroy (hard shutdown) a virtual machine. + """ + self.check('destroy', query) + return self._vm_action(query, 'vm_destroy') + + @listed + def pause(self, query): + """ Pause a virtual machine. + """ + self.check('pause', query) + return self._vm_action(query, 'vm_suspend') + + @listed + def resume(self, query): + """ Resume a virtual machine. + """ + self.check('resume', query) + return self._vm_action(query, 'vm_resume') + + @listed + def undefine(self, query, delete_storage=True): + """ Undefine selected virtual machines. + + :param query: the tql query to select objects. + :param delete_storage: delete storage of vm. + :return: a dict where key is the id of a selected object, and the value + a tuple (errcode, message) where errcode is (success|error|warn) and + message an error message or the output of the command in case of + success. + """ + + self.check('undefine', query) + + #FIXME: When tag globbing will be implemented, the list of tags to + # show will be: r, p, h, disk* + # I ask "all tags" pending implementation. + objects = self.server.list(query, show=('*',)) + errs = Reporter() + for obj in objects: + if obj['r'] != 'vm': + errs.error(obj['id'], 'bad role') + continue + try: + hvcon = self.server.get_connection(obj['p']) + except KeyError: + errs.error(obj['id'], 'hypervisor not connected') + else: + if delete_storage: + for disk in obj.get('disk', '').split(): + pool = obj.get('disk%s_pool' % disk) + name = obj.get('disk%s_vol' % disk) + hvcon.proxy.vol_delete(pool, name) + hvcon.proxy.vm_undefine(obj['h']) + errs.success(obj['id'], 'vm undefined') + + return errs.get_dict() + + @listed + def passwd(self, query, password, method='ssha'): + """ Define a new password for selected users. + + :param query: the query to select the objects to change + :param password: the password to set (None to remove password) + :param method: the hash method (sha, ssha, md5, smd5 or plain) + :return: a standard report output + """ + + self.check('passwd', query) + objects = self.server.list(query, show=('a',)) + errs = Reporter() + with self.conf: + for obj in objects: + if 'a' not in obj: + errs.error(obj['id'], 'not an account') + continue + + self.conf.set_password(obj['a'], password, method) + errs.success(obj['id'], 'password updated') + + return errs.get_dict() + + @listed + def addaccount(self, login, role, password=None): + """ Create a new account with specified login. + + :param login: the login of the new account + :param role: the role of the new account + :param password: the password of the new account (None = not set) + """ + + self.check('addaccount') + + if role in Client.roles: + self.conf.create_account(login, role, password) + else: + raise BadRoleError('%r is not a legal role.' % role) + + @listed + def copyaccount(self, copy_login, login, password=None): + """ Create a new account with specified login. + + :param copy_login: the login of the account to copy + :param login: the login of the new account + :param password: the password of the new account (default None) + """ + + self.check('addaccount') + self.conf.copy_account(copy_login, login, password) + + @listed + def addtag(self, query, tag_name, tag_value): + """ Add a tag to the accounts which match the specified query. + + :param query: the query to select objects + :param tag_name: the name of the tag to add + :param tag_value: the value of the tag + """ + + self.check('addtag', query) + + if tag_name in self.server.RESERVED_TAGS: + raise ReservedTagError('Tag %r is read-only' % tag_name) + + objects = self.server.list(query, show=('a',)) + errs = Reporter() + with self.conf: + for obj in objects: + print obj + if 'a' not in obj: + errs.error(obj['id'], 'not an account') + continue + + # Update the configuration for this account: + tags = self.conf.show(obj['a'])['tags'] + if tag_name in tags: + errs.warn(obj['id'], 'tag already exists, changed from %s' + ' to %s' % (tags[tag_name], tag_value)) + # Update the object db (update the tag value): + dbobj = self.server.db.get(obj['id']) + dbobj[tag_name].set_value(tag_value) + else: + errs.success(obj['id'], 'tag created') + # Update the object db (create the tag): + dbobj = self.server.db.get(obj['id']) + dbobj.register(StaticTag(tag_name, tag_value), override=True) + self.conf.add_tag(obj['a'], tag_name, tag_value) + + return errs.get_dict() + + @listed + def deltag(self, query, tag_name): + """ Remove a tag of the selected accounts. + + :param query: the query to select objects + :param tag_name: the name of the tag to remove + """ + + self.check('deltag', query) + + if tag_name in self.server.RESERVED_TAGS: + raise ReservedTagError('Tag %r is read-only' % tag_name) + + objects = self.server.list(query, show=('a',)) + errs = Reporter() + with self.conf: + for obj in objects: + if 'a' not in obj: + errs.error(obj['id'], 'not an account') + continue + tags = self.conf.show(obj['a'])['tags'] + if tag_name in tags: + errs.success(obj['id'], 'tag deleted') + dbobj = self.server.db.get(obj['id']) + dbobj.unregister(tag_name, override=True) + else: + errs.warn(obj['id'], 'unknown tag') + self.server.conf.remove_tag(obj['a'], tag_name) + + return errs.get_dict() + + @listed + def tags(self, query): + """ Return all static tags attached to the selected accounts. + + :param query: the query to select objects + """ + + self.check('tags', query) + objects = self.server.list(query, show=('a',)) + tags = [] + for obj in objects: + o = {'id': obj['id']} + if 'a' in obj: + otags = self.server.conf.show(obj['a'])['tags'] + otags.update({'id': obj['id']}) + o.update(otags) + tags.append(o) + return {'objects': tags, 'order': ['id']} + + @listed + def delaccount(self, query): + """ Delete the accounts selected by query. + + :param query: the query to select objects + """ + + self.check('delaccount', query) + objects = self.server.list(query, show=('a',)) + errs = Reporter() + with self.server.conf: + for obj in objects: + if 'a' not in obj: + errs.error(obj['id'], 'not an account') + continue + try: + self.server.conf.remove_account(obj['a']) + except CCConf.UnknownAccount: + errs.error(obj['id'], 'unknown account') + else: + errs.success(obj['id'], 'account deleted') + + self.server.jobs.create('kill', author=self.client.login, + account=obj['a'], gracetime=1) + + return errs.get_dict() + + @listed + def close(self, query): + """ Close selected account an account without deleting them. + + :param query: the query to select objects + """ + + self.check('close', query) + objects = self.server.list(query, show=('a',)) + errs = Reporter() + with self.server.conf: + for obj in objects: + if 'a' not in obj: + errs.error(obj['id'], 'not an account') + continue + tags = self.server.conf.show(obj['a'])['tags'] + if 'close' in tags: + errs.warn(obj['id'], 'account already closed') + continue + + errs.success(obj['id'], 'closed') + self.server.conf.add_tag(obj['a'], 'close', 'yes') + + self.server.jobs.create('kill', author=self.client.login, + account=obj['a'], gracetime=1) + + return errs.get_dict() + + @listed + def declose(self, query): + """ Re-open selected closed accounts. + + :param query: the query to select objects + """ + + self.check('declose', query) + objects = self.server.list(query, show=('a',)) + errs = Reporter() + with self.server.conf: + for obj in objects: + if 'a' not in obj: + errs.error(obj['id'], 'not an account') + continue + tags = self.conf.show(obj['a'])['tags'] + if 'close' in tags: + errs.success(obj['id'], 'account declosed') + else: + errs.warn(obj['id'], 'account not closed') + self.conf.remove_tag(obj['a'], 'close') + + return errs.get_dict() + + @listed + def kill(self, query): + """ Disconnect all connected accounts selected by query. + + :param query: the query to select objects + """ + + self.check('kill', query) + objects = self.server.list(query, show=set(('a',))) + errs = Reporter() + with self.server.conf: + for obj in objects: + if 'a' not in obj: + errs.error(obj['id'], 'not an account') + continue + try: + self.server.kill(obj['a']) + except NotConnectedAccountError: + errs.error(obj['id'], 'account is not connected') + else: + errs.success(obj['id'], 'account killed') + + return errs.get_dict() + + @listed + def rights(self, query): + """ Get the rights of selected accounts. + + :param query: the query to select objects + """ + + self.check('rights', query) + objects = self.server.list(query, show=set(('a',))) + rules = {} + for obj in objects: + if 'a' in obj: + rules[obj['a']] = self.server.conf.show(obj['a'])['rights'] + else: + raise BadObjectError('All objects must have the "a" tag.') + + return rules + + @listed + def addright(self, query, tql, method=None, allow=True, index=None): + """ Add a right rule to the selected accounts. + + :param query: the query to select objects + :param tql: the TQL of the right rule + :param method: the method of the right rule + :param allow: target = allow if True, else False + :param index: the index of the rule in list (can be negative to index + from the end, if the index is out of range, the rule is added to + the end. + """ + + self.check('addright', query) + objects = self.server.list(query, show=set(('a',))) + errs = Reporter() + with self.server.conf: + for obj in objects: + if 'a' not in obj: + errs.error(obj['id'], 'not an account') + continue + try: + self.server.conf.add_right(obj['a'], tql, method, + allow, index) + except self.server.conf.UnknownAccount: + errs.error(obj['id'], 'unknown account') + else: + errs.success(obj['id'], 'right rule added') + + return errs.get_dict() + + @listed + def delright(self, query, index): + """ Remove a right rule from the selected objects. + + :param query: the query to select objects + :param index: the index of the right rule to remove + """ + + self.check('delright', query) + objects = self.server.list(query, show=set(('a',))) + errs = Reporter() + with self.server.conf: + for obj in objects: + if 'a' not in obj: + errs.error(obj['id'], 'not an account') + continue + try: + self.server.conf.remove_right(obj['a'], index) + except self.server.conf.UnknownAccount: + errs.error(obj['id'], 'unknown account') + except self.server.conf.OutOfRangeIndexError: + errs.error(obj['id'], 'index out of range') + else: + errs.success(obj['id'], 'right rule deleted') + + return errs.get_dict() + + @listed + def execute(self, query, command): + """ Execute command on matched objects (must be roles hv or host). + + :param query: the tql query to select objects. + :param command: the command to execute on each object + :return: a dict where key is the id of a selected object, and the value + a tuple (errcode, message) where errcode is (success|error|warn) and + message an error message or the output of the command in case of + success. + """ + + self.check('execute', query) + objects = self.server.list(query, show=('r',)) + errs = Reporter() + for obj in objects: + if obj['r'] not in ('hv', 'host'): + errs.error(obj['id'], 'bad role') + continue + try: + objcon = self.server.get_connection(obj['id']) + except KeyError: + errs.error(obj['id'], 'node not connected') + else: + returned = objcon.connection.call('execute_command', command) + errs.success(obj['id'], 'command executed', output=returned) + + return errs.get_dict() + + @listed + def shutdown(self, query, reboot=True, gracefull=True): + """ Execute a shutdown on selected objects (must be roles hv or host). + + :param query: the tql query to select objects. + :param reboot: reboot the host instead of just shut it off + :param gracefull: properly shutdown the host + :return: a dict where key is the id of a selected object, and the value + a tuple (errcode, message) where errcode is (success|error|warn) and + message an error message. + """ + + self.check('execute', query) + objects = self.server.list(query, show=set(('r',))) + errs = Reporter() + for obj in objects: + if obj['r'] not in ('hv', 'host'): + errs.error(obj['id'], 'bad role') + continue + try: + objcon = self.server.get_connection(obj['id']) + except KeyError: + errs.error(obj['id'], 'node not connected') + else: + try: + objcon.connection.call('node_shutdown', + reboot, gracefull) + except RpcError as err: + errs.error(obj['id'], '%s (exc: %s)' % (err.message, + err.exception)) + else: + errs.success(obj['id'], 'ok') + + return errs.get_dict() + + @listed + def jobs(self, query, show_done=True, show_running=True): + """ Return all jobs. + + :param show_done: show done jobs + :param show_running: show running jobs + """ + + if query: + raise NotImplementedError('Tql in jobs is not yet supported.') + props = ('id', 'author', 'created', 'ended', + 'duration', 'done', 'title', 'status') + jobs = [] + for job in self.server.jobs.iterjobs(show_done, show_running): + + jobs.append(job.export(props)) + + return {'objects': jobs, 'order': props} + + @listed + def cancel(self, jobid): + """ Cancel a job. + + :param jobid: the id of the job to cancel. + """ + + self.server.jobs.cancel(jobid) + + @listed + def jobspurge(self): + """ Purge all done jobs from the job list. + """ + self.server.jobs.purge() + + @listed + def electiontypes(self): + return Elector.ALGO_BY_TYPES + + @listed + def election(self, query_vm, query_dest, mtype='cold', algo='fair', **kwargs): + """ Consult the server for the migration of specified vm on + an hypervisor pool. + + :param query_vm: the tql query to select VMs to migrate + :param query_dest: the tql query to select destination hypervisors + candidates + :param mtype: type of migration + :param algo: algo used for distribution + """ + elector = Elector(self.server, query_vm, query_dest, self.client.login) + return elector.election(mtype, algo, **kwargs) + + @listed + def migrate(self, migration_plan): + """ Launch the provided migration plan. + + :param migration_plan: the plan of the migration. + :return: a standard error report + """ + errs = Reporter() + for migration in migration_plan: + # Check if the migration type is know: + if migration['type'] in MIGRATION_TYPES: + mtype = MIGRATION_TYPES[migration['type']] + else: + errmsg = '%r unknown migration type' % migration['type'] + errs.error(migration['sid'], errmsg) + continue + + self.server.objects.update(ids=(migration['sid'],)) + vm = self.server.objects.get_by_id(migration['sid']) + + # Construct the migration properties: + migration_properties = { + 'author': self.client.login, + 'vm_name': vm['h'], + 'hv_source': vm['p'], + 'hv_dest': migration['did'] + } + + # Create the job: + self.server.jobs.create(mtype, **migration_properties) + errs.success(migration['sid'], 'migration launched') + + return errs.get_dict() + + @listed + def clone(self, tql_vm, tql_dest, name): + """ Create and launch a clone job. + + :param tql_vm: a tql matching one vm object (the cloned vm) + :param tql_dest: a tql matching one hypervisor object (the destination + hypervisor) + :param name: the new name of the VM + """ + + vm = self.server.list(tql_vm, show=('r', 'h', 'p')) + + if len(vm) != 1: + raise CloneError('VM Tql must select ONE vm') + elif vm[0]['r'] != 'vm': + raise CloneError('Destination Tql must select a vm') + else: + vm = vm[0] + + dest = self.server.list(tql_dest, show=('r',)) + if len(dest) != 1: + raise CloneError('Destination Tql must select ONE hypervisor') + elif dest[0]['r'] != 'hv': + raise CloneError('Destination Tql must select an hypervisor') + else: + dest = dest[0] + + self.server.jobs.create('clone', **{'vm_name': vm['h'], + 'new_vm_name': name, + 'hv_source': vm['p'], + 'hv_dest': dest['id'], + 'author': self.client.login}) + + +class CliClient(Client): + + """ A cli client connected to the cc-server. + """ + + ROLE = 'cli' + RPC_HANDLER = CliHandler + + +Client.register_client_class(CliClient) diff --git a/ccserver/clients/client.py b/ccserver/clients/client.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/ccserver/clients/client.py @@ -0,0 +1 @@ + diff --git a/ccserver/clients/host.py b/ccserver/clients/host.py new file mode 100644 index 0000000..038f654 --- /dev/null +++ b/ccserver/clients/host.py @@ -0,0 +1,11 @@ +from ccserver.clients import Client + +class HostClient(Client): + + """ A host client connected to the cc-server. + """ + + ROLE = 'host' + + +Client.register_client_class(HostClient) diff --git a/ccserver/clients/hv.py b/ccserver/clients/hv.py new file mode 100644 index 0000000..131ce4a --- /dev/null +++ b/ccserver/clients/hv.py @@ -0,0 +1,184 @@ +from ccserver.handlers import listed +from ccserver.clients import Client, RegisteredCCHandler + +from cloudcontrol.common.tql.db.object import TqlObject +from cloudcontrol.common.tql.db.tag import StaticTag, CallbackTag + +from functools import partial + + +class HypervisorHandler(RegisteredCCHandler): + """ Handler binded to an hv client. + """ + + @listed + def register(self, obj_id, role): + ''' + Register an object managed by the calling node. + + .. note: + the obj_id argument passed to this handler is the object id of the + registered object (not the fully qualified id, the server will + preprend the id by "node_id." itself). + + :param obj_id: the id of the object to register + :param role: the role of the object to register + ''' + self.client.register(obj_id, role) + + @listed + def unregister(self, obj_id): + ''' + Unregister an object managed by the calling node. + + .. note: + the obj_id argument passed to this handler is the object id of the + unregistered object (not the fully qualified id, the server will + preprend the id by "node_id." itself). + + :param obj_id: the id of the object to unregister + ''' + self.client.unregister(obj_id) + + @listed + def sub_tags_register(self, obj_id, name, ttl=None, value=None): + """ Register a new remote tag for a child of the client. + + :param obj_id: child name + :param name: name of the tag to register + :param ttl: TTL of the tag if applicable (None = no TTL, the tag will + never expire) + :param value: value of the tag + """ + self.client.sub_tags_register(obj_id, name, ttl, value) + + @listed + def sub_tags_unregister(self, obj_id, name): + """ + Unregister a remote tag for a child of the client. + + :param obj_id: child name + :param name: name of the tag to unregister + """ + self.client.sub_tags_unregister(obj_id, name) + + @listed + def sub_tags_drop(self, obj_id, name): + """ Drop the cached value of a remote tag for a child of the client. + + :param obj_id: child name + :param name: name of the tag to drop + """ + self.client.sub_tags_drop(obj_id, name) + + @listed + def sub_tags_update(self, obj_id, name, value=None, ttl=None): + """ Update a remote tag for a child of the client. + + :param obj_id: child name + :param name: name of the tag to update + :param value: new value of the tag + :param ttl: new ttl of the tag + """ + self.client.sub_tags_update(obj_id, name, value, ttl) + + +class HvClient(Client): + + """ A hv client connected to the cc-server. + """ + + ROLE = 'hv' + RPC_HANDLER = HypervisorHandler + + def __init__(self, *args, **kwargs): + super(HvClient, self).__init__(*args, **kwargs) + self._children = {} + + # + # Children specific methods: + # + + def shutdown(self): + # Unregister all children: + for child in self._children.copy(): + self.unregister(child) + super(HvClient, self).shutdown() + + def get_child_remote_tags(self, obj_id, tag): + return self.conn.call('sub_tags', obj_id, (tag,))[tag] + + def register(self, obj_id, role): + """ Register a new child. + """ + child = '%s.%s' % (self.login, obj_id) + + # Register the children in the tags database: + obj = TqlObject(child) + obj.register(StaticTag('r', role)) + obj.register(StaticTag('p', self.login)) + self._server.db.register(obj) + self._children[obj_id] = obj + + def unregister(self, obj_id): + """ Unregister a child. + """ + child = '%s.%s' % (self.login, obj_id) + del self._children[obj_id] + # Unregister the children from the tags database: + self._server.db.unregister(child) + + def vm_action(self, action, vms): + return self.conn.call(action, vms) + + def sub_tags_register(self, obj_id, name, ttl=None, value=None): + """ Register a new remote tag for a child of the client. + + :param obj_id: child name + :param name: name of the tag to register + :param ttl: TTL of the tag if applicable (None = no TTL, the tag will + never expire) + :param value: value of the tag + """ + + callback = partial(self.get_child_remote_tags, obj_id, name) + tag = CallbackTag(name, callback, ttl=ttl) + self._children[obj_id].register(tag) + + def sub_tags_unregister(self, obj_id, name): + """ + Unregister a remote tag for a child of the client. + + :param obj_id: child name + :param name: name of the tag to unregister + """ + + self._children[obj_id].unregister(name) + + def sub_tags_drop(self, obj_id, name): + """ Drop the cached value of a remote tag for a child of the client. + + :param obj_id: child name + :param name: name of the tag to drop + """ + tag = self._children[obj_id].get(name) + if tag is not None: + tag.invalidate() + + def sub_tags_update(self, obj_id, name, value=None, ttl=None): + """ Update a remote tag for a child of the client. + + :param obj_id: child name + :param name: name of the tag to update + :param value: new value of the tag + :param ttl: new ttl of the tag + """ + tag = self._children[obj_id].get(name) + if tag is not None: + if value is not None: + tag.set_value(value) + if ttl is not None: + tag.ttl = ttl + + +Client.register_client_class(HvClient) diff --git a/ccserver/clients/spv.py b/ccserver/clients/spv.py new file mode 100644 index 0000000..295e694 --- /dev/null +++ b/ccserver/clients/spv.py @@ -0,0 +1,33 @@ +import logging + +from ccserver.clients import Client, RegisteredCCHandler +from ccserver.handlers import listed +from ccserver.utils import OrderedSet + + +class SpvHandler(RegisteredCCHandler): + """ Handler binded to 'spv' role. + """ + + @listed + def list(self, query): + """ List all objects registered on this instance. + + :param query: the query to select objects to show + """ + + logging.debug('Executed list function with query %s', query) + objects = self.server.list(query) + return {'objects': objects} + + +class SpvClient(Client): + + """ A spv client connected to the cc-server. + """ + + ROLE = 'spv' + RPC_HANDLER = SpvHandler + + +Client.register_client_class(SpvClient) diff --git a/ccserver/db.py b/ccserver/db.py new file mode 100644 index 0000000..4c7471f --- /dev/null +++ b/ccserver/db.py @@ -0,0 +1,60 @@ +""" This module contains some cc-server specific helpers for the CloudControl + tags database provided in cc-commons. +""" + +from cloudcontrol.common.tql.db.tag import BaseTag +from cloudcontrol.common.tql.db.object import TqlObject + + +class SObject(TqlObject): + + """ A TQL object with features specific to cc-server. + """ + + def register(self, tag, override=False): + """ Register a tag on this object (or override). + """ + if override: + self._tags[tag.name] = OverrideTag(tag.name, tag, self._tags.get(tag.name)) + else: + return super(SObject, self).register(tag) + + + def unregister(self, name, override=False): + """ Unregister a tag on this object (or remove override). + """ + if override: + if isinstance(self._tags.get(name), OverrideTag): + if self._tags[name].replaced is not None: + self._tags[name] = self._tags[name].replaced + else: + super(SObject, self).unregister(name) + else: + if isinstance(self._tags.get(name), OverrideTag): + self._tags[name].replaced = None + else: + super(SObject, self).unregister(name) + + def is_overriding(self, name): + """ Return True if a tag is overriding another one for the name. + + If the tag is not found, False is returned. + """ + return isinstance(self._tags.get(name), OverrideTag) + + +class OverrideTag(BaseTag): + + """ A tag to override another one. + """ + + def __init__(self, name, override, replaced=None): + super(OverrideTag, self).__init__(name) + self.override = override + self.replaced = replaced + + def get_value(self): + return self.override.get_value() + + def set_value(self, value): + return self.override.set_value(value) diff --git a/ccserver/handlers.py b/ccserver/handlers.py index 17a76ea..4a21a72 100644 --- a/ccserver/handlers.py +++ b/ccserver/handlers.py @@ -5,20 +5,18 @@ from __future__ import absolute_import import inspect import logging -from sjrpc.utils import RpcHandler, pass_connection, pass_rpc + +from sjrpc.utils import RpcHandler from sjrpc.core import RpcError + from ccserver.orderedset import OrderedSet from ccserver.conf import CCConf from ccserver.exceptions import (AlreadyRegistered, AuthenticationError, RightError, ReservedTagError, BadObjectError, BadRoleError, NotConnectedAccountError, CloneError) -from ccserver.election import Elector -from ccserver import __version__ - -MIGRATION_TYPES = {'cold': 'cold_migrate', - 'hot': 'hot_migrate',} +from ccserver import __version__ def listed(func): @@ -52,12 +50,19 @@ class Reporter(object): class CCHandler(RpcHandler): - ''' - Base class for handlers of CloudControl server. - ''' + """ Base class for handlers of CloudControl server. + + This class provide the following features: + + - functions can be used to get all functions decorated with the listed + decorator + - version can be used to get the current cc-server version + """ - def __init__(self, server): - self._server = server + def __init__(self, client): + self.client = client + self.server = client.server # Shortcut to server + self.conf = client.server.conf # Shortcut to configuration def __getitem__(self, name): if name.startswith('_'): @@ -97,903 +102,3 @@ class CCHandler(RpcHandler): :return: the version ''' return __version__ - - -class OnlineCCHandler(CCHandler): - - @pass_connection - def on_disconnect(self, conn): - client = self._server.search_client_by_connection(conn) - logging.info('Client %s disconnected', client.login) - self._server.unregister(conn) - - def _check(self, conn, method, tql=None): - client = self._server.search_client_by_connection(conn) - allow = self._server.check(client.login, method, tql) - if not allow: - raise RightError('You are not allowed to do this action.') - - -class SpvHandler(OnlineCCHandler): - ''' - Handler binded to 'spv' role. - ''' - - @listed - def list(self, query): - ''' - List all objects registered on this instance. - - :param query: the query to select objects to show - ''' - - logging.debug('Executed list function with query %s', query) - objects, tags = self._server.list(query, return_toshow=True) - order = OrderedSet(['id']) - if tags is not None: - order |= OrderedSet(tags) - return {'objects': objects, 'order': list(order)} - - -class HypervisorHandler(OnlineCCHandler): - ''' - Handler binded to 'hv' role. - ''' - - @listed - @pass_connection - def register(self, conn, obj_id, role): - ''' - Register an object managed by the calling node. - - .. note: - the obj_id argument passed to this handler is the object id of the - registered object (not the fully qualified id, the server will - preprend the id by "node_id." itself). - - :param obj_id: the id of the object to register - :param role: the role of the object to register - ''' - - client = self._server.search_client_by_connection(conn) - self._server.sub_register(client.login, obj_id, role) - - @listed - @pass_connection - def unregister(self, conn, obj_id): - ''' - Unregister an object managed by the calling node. - - .. note: - the obj_id argument passed to this handler is the object id of the - unregistered object (not the fully qualified id, the server will - preprend the id by "node_id." itself). - - :param obj_id: the id of the object to unregister - ''' - - client = self._server.search_client_by_connection(conn) - self._server.sub_unregister(client.login, obj_id) - - @listed - @pass_connection - def tags_register(self, conn, name, ttl=None, value=None): - ''' - Register a new tag on the calling node. - - :param name: name of the tag to register - :param ttl: ttl of the tag (or None if not applicable) - :param value: value to fill the tag (optionnal) - ''' - client = self._server.search_client_by_connection(conn) - self._server.tags_register(client.login, name, ttl, value) - - @listed - @pass_connection - def tags_unregister(self, conn, name): - ''' - Unregister a tag on the calling node. - - :param name: name of the tag to unregister - ''' - client = self._server.search_client_by_connection(conn) - self._server.tags_unregister(client.login, name) - - @listed - @pass_connection - def tags_drop(self, conn, name): - ''' - Drop the tag value of the specified tag on the calling node. - - :param name: name of the tag to drop - ''' - client = self._server.search_client_by_connection(conn) - self._server.tags_drop(client.login, name) - - - @listed - @pass_connection - def tags_update(self, conn, name, value, ttl=None): - ''' - Update the value of the specified tag on the calling node. - - :param name: name of the tag to update - :param value: new tag value - :param ttl: new ttl value - ''' - client = self._server.search_client_by_connection(conn) - self._server.tags_update(client.login, name, value, ttl) - - -class CliHandler(OnlineCCHandler): - ''' - Handler binded to 'cli' role. - - Summary of methods: - - ================ ================================ ============= - Method name Description Right(s) - ================ ================================ ============= - list list objects list - start start a vm start - stop stop a vm stop - destroy destroy a vm destroy - pause suspend a vm pause - resume resume a paused vm resume - passwd change password of accounts passwd - addaccount add a new account addaccount - copyaccount copy an account addaccount - addtag add a tag to accounts addtag - deltag remove a tag from accounts deltag - tags show tags of accounts tags - delaccount delete an account delaccount - close close an account close - declose declose an account declose - kill kill a connected account kill - rights show rights of accounts rights - addright add right rules to accounts addright - delright remove right rules from accounts delright - execute execute remote command on hosts execute - shutdown shutdown a connected client shutdown - jobs show jobs jobs - cancel cancel a running job cancel - jobspurge remove done jobs from jobs list jobspurge - dbstats show stats from cache OPEN FOR ALL - ================ ================================ ============= - ''' - - @listed - @pass_connection - def list(self, conn, query): - ''' - List all objects registered on this instance. - - :param query: the query to select objects to show - ''' - - #self._check(conn, 'list', query) - logging.debug('Executed list function with query %s', query) - objects = self._server.list(query) - order = OrderedSet(['id']) - #if tags is not None: - # order |= OrderedSet(tags) - return {'objects': objects, 'order': list(order)} - - def _vm_action(self, query, method, *args, **kwargs): - vms = self._server.list(query, show=set(('r', 'h'))) - hypervisors = list(self._server.iter_connected_role('hv')) - errs = Reporter() - for hv in hypervisors: - vm_to_start = [] - for vm in vms: - if vm['r'] != 'vm': - errs.error(vm['id'], 'not a vm') - elif vm['id'].split('.')[0] == hv.login: - vm_to_start.append(vm['h']) - errs.success(vm['id'], 'ok') - if vm_to_start: - hv.connection.call(method, vm_to_start, *args, **kwargs) - return errs.get_dict() - - @listed - @pass_connection - def start(self, conn, query): - self._check(conn, 'start', query) - return self._vm_action(query, 'vm_start') - - @listed - @pass_connection - def stop(self, conn, query): - self._check(conn, 'stop', query) - return self._vm_action(query, 'vm_stop', force=False) - - @listed - @pass_connection - def destroy(self, conn, query): - self._check(conn, 'destroy', query) - return self._vm_action(query, 'vm_stop', force=True) - - @listed - @pass_connection - def pause(self, conn, query): - self._check(conn, 'pause', query) - return self._vm_action(query, 'vm_suspend') - - @listed - @pass_connection - def resume(self, conn, query): - self._check(conn, 'resume', query) - return self._vm_action(query, 'vm_resume') - - @listed - @pass_connection - def undefine(self, conn, query, delete_storage=True): - ''' - Undefine selected virtual machines. - - :param query: the tql query to select objects. - :param delete_storage: delete storage of vm. - :return: a dict where key is the id of a selected object, and the value - a tuple (errcode, message) where errcode is (success|error|warn) and - message an error message or the output of the command in case of - success. - ''' - - self._check(conn, 'undefine', query) - - #FIXME: When tag globbing will be implemented, the list of tags to - # show will be: r, p, h, disk* - # I ask "all tags" pending implementation. - objects = self._server.list(query, show=set(('*',))) - errs = Reporter() - for obj in objects: - if obj['r'] != 'vm': - errs.error(obj['id'], 'bad role') - continue - try: - hvcon = self._server.get_connection(obj['p']) - except KeyError: - errs.error(obj['id'], 'hypervisor not connected') - else: - if delete_storage: - for disk in obj.get('disk', '').split(): - pool = obj.get('disk%s_pool' % disk) - name = obj.get('disk%s_vol' % disk) - hvcon.proxy.vol_delete(pool, name) - hvcon.proxy.vm_undefine(obj['h']) - errs.success(obj['id'], 'vm undefined') - - return errs.get_dict() - - @listed - @pass_connection - def passwd(self, conn, query, password, method='ssha'): - ''' - Define a new password for selected user. - - :param query: the query to select the objects to change - :param password: the password to set (None to remove password) - :param method: the hash method (sha, ssha, md5, smd5 or plain) - :return: a standard report output - ''' - - self._check(conn, 'passwd', query) - objects = self._server.list(query, show=set(('a',))) - errs = Reporter() - with self._server.conf: - for obj in objects: - if 'a' not in obj: - errs.error(obj['id'], 'not an account') - continue - - self._server.conf.set_password(obj['a'], password, method) - errs.success(obj['id'], 'password updated') - - return errs.get_dict() - - @listed - @pass_connection - def addaccount(self, conn, login, role, password=None): - ''' - Create a new account with specified login. - - :param login: the login of the new account - :param role: the role of the new account - :param password: the password of the new account (None = not set) - ''' - - self._check(conn, 'addaccount') - - if role in WelcomeHandler.ROLES: - self._server.conf.create_account(login, role, password) - else: - raise BadRoleError('%r is not a legal role.' % role) - - @listed - @pass_connection - def copyaccount(self, conn, copy_login, login, password=None): - ''' - Create a new account with specified login. - - :param copy_login: the login of the account to copy - :param login: the login of the new account - :param password: the password of the new account (default None) - ''' - - self._check(conn, 'addaccount') - self._server.conf.copy_account(copy_login, login, password) - - @listed - @pass_connection - def addtag(self, conn, query, tag_name, tag_value): - ''' - Add a tag to the accounts which match the specified query. - - :param query: the query to select objects - :param tag_name: the name of the tag to add - :param tag_value: the value of the tag - ''' - - self._check(conn, 'addtag', query) - - if tag_name in self._server.RESERVED_TAGS: - raise ReservedTagError('Tag %r is read-only' % tag_name) - - objects = self._server.list(query, show=set(('a',))) - errs = Reporter() - with self._server.conf: - for obj in objects: - if 'a' not in obj: - errs.error(obj['id'], 'not an account') - continue - tags = self._server.conf.show(obj['a'])['tags'] - if tag_name in tags: - errs.warn(obj['id'], 'tag already exists, changed from %s' - ' to %s' % (tags[tag_name], tag_value)) - else: - errs.success(obj['id'], 'tag created') - self._server.conf.add_tag(obj['a'], tag_name, tag_value) - - return errs.get_dict() - - @listed - @pass_connection - def deltag(self, conn, query, tag_name): - ''' - Remove a tag of the selected accounts. - - :param query: the query to select objects - :param tag_name: the name of the tag to remove - ''' - - self._check(conn, 'deltag', query) - - if tag_name in self._server.RESERVED_TAGS: - raise ReservedTagError('Tag %r is read-only' % tag_name) - - objects = self._server.list(query, show=set(('a',))) - errs = Reporter() - with self._server.conf: - for obj in objects: - if 'a' not in obj: - errs.error(obj['id'], 'not an account') - continue - tags = self._server.conf.show(obj['a'])['tags'] - if tag_name in tags: - errs.success(obj['id'], 'tag deleted') - else: - errs.warn(obj['id'], 'unknown tag') - self._server.conf.remove_tag(obj['a'], tag_name) - - return errs.get_dict() - - @listed - @pass_connection - def tags(self, conn, query): - ''' - Return all static tags attached to the selected accounts. - - :param query: the query to select objects - ''' - - self._check(conn, 'tags', query) - objects = self._server.list(query, show=set(('a',))) - tags = [] - for obj in objects: - o = {'id': obj['id']} - if 'a' in obj: - otags = self._server.conf.show(obj['a'])['tags'] - otags.update({'id': obj['id']}) - o.update(otags) - tags.append(o) - return {'objects': tags, 'order': ['id']} - - @listed - @pass_connection - def delaccount(self, conn, query): - ''' - Delete the accounts selected by query. - - :param query: the query to select objects - ''' - - self._check(conn, 'delaccount', query) - objects = self._server.list(query, show=set(('a',))) - client = self._server.search_client_by_connection(conn) - errs = Reporter() - with self._server.conf: - for obj in objects: - if 'a' not in obj: - errs.error(obj['id'], 'not an account') - continue - try: - self._server.conf.remove_account(obj['a']) - except CCConf.UnknownAccount: - errs.error(obj['id'], 'unknown account') - else: - errs.success(obj['id'], 'account deleted') - - self._server.jobs.create('kill', author=client.login, - account=obj['a'], gracetime=1) - - return errs.get_dict() - - @listed - @pass_connection - def close(self, conn, query): - ''' - Close selected account an account without deleting them. - - :param query: the query to select objects - ''' - - self._check(conn, 'close', query) - objects = self._server.list(query, show=set(('a',))) - client = self._server.search_client_by_connection(conn) - errs = Reporter() - with self._server.conf: - for obj in objects: - if 'a' not in obj: - errs.error(obj['id'], 'not an account') - continue - tags = self._server.conf.show(obj['a'])['tags'] - if 'close' in tags: - errs.warn(obj['id'], 'account already closed') - continue - - errs.success(obj['id'], 'closed') - self._server.conf.add_tag(obj['a'], 'close', 'yes') - - self._server.jobs.create('kill', author=client.login, - account=obj['a'], gracetime=1) - - return errs.get_dict() - - @listed - @pass_connection - def declose(self, conn, query): - ''' - Re-open selected closed accounts. - - :param query: the query to select objects - ''' - - self._check(conn, 'declose', query) - objects = self._server.list(query, show=set(('a',))) - errs = Reporter() - with self._server.conf: - for obj in objects: - if 'a' not in obj: - errs.error(obj['id'], 'not an account') - continue - tags = self._server.conf.show(obj['a'])['tags'] - if 'close' in tags: - errs.success(obj['id'], 'account declosed') - else: - errs.warn(obj['id'], 'account not closed') - self._server.conf.remove_tag(obj['a'], 'close') - - return errs.get_dict() - - @listed - @pass_connection - def kill(self, conn, query): - ''' - Disconnect all connected accounts selected by query. - - :param query: the query to select objects - ''' - - self._check(conn, 'kill', query) - objects = self._server.list(query, show=set(('a',))) - errs = Reporter() - with self._server.conf: - for obj in objects: - if 'a' not in obj: - errs.error(obj['id'], 'not an account') - continue - try: - self._server.kill(obj['a']) - except NotConnectedAccountError: - errs.error(obj['id'], 'account is not connected') - else: - errs.success(obj['id'], 'account killed') - - return errs.get_dict() - - @listed - @pass_connection - def rights(self, conn, query): - ''' - Get the rights of selected accounts. - - :param query: the query to select objects - ''' - - self._check(conn, 'rights', query) - objects = self._server.list(query, show=set(('a',))) - rules = {} - for obj in objects: - if 'a' in obj: - rules[obj['a']] = self._server.conf.show(obj['a'])['rights'] - else: - raise BadObjectError('All objects must have the "a" tag.') - - return rules - - @listed - @pass_connection - def addright(self, conn, query, tql, method=None, allow=True, index=None): - ''' - Add a right rule to the selected accounts. - - :param query: the query to select objects - :param tql: the TQL of the right rule - :param method: the method of the right rule - :param allow: target = allow if True, else False - :param index: the index of the rule in list (can be negative to index - from the end, if the index is out of range, the rule is added to - the end. - ''' - - self._check(conn, 'addright', query) - objects = self._server.list(query, show=set(('a',))) - errs = Reporter() - with self._server.conf: - for obj in objects: - if 'a' not in obj: - errs.error(obj['id'], 'not an account') - continue - try: - self._server.conf.add_right(obj['a'], tql, method, - allow, index) - except self._server.conf.UnknownAccount: - errs.error(obj['id'], 'unknown account') - else: - errs.success(obj['id'], 'right rule added') - - return errs.get_dict() - - @listed - @pass_connection - def delright(self, conn, query, index): - ''' - Remove a right rule from the selected objects. - - :param query: the query to select objects - :param index: the index of the right rule to remove - ''' - - self._check(conn, 'delright', query) - objects = self._server.list(query, show=set(('a',))) - errs = Reporter() - with self._server.conf: - for obj in objects: - if 'a' not in obj: - errs.error(obj['id'], 'not an account') - continue - try: - self._server.conf.remove_right(obj['a'], index) - except self._server.conf.UnknownAccount: - errs.error(obj['id'], 'unknown account') - except self._server.conf.OutOfRangeIndexError: - errs.error(obj['id'], 'index out of range') - else: - errs.success(obj['id'], 'right rule deleted') - - return errs.get_dict() - - @listed - @pass_connection - def execute(self, conn, query, command): - ''' - Execute command on matched objects (must be roles hv or host). - - :param query: the tql query to select objects. - :param command: the command to execute on each object - :return: a dict where key is the id of a selected object, and the value - a tuple (errcode, message) where errcode is (success|error|warn) and - message an error message or the output of the command in case of - success. - ''' - - self._check(conn, 'execute', query) - objects = self._server.list(query, show=('r',)) - errs = Reporter() - for obj in objects: - if obj['r'] not in ('hv', 'host'): - errs.error(obj['id'], 'bad role') - continue - try: - objcon = self._server.get_connection(obj['id']) - except KeyError: - errs.error(obj['id'], 'node not connected') - else: - returned = objcon.connection.call('execute_command', command) - errs.success(obj['id'], 'command executed', output=returned) - - return errs.get_dict() - - @listed - @pass_connection - def shutdown(self, conn, query, reboot=True, gracefull=True): - ''' - Execute a shutdown on selected objects (must be roles hv or host). - - :param query: the tql query to select objects. - :param reboot: reboot the host instead of just shut it off - :param gracefull: properly shutdown the host - :return: a dict where key is the id of a selected object, and the value - a tuple (errcode, message) where errcode is (success|error|warn) and - message an error message. - ''' - - self._check(conn, 'execute', query) - objects = self._server.list(query, show=set(('r',))) - errs = Reporter() - for obj in objects: - if obj['r'] not in ('hv', 'host'): - errs.error(obj['id'], 'bad role') - continue - try: - objcon = self._server.get_connection(obj['id']) - except KeyError: - errs.error(obj['id'], 'node not connected') - else: - try: - objcon.connection.call('node_shutdown', - reboot, gracefull) - except RpcError as err: - errs.error(obj['id'], '%s (exc: %s)' % (err.message, - err.exception)) - else: - errs.success(obj['id'], 'ok') - - return errs.get_dict() - - @listed - def jobs(self, query, show_done=True, show_running=True): - ''' - Return all jobs. - - :param show_done: show done jobs - :param show_running: show running jobs - ''' - - if query: - raise NotImplementedError('Tql in jobs is not yet supported.') - props = ('id', 'author', 'created', 'ended', - 'duration', 'done', 'title', 'status') - jobs = [] - for job in self._server.jobs.iterjobs(show_done, show_running): - - jobs.append(job.export(props)) - - return {'objects': jobs, 'order': props} - - @listed - def cancel(self, jobid): - ''' - Cancel a job. - - :param jobid: the id of the job to cancel. - ''' - - self._server.jobs.cancel(jobid) - - @listed - def jobspurge(self): - ''' - Purge all done jobs from the job list. - ''' - - self._server.jobs.purge() - - @listed - def electiontypes(self): - return Elector.ALGO_BY_TYPES - - @listed - @pass_connection - def election(self, conn, query_vm, query_dest, mtype='cold', algo='fair', - **kwargs): - ''' - Consult the server for the migration of specified vm on an hypervisor - pool. - - :param query_vm: the tql query to select VMs to migrate - :param query_dest: the tql query to select destination hypervisors - candidates - :param mtype: type of migration - :param algo: algo used for distribution - ''' - client = self._server.search_client_by_connection(conn) - elector = Elector(self._server, query_vm, query_dest, client.login) - return elector.election(mtype, algo, **kwargs) - - @listed - @pass_connection - def migrate(self, conn, migration_plan): - ''' - Launch the provided migration plan. - - :param migration_plan: the plan of the migration. - :return: a standard error report - ''' - client = self._server.search_client_by_connection(conn) - errs = Reporter() - for migration in migration_plan: - # Check if the migration type is know: - if migration['type'] in MIGRATION_TYPES: - mtype = MIGRATION_TYPES[migration['type']] - else: - errmsg = '%r unknown migration type' % migration['type'] - errs.error(migration['sid'], errmsg) - continue - - self._server.objects.update(ids=(migration['sid'],)) - vm = self._server.objects.get_by_id(migration['sid']) - - # Construct the migration properties: - migration_properties = { - 'author': client.login, - 'vm_name': vm['h'], - 'hv_source': vm['p'], - 'hv_dest': migration['did'] - } - - # Create the job: - self._server.jobs.create(mtype, **migration_properties) - errs.success(migration['sid'], 'migration launched') - - return errs.get_dict() - - @listed - @pass_connection - def clone(self, conn, tql_vm, tql_dest, name): - ''' - Create and launch a clone job. - - :param tql_vm: a tql matching one vm object (the cloned vm) - :param tql_dest: a tql matching one hypervisor object (the destination - hypervisor) - :param name: the new name of the VM - ''' - - client = self._server.search_client_by_connection(conn) - - vm = self._server.list(tql_vm, show=('r', 'h', 'p')) - - if len(vm) != 1: - raise CloneError('VM Tql must select ONE vm') - elif vm[0]['r'] != 'vm': - raise CloneError('Destination Tql must select a vm') - else: - vm = vm[0] - - dest = self._server.list(tql_dest, show=('r',)) - if len(dest) != 1: - raise CloneError('Destination Tql must select ONE hypervisor') - elif dest[0]['r'] != 'hv': - raise CloneError('Destination Tql must select an hypervisor') - else: - dest = dest[0] - - self._server.jobs.create('clone', **{'vm_name': vm['h'], - 'new_vm_name': name, - 'hv_source': vm['p'], - 'hv_dest': dest['id'], - 'author': client.login}) - - -class BootstrapHandler(OnlineCCHandler): - - ''' - Handler for bootstrap clients. - ''' - - pass - - -class WelcomeHandler(CCHandler): - ''' - Default handler used on client connections of the server. - - :cvar ROLES: role name/handler mapping - ''' - - ROLES = { - 'cli': CliHandler, - 'hv': HypervisorHandler, - 'host': HypervisorHandler, - 'spv': SpvHandler, - 'bootstrap': BootstrapHandler, - } - - @listed - @pass_connection - @pass_rpc - def authentify(self, conn, rpc, login, password): - ''' - Authenticate the client. - - :param login: the login of the account - :param password: the password of the account (not hashed) - :return: role affected to the client on success - :thrown AuthenticationError: when authentication fail - ''' - - logmsg = 'Authentication error from %s: ' - conf = self._server.conf - with conf: - try: - role = self._server.conf.authentify(login, password) - except CCConf.UnknownAccount: - raise AuthenticationError('Unknown login') - else: - if 'close' in self._server.conf.show(login)['tags']: - logging.warning(logmsg + 'account closed (%s)', - conn.getpeername(), login) - raise AuthenticationError('Account is closed') - - if role is None: - logging.warning(logmsg + 'bad login/password (%s)', - conn.getpeername(), login) - raise AuthenticationError('Bad login/password') - else: - if role not in WelcomeHandler.ROLES: - logging.warning(logmsg + 'bad role in account config (%s)', - conn.getpeername(), login) - raise BadRoleError('%r is not a legal role' % role) - - create_object = False - - # If authentication is a success, try to register the client: - if role == 'bootstrap': - # Set a bootstrap id for the object: - login = '%s.%s' % (login, conn.get_fd()) - # Real role of the node will be host: - role = 'host' - create_object = True - - # Try to register the client: - for _ in xrange(5): - try: - self._server.register(login, role, conn, create_object) - except AlreadyRegistered: - if role == 'cli': - try: - self._server.kill(login) - except NotConnectedAccountError: - pass - else: - break - else: - logging.warning(logmsg + 'already connected (%s)', - conn.getpeername(), login) - raise AuthenticationError('Already connected') - - rpc.set_handler(WelcomeHandler.ROLES.get(role)(self._server)) - logging.info('Authentication success from %s with login %s', - conn.getpeername(), login) - return role -- GitLab