Commit f809bd8e authored by Antoine Millet's avatar Antoine Millet
Browse files

Updated cc-server to use the new sjrpc 14+ API

parent cb719383
Loading
Loading
Loading
Loading
+6 −4
Original line number Diff line number Diff line
@@ -74,7 +74,7 @@ def run_server(options):
        handler = logging.handlers.SysLogHandler(address='/dev/log',
                                                 facility=facility)

    fmt = EncodingFormatter('cc-server: %(levelname)s %(message)s')
    fmt = EncodingFormatter('cc-server (%(name)s): %(levelname)s %(message)s')
    handler.setFormatter(fmt)
    logger.addHandler(handler)

@@ -91,11 +91,13 @@ def run_server(options):
        Handler called when SIGINT is emitted.
        '''

        server.manager.shutdown()
        server.rpc.shutdown()
        logging.info('Server properly exited by SIGINT')
        sys.exit(0)

    signal.signal(signal.SIGINT, shutdown_handler)
    watcher_sigint = server.rpc.loop.signal(signal.SIGINT, shutdown_handler)
    watcher_sigstop = server.rpc.loop.signal(signal.SIGTERM, shutdown_handler)
    watcher_sigint.start()
    watcher_sigstop.start()

    try:
        server.run()
+10 −20
Original line number Diff line number Diff line
@@ -8,11 +8,10 @@ Main class of cc-server.
from __future__ import absolute_import

import logging
import socket
from copy import copy
from fnmatch import fnmatch as glob

from sjrpc.server import SimpleSslRpcServer
from sjrpc.server import SSLRpcServer

from ccserver.handlers import WelcomeHandler
from ccserver.conf import CCConf
@@ -45,39 +44,30 @@ class CCServer(object):
        # Dict containing all connected accounts, the key is the login of
        # the account and the value the :class:`RpcConnection` of the peer:
        self._connected = {}

        # The interface object to the configuration directory:
        self.conf = CCConf(conf_dir)

        # Some settings:
        self._maxcon = maxcon
        self._maxidle = maxidle

        # Create the server socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((address, port))
        sock.listen(CCServer.LISTEN_BACKLOG)

        logging.info('Server started to running')

        # SSL configuration stuff:
        if certfile:
            logging.info('SSL Certificate: %s', certfile)
        if keyfile:
            logging.info('SSL Key: %s', certfile)

        logging.info('Listening on %s:%s', address, port)

        self.objects = ObjectsDB(self)

        # Create the connection manager:
        self.manager = SimpleSslRpcServer(sock, certfile=certfile,
        # Create the rpc server:
        logging.info('Listening on %s:%s', address, port)
        self.rpc = SSLRpcServer.from_addr(address, port, certfile=certfile,
                                          keyfile=keyfile,
                                          default_handler=WelcomeHandler(self),
                                          on_disconnect='on_disconnect')
                                          conn_kw=dict(handler=WelcomeHandler(self),
                                                       on_disconnect='on_disconnect'))

        # The jobs manager:
        self.jobs = JobsManager(self)
        logging.info('Server started to running')

    def _update_accounts(self):
        '''
@@ -204,8 +194,8 @@ class CCServer(object):
        self.jobs.create('kill_oldcli', maxcon=self._maxcon,
                         maxidle=self._maxidle, _hidden=True)

        logging.debug('Running manager mainloop')
        self.manager.run()
        logging.debug('Running rpc mainloop')
        self.rpc.run()

    def get_connection(self, login):
        '''
+3 −3
Original line number Diff line number Diff line
@@ -77,7 +77,7 @@ class CCClient(object):
        Shutdown the connection to the client.
        '''

        self.server.manager.shutdown_client(self.connection.get_fd())
        self.server.rpc.unregister(self.connection, shutdown=True)

    def get_tags(self):
        '''
+44 −14
Original line number Diff line number Diff line
@@ -5,7 +5,7 @@ from __future__ import absolute_import

import inspect
import logging
from sjrpc.utils import RpcHandler
from sjrpc.utils import RpcHandler, pass_connection, pass_rpc
from sjrpc.core import RpcError
from ccserver.orderedset import OrderedSet
from ccserver.conf import CCConf
@@ -68,7 +68,7 @@ class CCHandler(RpcHandler):
            return super(CCHandler, self).__getitem__(name)

    @listed
    def functions(self, conn):
    def functions(self):
        '''
        Show the list of functions available to the peer.

@@ -90,7 +90,7 @@ class CCHandler(RpcHandler):
        return cmd_list

    @listed
    def version(self, conn):
    def version(self):
        '''
        Return the current server version.

@@ -101,6 +101,7 @@ class CCHandler(RpcHandler):

class OnlineCCHandler(CCHandler):

    @pass_connection
    def on_disconnect(self, conn):
        client = self._server.search_client_by_connection(conn)
        logging.info('Client %s disconnected', client.login)
@@ -119,7 +120,7 @@ class SpvHandler(OnlineCCHandler):
    '''

    @listed
    def list(self, conn, query):
    def list(self, query):
        '''
        List all objects registered on this instance.

@@ -140,6 +141,7 @@ class HypervisorHandler(OnlineCCHandler):
    '''

    @listed
    @pass_connection
    def register(self, conn, obj_id, role):
        '''
        Register an object managed by the calling node.
@@ -157,6 +159,7 @@ class HypervisorHandler(OnlineCCHandler):
        self._server.sub_register(client.login, obj_id, role)

    @listed
    @pass_connection
    def unregister(self, conn, obj_id):
        '''
        Unregister an object managed by the calling node.
@@ -211,6 +214,7 @@ class CliHandler(OnlineCCHandler):
    '''

    @listed
    @pass_connection
    def list(self, conn, query):
        '''
        List all objects registered on this instance.
@@ -243,31 +247,37 @@ class CliHandler(OnlineCCHandler):
        return errs.get_dict()

    @listed
    @pass_connection
    def start(self, conn, query):
        self._check(conn, 'start', query)
        return self._vm_action(query, 'vm_start')

    @listed
    @pass_connection
    def stop(self, conn, query):
        self._check(conn, 'stop', query)
        return self._vm_action(query, 'vm_stop', force=False)

    @listed
    @pass_connection
    def destroy(self, conn, query):
        self._check(conn, 'destroy', query)
        return self._vm_action(query, 'vm_stop', force=True)

    @listed
    @pass_connection
    def pause(self, conn, query):
        self._check(conn, 'pause', query)
        return self._vm_action(query, 'vm_suspend')

    @listed
    @pass_connection
    def resume(self, conn, query):
        self._check(conn, 'resume', query)
        return self._vm_action(query, 'vm_resume')

    @listed
    @pass_connection
    def undefine(self, conn, query, delete_storage=True):
        '''
        Undefine selected virtual machines.
@@ -307,6 +317,7 @@ class CliHandler(OnlineCCHandler):
        return errs.get_dict()

    @listed
    @pass_connection
    def passwd(self, conn, query, password, method='ssha'):
        '''
        Define a new password for selected user.
@@ -332,6 +343,7 @@ class CliHandler(OnlineCCHandler):
        return errs.get_dict()

    @listed
    @pass_connection
    def addaccount(self, conn, login, role, password=None):
        '''
        Create a new account with specified login.
@@ -349,6 +361,7 @@ class CliHandler(OnlineCCHandler):
            raise BadRoleError('%r is not a legal role.' % role)

    @listed
    @pass_connection
    def copyaccount(self, conn, copy_login, login, password=None):
        '''
        Create a new account with specified login.
@@ -362,6 +375,7 @@ class CliHandler(OnlineCCHandler):
        self._server.conf.copy_account(copy_login, login, password)

    @listed
    @pass_connection
    def addtag(self, conn, query, tag_name, tag_value):
        '''
        Add a tag to the accounts which match the specified query.
@@ -394,6 +408,7 @@ class CliHandler(OnlineCCHandler):
        return errs.get_dict()

    @listed
    @pass_connection
    def deltag(self, conn, query, tag_name):
        '''
        Remove a tag of the selected accounts.
@@ -424,6 +439,7 @@ class CliHandler(OnlineCCHandler):
        return errs.get_dict()

    @listed
    @pass_connection
    def tags(self, conn, query):
        '''
        Return all static tags attached to the selected accounts.
@@ -444,6 +460,7 @@ class CliHandler(OnlineCCHandler):
        return {'objects': tags, 'order': ['id']}

    @listed
    @pass_connection
    def delaccount(self, conn, query):
        '''
        Delete the accounts selected by query.
@@ -473,6 +490,7 @@ class CliHandler(OnlineCCHandler):
        return errs.get_dict()

    @listed
    @pass_connection
    def close(self, conn, query):
        '''
        Close selected account an account without deleting them.
@@ -503,6 +521,7 @@ class CliHandler(OnlineCCHandler):
        return errs.get_dict()

    @listed
    @pass_connection
    def declose(self, conn, query):
        '''
        Re-open selected closed accounts.
@@ -528,6 +547,7 @@ class CliHandler(OnlineCCHandler):
        return errs.get_dict()

    @listed
    @pass_connection
    def kill(self, conn, query):
        '''
        Disconnect all connected accounts selected by query.
@@ -553,6 +573,7 @@ class CliHandler(OnlineCCHandler):
        return errs.get_dict()

    @listed
    @pass_connection
    def rights(self, conn, query):
        '''
        Get the rights of selected accounts.
@@ -572,6 +593,7 @@ class CliHandler(OnlineCCHandler):
        return rules

    @listed
    @pass_connection
    def addright(self, conn, query, tql, method=None, allow=True, index=None):
        '''
        Add a right rule to the selected accounts.
@@ -604,6 +626,7 @@ class CliHandler(OnlineCCHandler):
        return errs.get_dict()

    @listed
    @pass_connection
    def delright(self, conn, query, index):
        '''
        Remove a right rule from the selected objects.
@@ -632,6 +655,7 @@ class CliHandler(OnlineCCHandler):
        return errs.get_dict()

    @listed
    @pass_connection
    def execute(self, conn, query, command):
        '''
        Execute command on matched objects (must be roles hv or host).
@@ -662,6 +686,7 @@ class CliHandler(OnlineCCHandler):
        return errs.get_dict()

    @listed
    @pass_connection
    def shutdown(self, conn, query, reboot=True, gracefull=True):
        '''
        Execute a shutdown on selected objects (must be roles hv or host).
@@ -698,7 +723,7 @@ class CliHandler(OnlineCCHandler):
        return errs.get_dict()

    @listed
    def jobs(self, conn, query, show_done=True, show_running=True):
    def jobs(self, query, show_done=True, show_running=True):
        '''
        Return all jobs.

@@ -718,7 +743,7 @@ class CliHandler(OnlineCCHandler):
        return {'objects': jobs, 'order': props}

    @listed
    def cancel(self, conn, jobid):
    def cancel(self, jobid):
        '''
        Cancel a job.

@@ -728,7 +753,7 @@ class CliHandler(OnlineCCHandler):
        self._server.jobs.cancel(jobid)

    @listed
    def jobspurge(self, conn):
    def jobspurge(self):
        '''
        Purge all done jobs from the job list.
        '''
@@ -736,10 +761,11 @@ class CliHandler(OnlineCCHandler):
        self._server.jobs.purge()

    @listed
    def electiontypes(self, conn):
    def electiontypes(self):
        return Elector.ALGO_BY_TYPES

    @listed
    @pass_connection
    def election(self, conn, query_vm, query_dest, mtype='cold', algo='fair',
                 **kwargs):
        '''
@@ -757,6 +783,7 @@ class CliHandler(OnlineCCHandler):
        return elector.election(mtype, algo, **kwargs)

    @listed
    @pass_connection
    def migrate(self, conn, migration_plan):
        '''
        Launch the provided migration plan.
@@ -793,6 +820,7 @@ class CliHandler(OnlineCCHandler):
        return errs.get_dict()

    @listed
    @pass_connection
    def clone(self, conn, tql_vm, tql_dest, name):
        '''
        Create and launch a clone job.
@@ -829,7 +857,7 @@ class CliHandler(OnlineCCHandler):
                                           'author': client.login})

    @listed
    def dbstats(self, conn):
    def dbstats(self):
        '''
        Return statistics about current database status.
        '''
@@ -861,7 +889,9 @@ class WelcomeHandler(CCHandler):
    }

    @listed
    def authentify(self, conn, login, password):
    @pass_connection
    @pass_rpc
    def authentify(self, conn, rpc, login, password):
        '''
        Authenticate the client.

@@ -921,7 +951,7 @@ class WelcomeHandler(CCHandler):
                                conn.getpeername(), login)
                raise AuthenticationError('Already connected')

            conn.set_handler(WelcomeHandler.ROLES.get(role)(self._server))
            rpc.set_handler(WelcomeHandler.ROLES.get(role)(self._server))
            logging.info('Authentication success from %s with login %s',
                         conn.getpeername(), login)
            return role
+14 −14
Original line number Diff line number Diff line
@@ -11,6 +11,8 @@ from datetime import datetime, timedelta
import logging
from threading import RLock

from sjrpc.core.async import AsyncWatcher

from ccserver.tql import TqlObject
from ccserver.orderedset import OrderedSet
from ccserver.exceptions import AlreadyRegistered, UnknownObjectError
@@ -49,9 +51,9 @@ class ObjectsDB(object):
        # The access lock of the object database:
        self._lock = RLock()

        # The list of received message while update:
        self._msgs = []
        self._requested = {} # object id -> set of tags
        # Asynchronous watcher:
        self._asyncwatcher = None

        # Total amount of query updates:
        self._nb_queries = 0
@@ -87,7 +89,7 @@ class ObjectsDB(object):
            by :meth:`update` method.
        '''

        self._msgs = []
        self._asyncwatcher = AsyncWatcher()

        # Cast input to the right type:
        if tags is not None:
@@ -168,8 +170,6 @@ class ObjectsDB(object):
        Process the responses received from clients.
        '''

        msgs = self._server.manager.wait(frozenset(self._msgs), timeout=4)
        now = datetime.now()

        # Avoid error if all tags are asked (not optimized case because we
        # can't known the real list of tags:
@@ -178,7 +178,8 @@ class ObjectsDB(object):

        received = set()

        for msg in msgs:
        now = datetime.now()
        for msg in self._asyncwatcher.iter(timeout=5):
            obj = msg['data']
            assert isinstance(obj, TqlObject), 'data obj is not TqlObject'

@@ -300,14 +301,13 @@ class ObjectsDB(object):
        
        if '__parent' in obj:
            subid = obj['id'].partition('.')[2]
            msg_id = client.connection.async_call('sub_tags', subid, tags,
                                                  tags_novalue, _data=obj)
            self._asyncwatcher.register(client.connection, 'sub_tags', subid,
                                        tags, tags_novalue, _data=obj)
        else:
            msg_id = client.connection.async_call('get_tags', tags,
                                                  tags_novalue, _data=obj)
            self._asyncwatcher.register(client.connection, 'get_tags',
                                        tags, tags_novalue, _data=obj)

        self._requested[obj['id']] = tags
        self._msgs.append(msg_id)

    def get_by_id(self, oid):
        '''
Loading