Skip to content
Snippets Groups Projects
Commit ab5b4d74 authored by Anael Beutot's avatar Anael Beutot
Browse files

Base squeleton for new cc-node.

Initialization, re-authentication...
parent 7e731b86
No related branches found
No related tags found
No related merge requests found
Showing with 376 additions and 1493 deletions
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Cloud-Control Node
# Copyright (C) 2011 SmartJog [http://www.smartjog.com]
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# * 2011-06: Thibault VINCENT <thibault.vincent@smartjog.com>
#
from ccnode.launcher import Launcher
if __name__ == '__main__':
Launcher().main()
import sys
import signal
import atexit
import time
import logging
import logging.config
from optparse import OptionParser
from os.path import isfile
from daemon import DaemonContext
from ccnode import __version__
from ccnode.node import Node
from ccnode.utils import signal_
from ccnode.config import NodeConfigParser
logger = logging.getLogger('ccnode')
DEFAULT_CONFIG_FILE = '/etc/cc-node.conf'
# command line arguments...
oparser = OptionParser(version='%%prog %s' % __version__)
oparser.add_option('-d', '--daemonize', default=False, action='store_true',
help=u'run as daemon and write pid file')
oparser.add_option('-c', '--config', metavar='FILE',
default=DEFAULT_CONFIG_FILE,
help=u'configuration file ABSOLUTE path (default: %default)')
oparser.add_option('-p', '--pidfile', metavar='FILE',
help=u'pid file path for daemon mode')
options, args = oparser.parse_args()
# check argument configuration
if options.daemonize and not options.pidfile:
sys.exit(u'Please supply a pid file...')
if not isfile(options.config):
sys.exit(u'Please supply a valid path to configuration file...')
# globals
need_reload = False
node = None
config_file = NodeConfigParser(options.config)
# configure logging
logging.config.fileConfig(options.config)
@signal_(signal.SIGUSR1)
def reload_config(signum, frame):
global need_reload
need_reload = True
@signal_(signal.SIGTERM)
def exit_node(signum=None, frame=None):
# clean all current jobs
# TODO
if node is not None:
# clean node
node.shutdown()
# exit
logger.info(u'Exiting node...')
sys.exit()
def run_node():
global need_reload, node
try:
node = Node(
server_host=config_file.server_host,
server_port=config_file.server_port,
user_name=config_file.server_user,
user_passwd=config_file.server_passwd,
)
node.start()
while True:
if need_reload:
logger.info(u'Reloading logging configuration...')
logging.config.fileConfig(options.config)
need_reload = False
signal.pause()
except KeyboardInterrupt:
exit_node()
except Exception:
logger.exception(u'Unknown error:')
# take care of pid file if daemon
if options.daemonize:
pidfile = open(options.pidfile)
files_preserve = [pidfile]
else:
files_preserve = None
with DaemonContext(detach_process=options.daemonize,
files_preserve=files_preserve,
stderr=sys.stderr,
stdout=sys.stdout):
# reload log config
logging.config.fileConfig(options.config)
# take care of pidfile
if options.daemonize:
pidfile.write('%s' % os.getpid())
pidfile.flush()
@atexit.register
def clean_pidfile():
pidfile.seek(0)
pidfile.truncate()
pidfile.flush()
logger.debug(u'Starting node')
while True:
run_node()
logger.critical(u'Restarting node...')
time.sleep(2)
# -*- coding: utf-8 -*-
#
# Cloud-Control Node
# Copyright (C) 2011 SmartJog [http://www.smartjog.com]
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# * 2011-06: Thibault VINCENT <thibault.vincent@smartjog.com>
#
__product__ = 'Cloud-Control Node'
__version__ = '0~dev'
__product__ = 'Cloud-Control Node'
__version__ = '0~dev'
__canonical__ = 'cc-node'
__develmode__ = False
def git_version():
global __version__
import os
import sys
from subprocess import Popen, PIPE, CalledProcessError
cwd = os.getcwd()
try:
os.chdir(os.path.dirname(sys.argv[0]))
p = Popen(["git", "log", "--pretty=format:%H" ], stdout=PIPE,
stderr=open("/dev/null", "wb"))
p.wait()
if p.returncode == 0:
githash = p.stdout.readline().strip()
if len(githash) > 0:
__version__ += "-git-%s" % githash
except OSError:
pass
finally:
os.chdir(cwd)
if __version__.find("dev") != -1:
__develmode__ = True
git_version()
# -*- coding: utf-8 -*-
#
# Cloud-Control Node
# Copyright (C) 2011 SmartJog [http://www.smartjog.com]
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# * 2011-06: Thibault VINCENT <thibault.vincent@smartjog.com>
#
import logging
from ConfigParser import SafeConfigParser
from __future__ import absolute_import
import ConfigParser
from optparse import OptionParser
from . import __version__
from ccnode import __version__
ARGS_DEFAULTS = {
'daemonize' : (bool, False),
'stdout' : (bool, False),
'config' : (str, '/etc/cc-node.conf'),
'pidfile' : (str, '/var/run/cc-node.pid'),
}
FILE_DEFAULTS = {
'address' : (str, ''),
'port' : (int, 1984),
'ssl' : (bool, True),
'login' : (str, ''),
'password' : (str, ''),
'verbosity' : (int, 0),
'virtualization' : (bool, True),
'command_execution' : (bool, True),
}
SECURE_OPTIONS = [
'password',
]
logger = logging.getLogger(__name__)
class ConfigurationError(Exception):
pass
class NodeConfigParser(object):
"""ConfigParser for ccnode config file."""
def __init__(self, file_path):
config = SafeConfigParser()
config.read(file_path)
class Configuration(object):
def __init__(self, logger):
self._logger = logger
# populate default options
self._options = {}
for name, (vtype, value) in FILE_DEFAULTS.iteritems():
self._options[name] = _ConfigValue(vtype, value)
for name, (vtype, value) in ARGS_DEFAULTS.iteritems():
self._options[name] = _ConfigValue(vtype, value, locking=True)
def parse_arguments(self):
parser = OptionParser(version = '%%prog %s' % __version__)
# --daemonize
parser.add_option('-d',
'--daemonize',
default = ARGS_DEFAULTS['daemonize'][1],
action = 'store_true',
help = 'run as daemon and write pid file')
# --stdout
parser.add_option('-s',
'--stdout',
action = 'store_true',
default = ARGS_DEFAULTS['stdout'][1],
help = 'log on standard output instead of syslog'
' (when not running in daemon mode)')
# --config
parser.add_option('-c',
'--config',
default = ARGS_DEFAULTS['config'][1],
help = 'configuration file path (default: %default)')
# --pidfile
parser.add_option('-p',
'--pidfile',
default = ARGS_DEFAULTS['pidfile'][1],
help = 'pid file path for daemon mode (default: '
'%default)')
# parse command line
self._logger.info('reading command-line arguments')
options = parser.parse_args()[0]
# apply arguments to current configuration
for name, value in options.__dict__.iteritems():
if name in self._options:
self._options[name].update(value, unlock=True)
if name not in SECURE_OPTIONS:
self._logger.info(' * %s = %s' % (name,
self._options[name].value()))
def parse_file(self):
parser = ConfigParser.SafeConfigParser()
# open configuration file
try:
self._logger.info('parsing configuration file')
parser.read(self._options['config'].value())
options = dict(parser.items('node'))
except ConfigParser.Error as err:
raise ConfigurationError('failed to open or parse configuration'
' file: %r' % err)
# apply to current configuration
for name, value in options.iteritems():
if name in self._options:
self._options[name].update(value, unlock=False)
if name not in SECURE_OPTIONS:
self._logger.info(' * %s = %s' % (name,
self._options[name].value()))
def get(self, name):
if name in self._options:
return self._options[name].value()
else:
return None
def update(self, name, value):
# TODO missing concurrency protection
if name in self._options:
self._options[name].update(value, unlock=False)
class _ConfigValue(object):
def __init__(self, value_type, default_value, locking=False):
self._type = value_type
self._default = default_value
self._value = default_value
self._locking = locking
def __str__(self):
return '' if self._value is None else str(self._value)
def update(self, value, unlock=False):
if not self._locking or (self._locking and unlock):
try:
# special case for boolean conversions
if self._type is bool and not isinstance(value, bool):
# any integer != 0 means True
if isinstance(value, int):
self._value = value != 0
# a string meaning 'yes'
elif isinstance(value, str):
self._value = value.strip().lower() in ('yes',
'y', '1', 'true', 'enable',
'enabled', 'active', 'on')
# TODO rethink about a default value, or how to warn
else:
self._value = False
# or just let python do the conversion
else:
self._value = self._type(value)
except ValueError:
raise ConfigurationError('option has type `%r`, cannot'
' convert from value `%s`' % (self._type, value))
except TypeError:
raise ConfigurationError('option convertion to `%r` failed'
% (self._type))
def value(self):
return self._value
def default_value(self):
return self._default
def is_default(self):
return self._value == self._default
# ccserver settings
self.server_host = config.get('ccserver', 'host')
self.server_port = config.getint('ccserver', 'port')
self.server_user = config.get('ccserver', 'user')
self.server_passwd = config.get('ccserver', 'password')
# TODO complete
"""Exceptions classes for ccnode."""
# -*- coding: utf-8 -*-
#
# Cloud-Control Node
# Copyright (C) 2011 SmartJog [http://www.smartjog.com]
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# * 2011-06: Thibault VINCENT <thibault.vincent@smartjog.com>
#
from __future__ import absolute_import
import logging
from threading import Thread
from .logging import Logger
logger = logging.getLogger(__name__)
class JobManager(Thread, object):
def __init__(self, logger):
self._logger = logger
pass
This diff is collapsed.
# -*- coding: utf-8 -*-
#
# Cloud-Control Node
# Copyright (C) 2011 SmartJog [http://www.smartjog.com]
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# * 2011-06: Thibault VINCENT <thibault.vincent@smartjog.com>
#
from __future__ import absolute_import
from signal import signal, SIGHUP, SIGUSR1
from time import sleep
from .logging import Logger, DEBUG, INFO, ERROR
from .config import Configuration
from .kernel import Core
from . import __product__, __version__
class Launcher(object):
# pylint: disable=R0903
def __init__(self):
# initialize core components
self._logger = Logger()
self._logger.info('initializing %s version %s' % (__product__,
str(__version__)))
self._config = Configuration(self._logger)
self._core = Core(self._logger, self._config)
def _on_sighup(self, signum, frame):
# pylint: disable=W0613
self._logger.warning('SIGHUP received, shutting down core')
self._core.shutdown()
def _on_sigusr1(self, signum, frame):
# pylint: disable=W0613
self._logger.info('SIGUSR1 received, reloading configuration file and'
' logging settings')
self._reload_conf()
def _reload_conf(self):
# (re)load configuration file
self._config.parse_file()
# (re)apply logging configuration
# verbosity level
verbosity = self._config.get('verbosity')
if verbosity >= 1:
self._logger.set_level(DEBUG)
self._logger.set_debug_level(verbosity)
else:
self._logger.set_level(INFO)
# stderr/syslog output selection
std_output = self._config.get('stdout')
self._logger.set_stderr_enabled(std_output)
self._logger.set_syslog_enabled(not std_output)
def main(self):
# read command line arguments
self._config.parse_arguments()
# register hangup signal to restart core
signal(SIGHUP, self._on_sighup)
# register USR1 signal to reload configuration file
# and apply logging settings
signal(SIGUSR1, self._on_sigusr1)
# run the core untill permanent shutdown is requested
while True:
try:
# (re)load configuration file and apply logging settings
self._reload_conf()
# start core main loop
self._core.run()
sleep(0.5)
except KeyboardInterrupt:
self._logger.warning('SIGINT received, shutting down core'
' post-mortem')
self._core.shutdown()
break
except Exception:
self._logger.error('unexpected error raised by core')
self._logger.backtrace(priority=ERROR)
sleep(5)
# -*- coding: utf-8 -*-
#
# Cloud-Control Node
# Copyright (C) 2011 SmartJog [http://www.smartjog.com]
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# * 2011-06: Thibault VINCENT <thibault.vincent@smartjog.com>
#
from __future__ import absolute_import
import syslog
import socket
from sys import stderr
from threading import Lock
from traceback import format_exc
from . import __canonical__, __develmode__
DEBUG = syslog.LOG_DEBUG
INFO = syslog.LOG_INFO
WARNING = syslog.LOG_WARNING
ERROR = syslog.LOG_ERR
CRITICAL = syslog.LOG_CRIT
ALL_PRIO = (DEBUG, INFO, WARNING, ERROR, CRITICAL)
PRIO_TAGS = {
DEBUG : 'D',
INFO : 'I',
WARNING : 'W',
ERROR : 'E',
CRITICAL: 'C',
}
class Logger(object):
def __init__(self):
self._max_prio = INFO
self._max_debug = 1
self._mutex = Lock()
self._mcast_sock = None
self._mcast_dst = None
self._syslog_enabled = not __develmode__
self._stderr_enabled = __develmode__
syslog.openlog(__canonical__,
syslog.LOG_PID ^ syslog.LOG_NDELAY ^ syslog.LOG_CONS,
syslog.LOG_DAEMON)
def set_level(self, priority):
if priority not in ALL_PRIO:
raise ValueError('not an allowed logging priority')
self._max_prio = priority
def set_debug_level(self, level):
if level <= 0:
raise ValueError('debug level is a positive integer')
self._max_debug = int(level)
def set_syslog_enabled(self, state):
self._syslog_enabled = bool(state)
def set_stderr_enabled(self, state):
self._stderr_enabled = bool(state)
def mcast_enable(self, addr, port):
with self._mutex:
self._mcast_dst = (addr, port)
self._mcast_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
socket.IPPROTO_UDP)
self._mcast_sock.setsockopt(socket.IPPROTO_IP,
socket.IP_MULTICAST_TTL, 2)
def mcast_disable(self):
with self._mutex:
self._mcast_sock = None
self._mcast_dst = None
def _log(self, priority, level, message, *args):
if priority not in ALL_PRIO:
priority = CRITICAL
printable = (priority <= self._max_prio and (level is None
or (priority == DEBUG and level <= self._max_debug)))
if printable or self._mcast_sock is not None:
try:
if len(args):
message = message % args
except TypeError:
message = '[PARTIAL] %s' % message
finally:
if printable:
if self._syslog_enabled:
syslog.syslog(priority, message)
if self._stderr_enabled:
if isinstance(level, int):
my_lvl = str(level)
else:
my_lvl = ' '
stderr.write('%s[%s%s]: %s\n' % (__canonical__,
PRIO_TAGS[priority], my_lvl, message))
with self._mutex:
if self._mcast_sock is not None:
self._mcast_sock.sendto('%i %s' %
(level, message), self._mcast_dst)
def debug(self, level, message, *args):
self._log(DEBUG, level, message, *args)
def info(self, message, *args):
self._log(INFO, None, message, *args)
def warning(self, message, *args):
self._log(WARNING, None, message, *args)
def error(self, message, *args):
self._log(ERROR, None, message, *args)
def critical(self, message, *args):
self._log(CRITICAL, None, message, *args)
def backtrace(self, priority=DEBUG, level=1):
if priority != DEBUG:
level = None
for line in format_exc().splitlines():
self._log(priority, level, 'BT: %s', line)
# -*- coding: utf-8 -*-
#
# Cloud-Control Node
# Copyright (C) 2011 SmartJog [http://www.smartjog.com]
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# * 2011-06: Thibault VINCENT <thibault.vincent@smartjog.com>
#
LIST = [
'tags',
'jobs',
'execute',
'shutdown',
'sysinfo',
]
# -*- coding: utf-8 -*-
#
# Cloud-Control Node
# Copyright (C) 2011 SmartJog [http://www.smartjog.com]
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# * 2011-06: Thibault VINCENT <thibault.vincent@smartjog.com>
#
from __future__ import absolute_import
from subprocess import Popen, PIPE, STDOUT
from ..kernel import BaseModule, ModuleError, Handler, pure
class Module(BaseModule):
_handler_methods = {
'execute_command' : '_h_execute_command',
}
def init(self, manager):
self._manager = manager
self._logger = manager.get_logger()
self._config = manager.get_configuration()
# create and populate our own handler
self._handler = Handler(self._logger, 'execute')
for name, method in self._handler_methods.iteritems():
self._handler.procedure_register(name, getattr(self, method))
# register it in the core handler
self._manager.get_handler().register(self._handler)
def destroy(self):
# FIXME do something about running commands (wait? kill?)
# unregister ourselves from the core handler
self._manager.get_handler().unregister(self._handler)
@pure
def _h_execute_command(self, command):
# do nothing when execution is disabled in configuration
if self._config.get('command_execution') is not True:
raise ModuleError('command execution is disabled by configuration')
# synchronous execution
self._logger.info('executing command: %s', command)
output = None
try:
output = Popen(command, shell=True, bufsize=-1, stdin=PIPE,
stdout=PIPE, stderr=STDOUT).communicate()[0]
# command not found or related error
except OSError as err:
raise ModuleError('could not execute command: %s' % err)
# return output string, mix of stdout and stderr
else:
return output
# -*- coding: utf-8 -*-
#
# Cloud-Control Node
# Copyright (C) 2011 SmartJog [http://www.smartjog.com]
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# * 2011-06: Thibault VINCENT <thibault.vincent@smartjog.com>
#
from __future__ import absolute_import
from subprocess import Popen, PIPE, STDOUT
from ..kernel import BaseModule, ModuleError, Handler, pure
class Module(BaseModule):
_handler_methods = {
'node_shutdown' : '_h_node_shutdown',
}
def init(self, manager):
self._manager = manager
self._logger = manager.get_logger()
self._config = manager.get_configuration()
# create and populate our own handler
self._handler = Handler(self._logger, 'shutdown')
for name, method in self._handler_methods.iteritems():
self._handler.procedure_register(name, getattr(self, method))
# register it in the core handler
self._manager.get_handler().register(self._handler)
def destroy(self):
# unregister ourselves from the core handler
self._manager.get_handler().unregister(self._handler)
@pure
def _h_node_shutdown(self, reboot=True, gracefull=True):
self._logger.info('server requested shutdown of local host with'
' options reboot=`%s` gracefull=`%s`', reboot, gracefull)
# build option list for the GNU shutdown command
options = ''
if reboot is False:
options += ' -h -P'
options += '' if gracefull else ' -n'
else:
options += ' -r'
options += ' -f' if gracefull else ' -n'
options += ' 0'
# shutdown machine
try:
Popen('/sbin/shutdown %s' % options, shell=True).communicate()
# command not found or related error
except OSError as err:
raise ModuleError('could not execute shutdown: %s' % err)
import time
import logging
from threading import Thread, Lock
from sjrpc.client import SimpleRpcClient
from sjrpc.utils import ConnectionProxy, RpcHandler, pure
from ccnode import __version__
from ccnode.tags import Tag
logger = logging.getLogger(__name__)
DEFAULT_TAGS = (Tag(u'version', __version__, 1),)
class DefaultHandler(RpcHandler):
def __init__(self, *args, **kwargs):
RpcHandler.__init__(self, *args, **kwargs)
self.tags = dict((t.name, t) for t in DEFAULT_TAGS)
@pure
def get_tags(self, tags=None, noresolve_tags=None):
"""
:param iterable tags: list of tags to return
:param iterable noresolve_tags: list of tags to not return
"""
logger.debug('Tags: %s, %s' % (unicode(tags), unicode(noresolve_tags)))
tags = set(tags) - set(noresolve_tags) if tags is not None else None
if tags is None:
tags = self.tags.iterkeys()
else:
tags = tags & set(self.tags.iterkeys())
result = dict((
t, # tag name
dict(
value=self.tags[t].value,
ttl=self.tags[t].ttl,
),
) for t in tags)
logger.debug(u'Returning: %s' % unicode(result))
return result
class Node(Thread):
"""Main class for ccnode."""
def __init__(self, server_host, server_port, user_name, user_passwd):
"""
:param string server_host: hostname for cc-server
:param int server_port: port for cc-server
:param string user_name: account name for authentication to cc-server
:param string user_passwd: password for cc-server authentication
"""
Thread.__init__(self)
# settings used as read only
self.server_host = server_host
self.server_port = int(server_port)
self.user_name = user_name
self.user_passwd = user_passwd
self.daemon = True
#: proxy
self.proxy = None
#: rpc connection manager
self.manager = None
#: role returned by cc-server
self.role = None
self._manager_lock = Lock()
def init_rpc(self):
self.manager = SimpleRpcClient.from_addr(
addr=self.server_host,
port=self.server_port,
enable_ssl=True,
default_handler=DefaultHandler(),
)
self.proxy = ConnectionProxy(self.manager)
def authentify(self):
"""Try to authenticate to the server while the server returns a bad
role.
"""
try:
role = self.proxy.authentify(self.user_name, self.user_passwd)
except Exception:
logger.exception(u'Unknow exception while authentifying.')
raise
# set handler according to which role was returned by the cc-server
if role == u'host':
logger.debug(u'Role host affected.')
self.role = u'host'
elif role == u'hv':
logger.debug(u'Role hypervisor affected.')
self.role = u'hv'
else:
logger.debug(u'Wrong role returned: %s' % role)
role = None
time.sleep(2)
self.role = role
def rpc(self):
"""Runs rpc main loop."""
try:
self.manager.run()
except Exception:
logger.exception(u'Unknown exception:')
finally:
self.shutdown()
def run(self):
"""Node main loop."""
while True:
# init rpc connection
while True:
try:
self.init_rpc()
except Exception as e:
logger.exception(u'Error in init.')
else:
break
time.sleep(2)
# launch main rpc thread
rpc_thread = Thread(target=self.rpc)
rpc_thread.daemon = True
rpc_thread.start()
# launch auth thread, make sure rpc is still running
while rpc_thread.is_alive() and self.role is None:
auth_thread = Thread(target=self.authentify)
auth_thread.daemon = True
auth_thread.start()
auth_thread.join()
# wait for rpc thread to terminates (it means error)
rpc_thread.join()
logger.error('Reconnecting to server.')
# reset settings
self.role = None
def shutdown(self):
with self._manager_lock:
if self.manager is not None:
self.manager.shutdown()
self.manager = None
self.role = None
from inspect import isfunction
class Tag(object):
"""Class that abstract tags."""
def __init__(self, name, valuable, ttl):
"""
:param string name: tag name
:param valuable: something that gives tag value, string or function
:param None,int ttl: Time to live for caching the tags
"""
self.name = name
self._value = valuable
self.ttl = ttl
@property
def value(self):
"""Returns tag value."""
if isfunction(self._value):
return self._value()
return self._value
# -*- coding: utf-8 -*-
#
# Cloud-Control Node
# Copyright (C) 2011 SmartJog [http://www.smartjog.com]
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Authors:
# * 2011-06: Thibault VINCENT <thibault.vincent@smartjog.com>
#
from __future__ import absolute_import
from threading import Lock
class RWLock(object):
# pylint: disable=R0903
def __init__(self):
self._mutex = Lock()
self._writemutex = Lock()
self._readers = 0
self._writers = 0
self.read = self._RLock(self)
self.write = self._WLock(self)
class _Lock(object):
def __init__(self, rwlock):
self._parent = rwlock
class _WLock(_Lock):
def __enter__(self):
self.acquire()
def __exit__(self, exc_type, exc_value, traceback):
self.release()
def acquire(self):
# pylint: disable=W0212
with self._parent._mutex:
self._parent._writers += 1
self._parent._writemutex.acquire()
def release(self):
# pylint: disable=W0212
with self._parent._mutex:
self._parent._writers -= 1
self._parent._writemutex.release()
class _RLock(_Lock):
def __enter__(self):
self.acquire()
def __exit__(self, exc_type, exc_value, traceback):
self.release()
def acquire(self):
# pylint: disable=W0212
self._parent._mutex.acquire()
if self._parent._writers > 0 or self._parent._readers == 0:
self._parent._mutex.release()
self._parent._writemutex.acquire()
self._parent._mutex.acquire()
self._parent._readers += 1
self._parent._mutex.release()
def release(self):
# pylint: disable=W0212
self._parent._mutex.acquire()
self._parent._readers -= 1
if self._parent._readers == 0:
self._parent._writemutex.release()
self._parent._mutex.release()
import signal
def signal_(signum):
"""Decorate a function to register as a handler to the signal."""
def decorator(func):
signal.signal(signum, func)
return func
return decorator
[node]
address = cc-server.lab.fr.lan
login = tvincent_kvm
password = toto
verbosity = 3
virtualization = no
command_execution = yes
[ccserver]
host=cc-server.lab.fr.lan
port=1984
user=tvincent_kvm
password=toto
[loggers]
keys=root,ccnode
[handlers]
keys=syslog
[formatters]
keys=simpleFormatter
[logger_root]
level=ERROR
handlers=syslog
# handlers=fileHandler
[logger_ccnode]
level=INFO
handlers=
qualname=ccnode
[handler_syslog]
class=handlers.SysLogHandler
formatter=simpleFormatter
args=('/dev/log', handlers.SysLogHandler.LOG_DAEMON)
# [handler_fileHandler]
# class=FileHandler
# formatter=simpleFormatter
# args=('/var/log/cc-node.log',)
[formatter_simpleFormatter]
# format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
format=cc-node - %(asctime)s - %(name)s - %(levelname)s - %(message)s
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment