diff --git a/bin/cc-server b/bin/cc-server index 6ed7af5d72bd33e7bdd1fe1b89f4d85820ec0a40..6528e30492bf2dadf9bd67cc0f767480cd4350cb 100755 --- a/bin/cc-server +++ b/bin/cc-server @@ -74,7 +74,7 @@ def run_server(options): handler = logging.handlers.SysLogHandler(address='/dev/log', facility=facility) - fmt = EncodingFormatter('cc-server: %(levelname)s %(message)s') + fmt = EncodingFormatter('cc-server (%(name)s): %(levelname)s %(message)s') handler.setFormatter(fmt) logger.addHandler(handler) @@ -91,11 +91,13 @@ def run_server(options): Handler called when SIGINT is emitted. ''' - server.manager.shutdown() + server.rpc.shutdown() logging.info('Server properly exited by SIGINT') - sys.exit(0) - signal.signal(signal.SIGINT, shutdown_handler) + watcher_sigint = server.rpc.loop.signal(signal.SIGINT, shutdown_handler) + watcher_sigstop = server.rpc.loop.signal(signal.SIGTERM, shutdown_handler) + watcher_sigint.start() + watcher_sigstop.start() try: server.run() diff --git a/ccserver/ccserver.py b/ccserver/ccserver.py index d12690ac93f0b127125036614fe9dd2b64d4c77e..e32749c3d8e701cf20dc0093e663eeba0ed1ae09 100644 --- a/ccserver/ccserver.py +++ b/ccserver/ccserver.py @@ -8,11 +8,10 @@ Main class of cc-server. from __future__ import absolute_import import logging -import socket from copy import copy from fnmatch import fnmatch as glob -from sjrpc.server import SimpleSslRpcServer +from sjrpc.server import SSLRpcServer from ccserver.handlers import WelcomeHandler from ccserver.conf import CCConf @@ -45,39 +44,30 @@ class CCServer(object): # Dict containing all connected accounts, the key is the login of # the account and the value the :class:`RpcConnection` of the peer: self._connected = {} - # The interface object to the configuration directory: self.conf = CCConf(conf_dir) - # Some settings: self._maxcon = maxcon self._maxidle = maxidle - - # Create the server socket - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind((address, port)) - sock.listen(CCServer.LISTEN_BACKLOG) - - logging.info('Server started to running') - + # SSL configuration stuff: if certfile: logging.info('SSL Certificate: %s', certfile) if keyfile: logging.info('SSL Key: %s', certfile) - logging.info('Listening on %s:%s', address, port) self.objects = ObjectsDB(self) - # Create the connection manager: - self.manager = SimpleSslRpcServer(sock, certfile=certfile, + # Create the rpc server: + logging.info('Listening on %s:%s', address, port) + self.rpc = SSLRpcServer.from_addr(address, port, certfile=certfile, keyfile=keyfile, - default_handler=WelcomeHandler(self), - on_disconnect='on_disconnect') + conn_kw=dict(handler=WelcomeHandler(self), + on_disconnect='on_disconnect')) # The jobs manager: self.jobs = JobsManager(self) + logging.info('Server started to running') def _update_accounts(self): ''' @@ -204,8 +194,8 @@ class CCServer(object): self.jobs.create('kill_oldcli', maxcon=self._maxcon, maxidle=self._maxidle, _hidden=True) - logging.debug('Running manager mainloop') - self.manager.run() + logging.debug('Running rpc mainloop') + self.rpc.run() def get_connection(self, login): ''' diff --git a/ccserver/client.py b/ccserver/client.py index 35db68f820523f742f085419761460c59e546058..b6446acd52b1340bb2fc70ea451c38c2bb6d266e 100644 --- a/ccserver/client.py +++ b/ccserver/client.py @@ -47,7 +47,7 @@ class CCClient(object): :return: uptime of the client ''' - + dt = datetime.now() - self._connection_date return dt.seconds + dt.days * 86400 @@ -57,7 +57,7 @@ class CCClient(object): :return: idle of the client ''' - + dt = datetime.now() - self._last_action return dt.seconds + dt.days * 86400 @@ -77,7 +77,7 @@ class CCClient(object): Shutdown the connection to the client. ''' - self.server.manager.shutdown_client(self.connection.get_fd()) + self.server.rpc.unregister(self.connection, shutdown=True) def get_tags(self): ''' diff --git a/ccserver/handlers.py b/ccserver/handlers.py index 82c9b193f095b99edb89022ec7c9f1651480253e..fff28dec74bd9b1732467c10926543bd91006c21 100644 --- a/ccserver/handlers.py +++ b/ccserver/handlers.py @@ -5,7 +5,7 @@ from __future__ import absolute_import import inspect import logging -from sjrpc.utils import RpcHandler +from sjrpc.utils import RpcHandler, pass_connection, pass_rpc from sjrpc.core import RpcError from ccserver.orderedset import OrderedSet from ccserver.conf import CCConf @@ -68,7 +68,7 @@ class CCHandler(RpcHandler): return super(CCHandler, self).__getitem__(name) @listed - def functions(self, conn): + def functions(self): ''' Show the list of functions available to the peer. @@ -90,7 +90,7 @@ class CCHandler(RpcHandler): return cmd_list @listed - def version(self, conn): + def version(self): ''' Return the current server version. @@ -101,6 +101,7 @@ class CCHandler(RpcHandler): class OnlineCCHandler(CCHandler): + @pass_connection def on_disconnect(self, conn): client = self._server.search_client_by_connection(conn) logging.info('Client %s disconnected', client.login) @@ -119,7 +120,7 @@ class SpvHandler(OnlineCCHandler): ''' @listed - def list(self, conn, query): + def list(self, query): ''' List all objects registered on this instance. @@ -140,6 +141,7 @@ class HypervisorHandler(OnlineCCHandler): ''' @listed + @pass_connection def register(self, conn, obj_id, role): ''' Register an object managed by the calling node. @@ -157,6 +159,7 @@ class HypervisorHandler(OnlineCCHandler): self._server.sub_register(client.login, obj_id, role) @listed + @pass_connection def unregister(self, conn, obj_id): ''' Unregister an object managed by the calling node. @@ -211,6 +214,7 @@ class CliHandler(OnlineCCHandler): ''' @listed + @pass_connection def list(self, conn, query): ''' List all objects registered on this instance. @@ -243,31 +247,37 @@ class CliHandler(OnlineCCHandler): return errs.get_dict() @listed + @pass_connection def start(self, conn, query): self._check(conn, 'start', query) return self._vm_action(query, 'vm_start') @listed + @pass_connection def stop(self, conn, query): self._check(conn, 'stop', query) return self._vm_action(query, 'vm_stop', force=False) @listed + @pass_connection def destroy(self, conn, query): self._check(conn, 'destroy', query) return self._vm_action(query, 'vm_stop', force=True) @listed + @pass_connection def pause(self, conn, query): self._check(conn, 'pause', query) return self._vm_action(query, 'vm_suspend') @listed + @pass_connection def resume(self, conn, query): self._check(conn, 'resume', query) return self._vm_action(query, 'vm_resume') @listed + @pass_connection def undefine(self, conn, query, delete_storage=True): ''' Undefine selected virtual machines. @@ -307,6 +317,7 @@ class CliHandler(OnlineCCHandler): return errs.get_dict() @listed + @pass_connection def passwd(self, conn, query, password, method='ssha'): ''' Define a new password for selected user. @@ -332,6 +343,7 @@ class CliHandler(OnlineCCHandler): return errs.get_dict() @listed + @pass_connection def addaccount(self, conn, login, role, password=None): ''' Create a new account with specified login. @@ -349,6 +361,7 @@ class CliHandler(OnlineCCHandler): raise BadRoleError('%r is not a legal role.' % role) @listed + @pass_connection def copyaccount(self, conn, copy_login, login, password=None): ''' Create a new account with specified login. @@ -362,6 +375,7 @@ class CliHandler(OnlineCCHandler): self._server.conf.copy_account(copy_login, login, password) @listed + @pass_connection def addtag(self, conn, query, tag_name, tag_value): ''' Add a tag to the accounts which match the specified query. @@ -394,6 +408,7 @@ class CliHandler(OnlineCCHandler): return errs.get_dict() @listed + @pass_connection def deltag(self, conn, query, tag_name): ''' Remove a tag of the selected accounts. @@ -424,6 +439,7 @@ class CliHandler(OnlineCCHandler): return errs.get_dict() @listed + @pass_connection def tags(self, conn, query): ''' Return all static tags attached to the selected accounts. @@ -444,6 +460,7 @@ class CliHandler(OnlineCCHandler): return {'objects': tags, 'order': ['id']} @listed + @pass_connection def delaccount(self, conn, query): ''' Delete the accounts selected by query. @@ -473,6 +490,7 @@ class CliHandler(OnlineCCHandler): return errs.get_dict() @listed + @pass_connection def close(self, conn, query): ''' Close selected account an account without deleting them. @@ -503,6 +521,7 @@ class CliHandler(OnlineCCHandler): return errs.get_dict() @listed + @pass_connection def declose(self, conn, query): ''' Re-open selected closed accounts. @@ -528,6 +547,7 @@ class CliHandler(OnlineCCHandler): return errs.get_dict() @listed + @pass_connection def kill(self, conn, query): ''' Disconnect all connected accounts selected by query. @@ -553,6 +573,7 @@ class CliHandler(OnlineCCHandler): return errs.get_dict() @listed + @pass_connection def rights(self, conn, query): ''' Get the rights of selected accounts. @@ -572,6 +593,7 @@ class CliHandler(OnlineCCHandler): return rules @listed + @pass_connection def addright(self, conn, query, tql, method=None, allow=True, index=None): ''' Add a right rule to the selected accounts. @@ -604,6 +626,7 @@ class CliHandler(OnlineCCHandler): return errs.get_dict() @listed + @pass_connection def delright(self, conn, query, index): ''' Remove a right rule from the selected objects. @@ -632,6 +655,7 @@ class CliHandler(OnlineCCHandler): return errs.get_dict() @listed + @pass_connection def execute(self, conn, query, command): ''' Execute command on matched objects (must be roles hv or host). @@ -662,6 +686,7 @@ class CliHandler(OnlineCCHandler): return errs.get_dict() @listed + @pass_connection def shutdown(self, conn, query, reboot=True, gracefull=True): ''' Execute a shutdown on selected objects (must be roles hv or host). @@ -698,7 +723,7 @@ class CliHandler(OnlineCCHandler): return errs.get_dict() @listed - def jobs(self, conn, query, show_done=True, show_running=True): + def jobs(self, query, show_done=True, show_running=True): ''' Return all jobs. @@ -718,7 +743,7 @@ class CliHandler(OnlineCCHandler): return {'objects': jobs, 'order': props} @listed - def cancel(self, conn, jobid): + def cancel(self, jobid): ''' Cancel a job. @@ -728,7 +753,7 @@ class CliHandler(OnlineCCHandler): self._server.jobs.cancel(jobid) @listed - def jobspurge(self, conn): + def jobspurge(self): ''' Purge all done jobs from the job list. ''' @@ -736,10 +761,11 @@ class CliHandler(OnlineCCHandler): self._server.jobs.purge() @listed - def electiontypes(self, conn): + def electiontypes(self): return Elector.ALGO_BY_TYPES @listed + @pass_connection def election(self, conn, query_vm, query_dest, mtype='cold', algo='fair', **kwargs): ''' @@ -757,6 +783,7 @@ class CliHandler(OnlineCCHandler): return elector.election(mtype, algo, **kwargs) @listed + @pass_connection def migrate(self, conn, migration_plan): ''' Launch the provided migration plan. @@ -793,6 +820,7 @@ class CliHandler(OnlineCCHandler): return errs.get_dict() @listed + @pass_connection def clone(self, conn, tql_vm, tql_dest, name): ''' Create and launch a clone job. @@ -829,7 +857,7 @@ class CliHandler(OnlineCCHandler): 'author': client.login}) @listed - def dbstats(self, conn): + def dbstats(self): ''' Return statistics about current database status. ''' @@ -851,17 +879,19 @@ class WelcomeHandler(CCHandler): :cvar ROLES: role name/handler mapping ''' - + ROLES = { 'cli': CliHandler, 'hv': HypervisorHandler, 'host': HypervisorHandler, 'spv': SpvHandler, - 'bootstrap': BootstrapHandler, + 'bootstrap': BootstrapHandler, } @listed - def authentify(self, conn, login, password): + @pass_connection + @pass_rpc + def authentify(self, conn, rpc, login, password): ''' Authenticate the client. @@ -893,7 +923,7 @@ class WelcomeHandler(CCHandler): logging.warning(logmsg + 'bad role in account config (%s)', conn.getpeername(), login) raise BadRoleError('%r is not a legal role' % role) - + create_object = False # If authentication is a success, try to register the client: @@ -921,7 +951,7 @@ class WelcomeHandler(CCHandler): conn.getpeername(), login) raise AuthenticationError('Already connected') - conn.set_handler(WelcomeHandler.ROLES.get(role)(self._server)) + rpc.set_handler(WelcomeHandler.ROLES.get(role)(self._server)) logging.info('Authentication success from %s with login %s', conn.getpeername(), login) return role diff --git a/ccserver/objectsdb.py b/ccserver/objectsdb.py index 2cbb204e59f66c5908f224d30c231bb502c8eb65..ddd07bd7bbd732ca12d696bba56905ffb9bd8173 100644 --- a/ccserver/objectsdb.py +++ b/ccserver/objectsdb.py @@ -11,6 +11,8 @@ from datetime import datetime, timedelta import logging from threading import RLock +from sjrpc.core.async import AsyncWatcher + from ccserver.tql import TqlObject from ccserver.orderedset import OrderedSet from ccserver.exceptions import AlreadyRegistered, UnknownObjectError @@ -49,9 +51,9 @@ class ObjectsDB(object): # The access lock of the object database: self._lock = RLock() - # The list of received message while update: - self._msgs = [] self._requested = {} # object id -> set of tags + # Asynchronous watcher: + self._asyncwatcher = None # Total amount of query updates: self._nb_queries = 0 @@ -87,8 +89,8 @@ class ObjectsDB(object): by :meth:`update` method. ''' - self._msgs = [] - + self._asyncwatcher = AsyncWatcher() + # Cast input to the right type: if tags is not None: tags = set(tags) @@ -167,9 +169,7 @@ class ObjectsDB(object): ''' Process the responses received from clients. ''' - - msgs = self._server.manager.wait(frozenset(self._msgs), timeout=4) - now = datetime.now() + # Avoid error if all tags are asked (not optimized case because we # can't known the real list of tags: @@ -177,8 +177,9 @@ class ObjectsDB(object): tags = () received = set() - - for msg in msgs: + + now = datetime.now() + for msg in self._asyncwatcher.iter(timeout=5): obj = msg['data'] assert isinstance(obj, TqlObject), 'data obj is not TqlObject' @@ -300,14 +301,13 @@ class ObjectsDB(object): if '__parent' in obj: subid = obj['id'].partition('.')[2] - msg_id = client.connection.async_call('sub_tags', subid, tags, - tags_novalue, _data=obj) + self._asyncwatcher.register(client.connection, 'sub_tags', subid, + tags, tags_novalue, _data=obj) else: - msg_id = client.connection.async_call('get_tags', tags, - tags_novalue, _data=obj) + self._asyncwatcher.register(client.connection, 'get_tags', + tags, tags_novalue, _data=obj) self._requested[obj['id']] = tags - self._msgs.append(msg_id) def get_by_id(self, oid): ''' diff --git a/debian/control b/debian/control index 12ab0584f756a5374952fe48abf21c12d8574d33..b2906c380c4e5c9edd0bdb7a8a9982447d4e0472 100644 --- a/debian/control +++ b/debian/control @@ -8,7 +8,7 @@ Standards-Version: 3.9.1 Package: cc-server Architecture: all -Depends: ${misc:Depends}, ${python:Depends}, python-sjrpc (>= 9), python-daemon +Depends: ${misc:Depends}, ${python:Depends}, python-sjrpc (>= 14), python-daemon XB-Python-Version: ${python:Versions} Description: CloudControl server This package provides the server of CloudControl.