Commit b636de90 authored by Anael Beutot's avatar Anael Beutot
Browse files

Updated to use the new sjrpc version.

parent e7de02be
Loading
Loading
Loading
Loading
+14 −11
Original line number Diff line number Diff line
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from optparse import OptionParser
from sjrpc.core import RpcError
from ccnode.ccnode import CCNode
import ConfigParser
import sys, os, atexit
import logging
import logging.handlers
import threading
from daemon import DaemonContext
from time import sleep
import logging.handlers
import sys
import os
import atexit
import ConfigParser
from math import exp
from time import sleep
from optparse import OptionParser

from daemon import DaemonContext
from sjrpc.core import RpcError

from ccnode.ccnode import CCNode
from ccnode import __version__

MAX_AUTH_TIMEOUT = 30
@@ -34,7 +37,7 @@ def authentication(node, suicide, login, password):
    Node authentication thread
    '''
    timeout = 1
    while not suicide.is_set() and node.get_manager().is_running():
    while not suicide.is_set() and node.get_rpc():
        logging.debug('Sending authentication request')
        if node.authentify(login, password):
            logging.error('Authentication suscessfull')
@@ -129,7 +132,7 @@ def run_node(options):
    except Exception as err:
        logging.error('run_node: `%s` -> `%s`', repr(err), err)
        try:
            node.get_manager().shutdown()
            node.get_rpc().shutdown()
        except:
            pass
    finally:
+13 −13
Original line number Diff line number Diff line
@@ -3,11 +3,13 @@
import logging
from threading import Timer, Lock, Event
from time import sleep
from sjrpc.client import SimpleRpcClient

from sjrpc.utils import ConnectionProxy
from sjrpc.core import RpcError
from sjrpc.core import RpcError, RpcConnection
# FIXME relative import
import handlers


class CCNode(object):
    '''
    Handle node initialization, connection to server, and authentication
@@ -23,15 +25,15 @@ class CCNode(object):
        self._is_xen = force_xen # hugly
        self._exec_cmd = exec_cmd # hugly
        self._handler = None
        self._manager = SimpleRpcClient.from_addr(server, port, enable_ssl=True,
                                                default_handler=self._handler)
        self._server = ConnectionProxy(self._manager)
        self._rpc = RpcConnection.from_addr_ssl(server, port,
                                                handler=self._handler)
        self._server = ConnectionProxy(self._rpc)
    
    def run(self):
        '''
        '''
        try:
            self._manager.run()
            self._rpc.run()
        except:
            raise
        finally:
@@ -67,7 +69,7 @@ class CCNode(object):
                logging.warning('Bad role affected by server: %s' % role)
                raise Exception()
            
            self._manager.all_connections().pop().set_handler(self._handler)
            self._rpc.get_protocol(0).set_handler(self._handler)
            self._scheduler_rearm()
            return True
    
@@ -91,10 +93,8 @@ class CCNode(object):
        '''
        return self._server
    
    def get_manager(self):
        '''
        '''
        return self._manager
    def get_rpc(self):
        return self._rpc

    def get_handler(self):
        '''
+0 −32
Original line number Diff line number Diff line
@@ -6,7 +6,6 @@ from lvm import LVM
from time import sleep
from sjrpc.core import RpcError
from sjrpc.utils import RpcHandler
from sjrpc.utils import pure
from errors import HostError, HypervisorError
from common import LocalHost
from jobs import ReceiveFileJob, SendFileJob, DrbdCopyJob, TCPTunnelJob
@@ -321,7 +320,6 @@ class NodeHandler(RpcHandler):
    #   Tag query
    ##################################
    
    @pure
    def get_tags(self, tags=None, noresolve_tags=None):
        '''
        '''
@@ -443,7 +441,6 @@ class NodeHandler(RpcHandler):
                debug('error while listing sub node tags `%r`:`%s`', err, err)
        return result
    
    @pure
    def sub_tags(self, sub_id, tags=None, noresolve_tags=None):
        '''
        '''
@@ -549,7 +546,6 @@ class NodeHandler(RpcHandler):
    #   Host control
    ##################################
    
    @pure
    def node_shutdown(self, reboot=True, gracefull=True):
        '''
        '''
@@ -567,7 +563,6 @@ class NodeHandler(RpcHandler):
            info('unable to proceed, this feature is not available')
            raise NotImplementedError('host handler has no method `%s`' %method)
    
    @pure
    def execute_command(self, command):
        '''
        '''
@@ -583,7 +578,6 @@ class NodeHandler(RpcHandler):
    ###
    # Definition and migration of domains
    
    @pure
    def vm_define(self, data, format='xml'):
        '''
        '''
@@ -592,7 +586,6 @@ class NodeHandler(RpcHandler):
        debug('new VM has name `%s`', name)
        return name
    
    @pure
    def vm_undefine(self, name):
        '''
        '''
@@ -601,7 +594,6 @@ class NodeHandler(RpcHandler):
        vm.undefine()
        debug('succesfully undefined VM `%s`', name)
    
    @pure
    def vm_export(self, name, format='xml'):
        '''
        '''
@@ -612,7 +604,6 @@ class NodeHandler(RpcHandler):
    ###
    # Life cycle control
    
    @pure
    def vm_stop(self, vm_names=None, force=False):
        '''
        '''
@@ -635,7 +626,6 @@ class NodeHandler(RpcHandler):
            except:
                pass
    
    @pure
    def vm_start(self, vm_names=None):
        '''
        '''
@@ -654,7 +644,6 @@ class NodeHandler(RpcHandler):
            except:
                pass
    
    @pure
    def vm_suspend(self, vm_names=None):
        '''
        '''
@@ -673,7 +662,6 @@ class NodeHandler(RpcHandler):
            except:
                pass
    
    @pure
    def vm_resume(self, vm_names=None):
        '''
        '''
@@ -695,7 +683,6 @@ class NodeHandler(RpcHandler):
    ###
    # Migration and helpers
    
    @pure
    def tun_setup(self, local=True):
        '''
        '''
@@ -714,7 +701,6 @@ class NodeHandler(RpcHandler):
            'port' : job.tunnel_get_server_port(),
            }
    
    @pure
    def tun_connect(self, res, remote_res, remote_ip):
        '''
        '''
@@ -725,7 +711,6 @@ class NodeHandler(RpcHandler):
        # connect to the remote endpoint
        job.tunnel_connect((remote_ip, remote_res['port']))
    
    @pure
    def tun_connect_hv(self, res, migration=False):
        '''
        '''
@@ -745,7 +730,6 @@ class NodeHandler(RpcHandler):
        # connect to libvirt/hv
        job.tunnel_connect(endpoint)

    @pure
    def tun_destroy(self, res):
        '''
        '''
@@ -754,7 +738,6 @@ class NodeHandler(RpcHandler):
        # stop the tunnel
        jobmgr.cancel(res['jid'])
    
    @pure
    def vm_migrate_tunneled(self, name, tun_res, migtun_res):
        '''
        '''
@@ -776,7 +759,6 @@ class NodeHandler(RpcHandler):
    ###
    # Volume management
    
    @pure
    def vol_create(self, pool, name, size):
        '''
        '''
@@ -787,7 +769,6 @@ class NodeHandler(RpcHandler):
        else:
            raise NotImplementedError('host handler has no storage support')
    
    @pure
    def vol_delete(self, pool, name, wipe=False):
        '''
        '''
@@ -800,14 +781,12 @@ class NodeHandler(RpcHandler):
        else:
            raise NotImplementedError('host handler has no storage support')
    
    @pure
    def vol_copy(self, pool, name, dest_pool, dest_name):
        raise NotImplementedError()
    
    ###
    # Basic network copy
    
    @pure
    def vol_export(self, pool, name, raddr, rport):
        '''
        '''
@@ -835,7 +814,6 @@ class NodeHandler(RpcHandler):
        else:
            raise NotImplementedError('host handler has no storage support')
    
    @pure
    def vol_import(self, pool, name):
        '''
        '''
@@ -858,7 +836,6 @@ class NodeHandler(RpcHandler):
        else:
            raise NotImplementedError('host handler has no storage support')
    
    @pure
    def vol_import_wait(self, jid):
        '''
        '''
@@ -877,7 +854,6 @@ class NodeHandler(RpcHandler):
            res['checksum'] = job.get_checksum()
        return res
    
    @pure
    def vol_import_cancel(self, jid):
        '''
        '''
@@ -895,7 +871,6 @@ class NodeHandler(RpcHandler):
    ###
    # Live network copy
    
    @pure
    def drbd_setup(self, pool, name):
        '''
        '''
@@ -924,7 +899,6 @@ class NodeHandler(RpcHandler):
        else:
            raise NotImplementedError('host handler has no storage support')
    
    @pure
    def drbd_connect(self, res, remote_res, remote_ip):
        '''
        Connect the local live copy engine to the designated remote host that
@@ -937,7 +911,6 @@ class NodeHandler(RpcHandler):
        # connect node
        job.drbd_connect(remote_ip, remote_res['port'])
    
    @pure
    def drbd_role(self, res, primary):
        '''
        '''
@@ -948,7 +921,6 @@ class NodeHandler(RpcHandler):
        # change role
        job.drbd_role(primary=primary)
    
    @pure
    def drbd_takeover(self, res, state):
        '''
        '''
@@ -959,7 +931,6 @@ class NodeHandler(RpcHandler):
        # hijack the VM disk DM
        job.drbd_takeover(state)
    
    @pure
    def drbd_sync_status(self, res):
        '''
        Get status information about a running sync
@@ -977,7 +948,6 @@ class NodeHandler(RpcHandler):
            'speed' : None,
            }
    
    @pure
    def drbd_shutdown(self, res):
        '''
        Close connection and destroy all ressources allocated for a live copy.
@@ -992,14 +962,12 @@ class NodeHandler(RpcHandler):
    #   Job management
    ##################################
    
    @pure
    def job_list(self):
        '''
        List all existing jobs sorted by state, with ID and type.
        '''
        return self._host_handle.jobmgr.list()
    
    @pure
    def job_log(self, jid):
        '''
        Get log messages of a given job (human friendly text string with