source archive commit: 98f7dc7ff96edcafc8c725bf0e066c05d08987e7
cc-node-v24/.gitignore:
*.pyc
doc/_build/*
*.swp
*.log
test_*.py
*.test
/ccnode/geany_run_script.sh
cc-node-v24/CHANGELOG: (empty)

cc-node-v24/COPYRIGHT:
Copyright © 2010-2012 Smartjog
cc-node-v24/LICENSE:
GNU LESSER GENERAL PUBLIC LICENSE
Version 3, 29 June 2007
Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
This version of the GNU Lesser General Public License incorporates
the terms and conditions of version 3 of the GNU General Public
License, supplemented by the additional permissions listed below.
0. Additional Definitions.
As used herein, "this License" refers to version 3 of the GNU Lesser
General Public License, and the "GNU GPL" refers to version 3 of the GNU
General Public License.
"The Library" refers to a covered work governed by this License,
other than an Application or a Combined Work as defined below.
An "Application" is any work that makes use of an interface provided
by the Library, but which is not otherwise based on the Library.
Defining a subclass of a class defined by the Library is deemed a mode
of using an interface provided by the Library.
A "Combined Work" is a work produced by combining or linking an
Application with the Library. The particular version of the Library
with which the Combined Work was made is also called the "Linked
Version".
The "Minimal Corresponding Source" for a Combined Work means the
Corresponding Source for the Combined Work, excluding any source code
for portions of the Combined Work that, considered in isolation, are
based on the Application, and not on the Linked Version.
The "Corresponding Application Code" for a Combined Work means the
object code and/or source code for the Application, including any data
and utility programs needed for reproducing the Combined Work from the
Application, but excluding the System Libraries of the Combined Work.
1. Exception to Section 3 of the GNU GPL.
You may convey a covered work under sections 3 and 4 of this License
without being bound by section 3 of the GNU GPL.
2. Conveying Modified Versions.
If you modify a copy of the Library, and, in your modifications, a
facility refers to a function or data to be supplied by an Application
that uses the facility (other than as an argument passed when the
facility is invoked), then you may convey a copy of the modified
version:
a) under this License, provided that you make a good faith effort to
ensure that, in the event an Application does not supply the
function or data, the facility still operates, and performs
whatever part of its purpose remains meaningful, or
b) under the GNU GPL, with none of the additional permissions of
this License applicable to that copy.
3. Object Code Incorporating Material from Library Header Files.
The object code form of an Application may incorporate material from
a header file that is part of the Library. You may convey such object
code under terms of your choice, provided that, if the incorporated
material is not limited to numerical parameters, data structure
layouts and accessors, or small macros, inline functions and templates
(ten or fewer lines in length), you do both of the following:
a) Give prominent notice with each copy of the object code that the
Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the object code with a copy of the GNU GPL and this license
document.
4. Combined Works.
You may convey a Combined Work under terms of your choice that,
taken together, effectively do not restrict modification of the
portions of the Library contained in the Combined Work and reverse
engineering for debugging such modifications, if you also do each of
the following:
a) Give prominent notice with each copy of the Combined Work that
the Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the Combined Work with a copy of the GNU GPL and this license
document.
c) For a Combined Work that displays copyright notices during
execution, include the copyright notice for the Library among
these notices, as well as a reference directing the user to the
copies of the GNU GPL and this license document.
d) Do one of the following:
0) Convey the Minimal Corresponding Source under the terms of this
License, and the Corresponding Application Code in a form
suitable for, and under terms that permit, the user to
recombine or relink the Application with a modified version of
the Linked Version to produce a modified Combined Work, in the
manner specified by section 6 of the GNU GPL for conveying
Corresponding Source.
1) Use a suitable shared library mechanism for linking with the
Library. A suitable mechanism is one that (a) uses at run time
a copy of the Library already present on the user's computer
system, and (b) will operate properly with a modified version
of the Library that is interface-compatible with the Linked
Version.
e) Provide Installation Information, but only if you would otherwise
be required to provide such information under section 6 of the
GNU GPL, and only to the extent that such information is
necessary to install and execute a modified version of the
Combined Work produced by recombining or relinking the
Application with a modified version of the Linked Version. (If
you use option 4d0, the Installation Information must accompany
the Minimal Corresponding Source and Corresponding Application
Code. If you use option 4d1, you must provide the Installation
Information in the manner specified by section 6 of the GNU GPL
for conveying Corresponding Source.)
5. Combined Libraries.
You may place library facilities that are a work based on the
Library side by side in a single library together with other library
facilities that are not Applications and are not covered by this
License, and convey such a combined library under terms of your
choice, if you do both of the following:
a) Accompany the combined library with a copy of the same work based
on the Library, uncombined with any other library facilities,
conveyed under the terms of this License.
b) Give prominent notice with the combined library that part of it
is a work based on the Library, and explaining where to find the
accompanying uncombined form of the same work.
6. Revised Versions of the GNU Lesser General Public License.
The Free Software Foundation may publish revised and/or new versions
of the GNU Lesser General Public License from time to time. Such new
versions will be similar in spirit to the present version, but may
differ in detail to address new problems or concerns.
Each version is given a distinguishing version number. If the
Library as you received it specifies that a certain numbered version
of the GNU Lesser General Public License "or any later version"
applies to it, you have the option of following the terms and
conditions either of that published version or of any later version
published by the Free Software Foundation. If the Library as you
received it does not specify a version number of the GNU Lesser
General Public License, you may choose any version of the GNU Lesser
General Public License ever published by the Free Software Foundation.
If the Library as you received it specifies that a proxy can decide
whether future versions of the GNU Lesser General Public License shall
apply, that proxy's public statement of acceptance of any version is
permanent authorization for you to choose that version for the
Library.
cc-node-v24/MANIFEST.in:
recursive-include etc *

cc-node-v24/README: (empty)

cc-node-v24/bin/cc-node:
#!/usr/bin/env python
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import atexit
from optparse import OptionParser
from os.path import isfile, abspath
from daemon import DaemonContext
from cloudcontrol.common.client.exc import ConfigError
from cloudcontrol.node import __version__
from cloudcontrol.node.node import NodeLoop
from cloudcontrol.node.config import NodeConfigParser, configure_logging
DEFAULT_CONFIG_FILE = '/etc/cc-node.conf'
# command line arguments...
oparser = OptionParser(version='%%prog %s' % __version__)
oparser.add_option('-d', '--daemonize', default=False, action='store_true',
help=u'run as daemon and write pid file')
oparser.add_option('-c', '--config', metavar='FILE',
default=DEFAULT_CONFIG_FILE,
help=u'configuration file ABSOLUTE path (default: %default)')
oparser.add_option('-p', '--pidfile', metavar='FILE',
help=u'pid file path for daemon mode')
options, args = oparser.parse_args()
# check argument configuration
if options.daemonize and not options.pidfile:
sys.exit(u'Please supply a pid file...')
options.config = abspath(options.config)
if not isfile(options.config):
sys.exit(u'Please supply a valid path to configuration file...')
configure_logging(1, 'console')
try:
config = NodeConfigParser(options.config)
except ConfigError as exc:
sys.exit(exc.message)
# take care of pid file if daemon
if options.daemonize:
pidfile = open(options.pidfile, 'w')
files_preserve = [pidfile]
else:
files_preserve = None
if config.debug:
stderr = sys.stderr
stdout = sys.stdout
else:
stderr = None
stdout = None
with DaemonContext(detach_process=options.daemonize,
files_preserve=files_preserve,
stderr=stderr,
stdout=stdout):
# take care of pidfile
if options.daemonize:
pidfile.write('%s' % os.getpid())
pidfile.flush()
@atexit.register
def clean_pidfile():
pidfile.seek(0)
pidfile.truncate()
pidfile.flush()
NodeLoop(options.config).start()
cc-node-v24/cloudcontrol/__init__.py:
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
__import__('pkg_resources').declare_namespace(__name__)
cc-node-v24/cloudcontrol/node/__init__.py:
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
__product__ = 'Cloud-Control Node'
__version__ = '24'
__canonical__ = 'cc-node'
cc-node-v24/cloudcontrol/node/config.py:
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import logging
import logging.config
from StringIO import StringIO
from itertools import ifilterfalse
from ConfigParser import SafeConfigParser, NoOptionError, NoSectionError
from cloudcontrol.common.client.exc import ConfigError
logger = logging.getLogger(__name__)
class _ConfigProxy(object):
    """Simple ConfigParser proxy that provides default values for get* methods.
"""
def __init__(self, config_parser):
self.config = config_parser
@staticmethod
def config_error(msg):
logger.error(msg)
raise ConfigError(msg)
def __getattr__(self, name):
if name.startswith('get'):
def getter(section, option, *default):
assert not default or len(default) == 1
try:
return getattr(self.config, name)(section, option)
except NoSectionError:
self.config_error('Section "%s" is not present in config'
' file' % section)
except NoOptionError:
if default:
return default[0]
self.config_error(
'Attribute "%s" not specified in config'
' file (section "%s")' % (option, section))
except ValueError:
self.config_error(
'Configuration attribute "%s" value is invalid'
' (section "%s")' % (option, section))
return getter
# else
return getattr(self.config, name)
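The `__getattr__` trick above can be sketched in isolation. This is a Python 3 version (the module itself targets Python 2), keeping only the default-value behaviour and re-raising instead of converting errors to `ConfigError`; the section and option names are made up:

```python
import configparser

class ConfigProxy:
    """Minimal stand-in for _ConfigProxy: any get* call gains an
    optional trailing default returned when the option is missing."""

    def __init__(self, parser):
        self._parser = parser

    def __getattr__(self, name):
        if name.startswith('get'):
            real = getattr(self._parser, name)

            def getter(section, option, *default):
                assert len(default) <= 1
                try:
                    return real(section, option)
                except configparser.NoOptionError:
                    if default:
                        return default[0]
                    raise
            return getter
        # non-get* attributes fall through to the wrapped parser
        return getattr(self._parser, name)

parser = configparser.ConfigParser()
parser.read_string("[node]\nport = 1984\n")
proxy = ConfigProxy(parser)
print(proxy.getint('node', 'port'))         # option present -> 1984
print(proxy.getint('node', 'timeout', 30))  # option missing -> default 30
```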
class NodeConfigParser(object):
"""ConfigParser for ccnode config file."""
def __init__(self, file_path):
config = _ConfigProxy(SafeConfigParser())
config.read(file_path)
# ccserver settings
self.server_host = config.get('node', 'address')
self.server_port = config.getint('node', 'port', 1984)
self.server_user = config.get('node', 'login')
self.server_passwd = config.get('node', 'password')
# node settings
try:
self.logging_level = config.getint('node', 'verbosity', 0)
except ConfigError:
try:
self.logging_level = dict(
debug=3,
info=2,
warning=1,
error=0,
)[config.get('node', 'verbosity')]
except KeyError:
_ConfigProxy.config_error(
'Configuration attribute "verbosity"'
' is invalid (section "node")')
self.debug = config.getboolean('node', 'debug', False)
self.logging_output = 'console' if self.debug else 'syslog'
# path settings
self.jobs_store_path = config.get('node', 'jobs_store_path',
'/var/lib/cc-node/jobs')
        # plugins persistence
self.plugins_store_path = config.get('node', 'plugins_store_path',
'/var/lib/cc-node/plugins')
# RPC handler ACLs
acl_section_name = 'node_handler'
if config.has_section(acl_section_name):
self.forbidden_handlers = set(ifilterfalse(
lambda o: config.getboolean(acl_section_name, o, True),
config.options(acl_section_name),
))
else:
self.forbidden_handlers = set()
# backward compatibility
command_execution_handlers = {'shutdown', 'execute'}
        if config.getboolean('node', 'command_execution', None) is False:
self.forbidden_handlers |= command_execution_handlers
else:
self.forbidden_handlers -= command_execution_handlers
# deprecated options
for o in ('detect_hypervisor', 'force_xen'):
if config.get('node', o, None) is not None:
logger.warning('%s config option is not supported anymore', o)
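The verbosity handling above accepts either an integer or a level name. A minimal standalone sketch of that two-step fallback, assuming the same 0-3 mapping (`parse_verbosity` is a hypothetical helper, not part of the module):

```python
def parse_verbosity(raw):
    """Return a 0-3 logging verbosity from an int-like or named value."""
    try:
        return int(raw)
    except ValueError:
        # fall back to the name -> level mapping used by NodeConfigParser
        try:
            return {'debug': 3, 'info': 2, 'warning': 1, 'error': 0}[raw]
        except KeyError:
            raise ValueError('invalid verbosity: %r' % raw)

print(parse_verbosity('2'))      # 2
print(parse_verbosity('debug'))  # 3
```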
def configure_logging(level, output):
level = {
0: 'ERROR', 1: 'WARNING',
2: 'INFO', 3: 'DEBUG',
}[level]
output = dict(
console="""
[handler_output]
class=StreamHandler
formatter=simpleFormatter
args=(sys.stderr,)
[formatter_simpleFormatter]
format=cc-node - %(asctime)s - %(name)s - %(levelname)s - %(message)s
""",
syslog="""
[handler_output]
class=handlers.SysLogHandler
formatter=simpleFormatter
args=('/dev/log', handlers.SysLogHandler.LOG_DAEMON)
[formatter_simpleFormatter]
class=cloudcontrol.common.helpers.formatter.EncodingFormatter
format=cc-node - %(name)s - %(levelname)s - %(message)s
""",
)[output]
# create config parser for logging configuration
logging.config.fileConfig(StringIO("""
[loggers]
keys=root,ccnode,cccommon,sjrpc
[handlers]
keys=output
[formatters]
keys=simpleFormatter
[logger_root]
level=ERROR
handlers=output
[logger_ccnode]
level=%(level)s
handlers=
qualname=cloudcontrol.node
[logger_cccommon]
level=%(level)s
handlers=
qualname=cloudcontrol.common
[logger_sjrpc]
level=ERROR
handlers=
qualname=sjrpc
%(output)s
""" % dict(level=level, output=output)))
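The same fileConfig-from-a-string technique can be shown with a Python 3 sketch (`io.StringIO` instead of `StringIO`; the logger name `demo` is made up). Note the `%%` escaping inside the format line, needed because the whole template goes through `%` interpolation first, exactly as in `configure_logging` above:

```python
import io
import logging
import logging.config

def configure_logging_sketch(verbosity):
    level = {0: 'ERROR', 1: 'WARNING', 2: 'INFO', 3: 'DEBUG'}[verbosity]
    logging.config.fileConfig(io.StringIO("""
[loggers]
keys=root,demo

[handlers]
keys=output

[formatters]
keys=simple

[logger_root]
level=ERROR
handlers=output

[logger_demo]
level=%(level)s
handlers=
qualname=demo

[handler_output]
class=StreamHandler
formatter=simple
args=(sys.stderr,)

[formatter_simple]
format=%%(name)s - %%(levelname)s - %%(message)s
""" % {'level': level}))

configure_logging_sketch(3)
print(logging.getLogger('demo').getEffectiveLevel())  # 10 == logging.DEBUG
```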
cc-node-v24/cloudcontrol/node/exc.py:
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
"""Exceptions classes for ccnode."""
class CCNodeError(Exception):
"""Base exception class for cc-node."""
pass
class PluginError(CCNodeError):
"""Exception related to plugin execution."""
pass
class UndefinedDomain(CCNodeError):
    """Raised when an operation is attempted on a domain that does not exist."""
pass
class PoolStorageError(CCNodeError):
"""Pool or volume was not found."""
pass
class TunnelError(CCNodeError):
    """Error occurred during TunnelJob execution."""
pass
class DRBDAllocationError(CCNodeError):
"""Cannot create DRBD volume."""
pass
class DRBDError(CCNodeError):
    """Error occurred during DRBDJob execution."""
pass
class ConsoleError(CCNodeError):
    """Error related to VM virtio console handling."""
pass
class ConsoleAlreadyOpened(ConsoleError):
"""VM virtio console is already opened."""
pass
class VMMigrationError(CCNodeError):
"""Error during live migration job."""
pass
class JobError(CCNodeError):
"""General exception for a job."""
pass
class RemoteExecutionError(CCNodeError):
"""Thrown when a remote command execution error occurs."""
pass
class ForbiddenHandler(CCNodeError):
"""Raised when a handler is called but disabled by configuration."""
pass
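A small illustration of why everything derives from `CCNodeError`: call sites can catch the whole family with a single except clause while still seeing the specific subtype. `allocate_volume` is a hypothetical helper, only here to demonstrate the pattern:

```python
class CCNodeError(Exception):
    """Base exception class for cc-node (mirrors the hierarchy above)."""

class DRBDAllocationError(CCNodeError):
    """Cannot create DRBD volume."""

def allocate_volume(size):
    # hypothetical helper used only to trigger a subclass exception
    if size <= 0:
        raise DRBDAllocationError('invalid size %r' % size)
    return size

try:
    allocate_volume(0)
except CCNodeError as exc:
    # one except clause covers every cc-node error; the subtype survives
    caught = type(exc).__name__
print(caught)  # DRBDAllocationError
```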
cc-node-v24/cloudcontrol/node/host/__init__.py:
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import os
import pty
import errno
import fcntl
import subprocess
import logging
import os.path
import stat
import shutil
import struct
import socket
import termios
import tempfile
import threading
from fcntl import ioctl
import cPickle as pickle
from itertools import imap, chain
from subprocess import Popen, PIPE, STDOUT
import pyev
from sjrpc.utils import pass_connection, threadless
from sjrpc.core.exceptions import RpcError
from cloudcontrol.common.client.tags import Tag, tag_inspector, TagDB
from cloudcontrol.common.client.plugins import Base as BasePlugin, rpc_handler
from cloudcontrol.common.jobs import JobsManager, JobsStore
from cloudcontrol.common.helpers.logger import patch_logging; patch_logging()
from cloudcontrol.node.utils import EvPopen
from cloudcontrol.node.exc import RemoteExecutionError
from cloudcontrol.node.host import tags
from cloudcontrol.node.host.jobs import NodeJobsManagerInterface, ScriptJob
from cloudcontrol.node.host.plugins import PluginMethodJob, Plugin
logger = logging.getLogger(__name__)
def disk_tag_value(disk_name):
def size():
        with open(os.path.join('/sys/block', disk_name, 'size')) as f:
            s = f.read().strip()
try:
s = int(s)
if s > 0:
return s * 512
else:
return None
except ValueError:
return None
return size
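The helper above relies on the sysfs convention that `/sys/block/<dev>/size` holds a sector count, and Linux sysfs reports it in 512-byte sectors regardless of the device's physical sector size. A standalone sketch of the conversion, using a temporary file in place of sysfs:

```python
import os
import tempfile

def size_from_sysfs(path):
    # the file holds a count of 512-byte sectors as decimal text
    with open(path) as f:
        raw = f.read().strip()
    try:
        sectors = int(raw)
    except ValueError:
        return None
    return sectors * 512 if sectors > 0 else None

# simulate /sys/block/<dev>/size with a hypothetical 20 GiB disk
with tempfile.NamedTemporaryFile('w', delete=False) as tmp:
    tmp.write('41943040\n')
size_bytes = size_from_sysfs(tmp.name)
os.unlink(tmp.name)
print(size_bytes)  # 41943040 * 512 == 21474836480 bytes == 20 GiB
```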
class FakePtySocket(object):
"""A fake socket object wrapping a :class:`subprocess.Popen` object
standard input/output.
"""
def __init__(self, fd):
self._fd = fd
def recv(self, size):
return os.read(self._fd, size)
def send(self, data):
return os.write(self._fd, data)
def fileno(self):
return self._fd
    def setblocking(self, blocking):
        if not blocking:
            # disable blocking mode on the underlying pty fd
            current_flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
            fcntl.fcntl(self._fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
        else:
            raise NotImplementedError('Not implemented')
def close(self):
pass
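The `setblocking` branch uses the usual fcntl two-step to make a descriptor non-blocking. A self-contained sketch of the same dance on a pipe (POSIX-only) rather than a pty:

```python
import os
import fcntl

r, w = os.pipe()

# same two steps as FakePtySocket.setblocking(False):
# fetch the current flags, then OR in O_NONBLOCK
flags = fcntl.fcntl(r, fcntl.F_GETFL)
fcntl.fcntl(r, fcntl.F_SETFL, flags | os.O_NONBLOCK)
nonblocking = bool(fcntl.fcntl(r, fcntl.F_GETFL) & os.O_NONBLOCK)
print(nonblocking)  # True

# reading an empty non-blocking fd fails immediately instead of hanging
try:
    os.read(r, 1)
    got_eagain = False
except BlockingIOError:
    got_eagain = True
print(got_eagain)  # True

os.close(r)
os.close(w)
```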
class RemoteShell(object):
"""Handles basic operations on remote shell."""
def __init__(self, conn, exec_='/bin/bash'):
"""
:param conn: sjRPC connection
:param exec_: path of binary to execute
"""
        # handles close while waiting for the process to end from another thread
self.master, self.slave = pty.openpty()
self.endpoint = FakePtySocket(self.master)
self.proto = conn.create_tunnel(endpoint=self.endpoint,
close_endpoint_on_shutdown=False,
on_shutdown=self.on_tunnel_shutdown)
self.conn = conn
self.child_watcher = None
try:
self.process = subprocess.Popen(
[exec_], stdout=self.slave,
bufsize=0, stdin=self.slave,
stderr=self.slave,
preexec_fn=os.setsid,
close_fds=True,
cwd=os.environ.get('HOME', '/'),
)
except OSError:
logger.exception('Error while executing remote shell')
# close opened fds
self.close()
raise
self.child_watcher = conn.loop.child(self.process.pid, False,
self.child_cb)
self.child_watcher.start()
@property
def label(self):
return self.proto.label
def resize(self, row, col, xpixel, ypixel):
if self.master is not None:
ioctl(self.master, termios.TIOCSWINSZ,
struct.pack('HHHH', row, col, xpixel, ypixel))
def on_tunnel_shutdown(self, *args):
# *args is for callback arguments (ignored)
        # set self.proto to None first so close() cannot run twice
        self.proto, proto = None, self.proto
self.close()
logger.debug('Tunnel closed by sjRPC')
def child_cb(self, watcher, revents):
logger.debug('Tunnel closed by process termination')
self.process.returncode = watcher.rstatus
self.close()
def close(self):
if self.child_watcher is not None:
self.child_watcher.stop()
self.child_watcher = None
if self.process.returncode is None:
# process is still alive
self.process.kill()
if self.master is not None:
try:
os.close(self.master)
except OSError:
logger.error('Error when closing file descriptor for pty')
self.master = None
if self.slave is not None:
try:
os.close(self.slave)
except OSError:
logger.error('Error when closing slave file descriptor for pty')
self.slave = None
if self.proto is not None:
try:
self.proto.shutdown()
except Exception:
logger.error('Error while trying to close RPC tunnel')
self.proto = None
class Handler(BasePlugin):
"""Handler for host role."""
def __init__(self, *args, **kwargs):
BasePlugin.__init__(self, *args, **kwargs)
# add plugin tags
self.tag_db.add_tags(tag_inspector(tags, self))
# disk related tags
self.tag_db.add_tags(imap(
lambda d: Tag('disk%s_size' % d, disk_tag_value(d), 60),
self.tag_db['__main__']['disk']._calculate_value().split(),
))
# running shells
self.shells = dict()
# running remote commands
self.commands = set()
#: jobs manager (different from MainLoop.jobs_manager)
try:
self.jobs_manager = JobsManager(logger, NodeJobsManagerInterface(self),
JobsStore(self.main.config.jobs_store_path))
except EnvironmentError as e:
logger.critical('Cannot access jobs directory: %s', e.strerror)
self.main.stop()
raise
#: loaded plugins
self.plugins = {} # plugin name -> plugin object
#: cache directory for scripts
try:
self.scripts_dir = tempfile.mkdtemp(prefix='cc-node-script-cache')
except EnvironmentError as e:
logger.critical('Cannot create temporary directory for scripts: %s',
e.strerror)
self.main.stop()
raise
def start(self):
BasePlugin.start(self)
# load plugins if persistent file is here
if os.path.isfile(self.main.config.plugins_store_path):
logger.debug('Loading previous plugins')
try:
with open(self.main.config.plugins_store_path) as f:
to_load = pickle.load(f)
except EnvironmentError as exc:
logger.error('Cannot load previous plugins: %s (%s)',
exc.errno, exc.strerror)
except (
EOFError,
AttributeError,
ImportError,
IndexError,
pickle.UnpicklingError,
) as exc:
logger.error('Cannot read file, bad format, %s', exc)
else:
def plugins_install():
for p in to_load:
try:
self.plugin_install(self.main.rpc_con.rpc, None, p)
except Exception:
logger.exception('Error while loading plugin %s', p)
else:
                            logger.debug('Successfully loaded plugin %s', p)
th = threading.Thread(target=plugins_install)
th.daemon = True
th.start()
else:
logger.debug('No previously loaded plugins')
def stop(self):
# remove script cache directory
shutil.rmtree(self.scripts_dir, ignore_errors=True)
# kill all currently running shells and commands
logger.debug('Kill currently running shells and commands')
for stuff in chain(self.shells.values(),
list(self.commands)):
stuff.close()
self.shells.clear()
self.commands.clear()
BasePlugin.stop(self)
def update_plugin_index(self):
self.tag_db['__main__']['plugins'].update_value()
# write plugins to file
try:
with open(self.main.config.plugins_store_path, 'w') as f:
pickle.dump([p for p in self.plugins], f)
except EnvironmentError as exc:
logger.error('Cannot save loaded plugins in \'%s\': %s (%s)',
self.main.config.plugins_store_path,
exc.errno, exc.strerror)
else:
logger.debug('Plugins state saved')
@rpc_handler
def execute_command(self, command):
"""Execute an arbitrary shell command on the host.
:param string command: shell command to run
"""
logger.debug('Executing command %s', command)
try:
remote_command = EvPopen(self.main, command, close_fds=True,
shell=True, stdout=PIPE, stderr=STDOUT)
except Exception:
logger.exception('Error while starting subprocess for executing '
' command %s', command)
raise
self.commands.add(remote_command)
try:
stdout, _ = remote_command.communicate()
except Exception:
logger.exception('Error while communicating with subprocess for'
' command %s', command)
raise
self.commands.remove(remote_command)
if remote_command.returncode != 0:
# 127 means command not found, 126 means not executable
if remote_command.returncode == 127:
raise RemoteExecutionError('Command not found: %s' % stdout)
elif remote_command.returncode == 126:
raise RemoteExecutionError('Command is not executable')
raise RemoteExecutionError('Child exited with non zero status %s' %
remote_command.returncode)
return stdout
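`execute_command` interprets the conventional shell exit statuses: 127 for a missing command, 126 for one that exists but is not executable. A quick POSIX-only check of the 127 case (the command name is invented and assumed absent from the system):

```python
import subprocess

# run a nonexistent command through the shell, merging stderr into stdout
# just as execute_command does with stderr=STDOUT
proc = subprocess.run(['/bin/sh', '-c', 'definitely_not_a_real_cmd_xyz'],
                      stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
print(proc.returncode)  # 127 -> "command not found"
```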
@rpc_handler
def node_shutdown(self, reboot=True, gracefull=True):
"""Halt/Reboot the node.
        :param bool reboot: reboot if True, halt if False
        :param bool gracefull: graceful operation (gracefull == not forced)
"""
args = ['/sbin/shutdown']
if reboot:
args.append('-r')
if gracefull:
logger.info('Going to reboot the host...')
args.append('-f')
else:
logger.info('Going to force the reboot of the host...')
args.append('-n')
else:
# halt
args.append('-h -P')
            if not gracefull:
                logger.info('Going to force the halt of the host...')
                args.append('-n')
            else:
                logger.info('Going to halt the host...')
args.append('0')
return self.execute_command(' '.join(args))
@threadless
@pass_connection
@rpc_handler
def shell(self, conn, shell='/bin/bash'):
"""Create a shell tunnel and return the label of the created tunnel.
"""
logger.debug('Opening shell')
remote_shell = RemoteShell(conn, shell)
self.shells[remote_shell.label] = remote_shell
return remote_shell.label
@threadless
@rpc_handler('resize')
def shell_resize(self, label, row, col, xpixel, ypixel):
"""Resize the shell's attached terminal."""
logger.debug('Shell resize')
self.shells[label].resize(row, col, xpixel, ypixel)
@pass_connection
@rpc_handler
def forward(self, conn, label, port, destination='127.0.0.1'):
"""TCP port forwarding."""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error:
logger.exception('Cannot create socket in forward handler')
raise
try:
sock.connect((destination, port))
except socket.error:
logger.exception('Error while connecting to destination (forward)')
raise
conn.create_tunnel(label=label, endpoint=sock)
@pass_connection
@rpc_handler
def script_run(self, conn, sha1, script, owner, batch=None, *args):
        # retrieve the script from the server if it is not already cached
filename = os.path.join(self.scripts_dir, sha1)
if not os.access(filename, os.X_OK):
try:
sha1, content = conn.call('script_get', script)
except RpcError:
logger.error('Error while retrieving script: %s', script)
raise
with open(filename, "w") as f:
try:
f.write(content)
os.chmod(filename, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
except OSError:
logger.error(
'Could not set execution permission to script: %s',
script)
raise
except IOError:
logger.error(
'Could not write script to repository: %s', script)
raise
return self.jobs_manager.spawn(ScriptJob, owner, batch=batch,
settings=dict(
script=script,
filename=filename,
args=args,
)).id
@rpc_handler
def job_cancel(self, job_id):
self.jobs_manager.get(job_id).cancel()
@rpc_handler
def job_purge(self, job_id):
self.jobs_manager.purge(job_id)
@rpc_handler
def job_attachment(self, job_id, name):
"""
:param name: attachment name
"""
try:
return self.jobs_manager.get(job_id).read_attachment(name)
except Exception:
logger.exception('Error while getting job attachment')
raise
@pass_connection
@rpc_handler
def plugin_install(self, conn, sha1, name):
"""
:param conn: RPC connection
:param sha1: sha1 string or None (get the latest)
:param name: plugin name
"""
# check if plugin is not already loaded and upgrade it if the sha1 hash
# has changed:
if name in self.plugins:
if self.plugins[name].sha1 != sha1:
self.plugins[name].uninstall()
del self.plugins[name]
else:
return
# get plugin from server:
sha1_get, content = conn.call('plugin_get', name)
if sha1 is not None and sha1 != sha1_get:
raise RuntimeError('Requested sha1 is not available on the server')
# load the plugin:
plugin_logger = logger.getChild('plugin.%s' % name)
self.plugins[name] = Plugin(plugin_logger, TagDB(self.tag_db),
name, sha1, content)
self.update_plugin_index()
@rpc_handler
def plugin_uninstall(self, name):
plugin = self.plugins.pop(name, None)
if plugin is None:
raise KeyError('Plugin %r is not running' % name)
plugin.uninstall()
self.update_plugin_index()
@rpc_handler
def plugin_run(self, name, method, owner, batch=None, **kwargs):
if name not in self.plugins:
raise KeyError('Plugin %r is not running' % name)
try:
func = self.plugins[name].methods[method]
except KeyError:
raise KeyError('Unknown method %r' % method)
return self.jobs_manager.spawn(PluginMethodJob, owner, batch=batch,
settings=dict(
plugin_name=name,
method_name=method,
method=func,
method_kwargs=kwargs,
)).id
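A standalone sketch of the content-addressed script cache that script_run relies on: scripts are stored under their sha1 digest and marked executable before being spawned. The `cache_script` helper below is illustrative only, not part of the node API.

```python
import hashlib
import os
import stat
import tempfile

def cache_script(scripts_dir, content):
    """Store content under its sha1 digest and make it executable."""
    sha1 = hashlib.sha1(content.encode()).hexdigest()
    filename = os.path.join(scripts_dir, sha1)
    # only write if we don't already hold an executable copy
    if not os.access(filename, os.X_OK):
        with open(filename, 'w') as f:
            f.write(content)
        os.chmod(filename, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
    return filename

scripts_dir = tempfile.mkdtemp()
path = cache_script(scripts_dir, '#!/bin/sh\necho hello\n')
```

Because the file name is the digest of its content, re-sending an unchanged script is a no-op.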
cc-node-v24/cloudcontrol/node/host/jobs.py 0000664 0000000 0000000 00000006162 12113401062 0020712 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl.  If not, see <http://www.gnu.org/licenses/>.
"""Interface implementation for JobsManager from cc-common."""
from datetime import datetime
import subprocess
from cloudcontrol.common.jobs import JobsManagerInterface, Job, JobCancelError
from cloudcontrol.common.tql.db.helpers import taggify
from cloudcontrol.common.client.tags import TagDB, Tag
class NodeJobsManagerInterface(JobsManagerInterface):
TAG_ATTRIBUTES = ('title', 'status', 'state', 'owner', 'created', 'ended',
'attachments', 'batch')
def __init__(self, host_handler):
self.handler = host_handler
self.tag_db = TagDB(self.handler.tag_db)
def on_job_created(self, job):
def get_tag_value(tag):
return lambda job: taggify(getattr(job, tag))
self.tag_db.add_sub_object(
job.id,
(Tag(tag, get_tag_value(tag), parent=job)
for tag in self.TAG_ATTRIBUTES),
'job',
)
def tag_duration(job):
if job.ended is None:
ended = datetime.fromtimestamp(self.handler.main.evloop.now())
else:
ended = job.ended
return taggify(ended - job.created)
self.tag_db.add_sub_tag(job.id, Tag('duration', tag_duration, 2, 1, job))
def on_job_updated(self, job):
for tag in self.TAG_ATTRIBUTES:
self.tag_db[job.id][tag].update_value()
def on_job_purged(self, job_id):
self.tag_db.remove_sub_object(job_id)
class ScriptJob(Job):
"""Job that runs a script."""
def job(self, script, filename, args):
self.title = 'Script: %s %s' % (script, ' '.join(args))
self.process = None
to_exec = (filename,) + args
output = self.attachment('output')
self.logger.debug('To exec: %s', to_exec)
try:
self.process = subprocess.Popen(to_exec, stdout=output,
stderr=output, close_fds=True)
self.logger.info('Script running with pid: %d', self.process.pid)
ex = self.process.wait()
if ex != 0:
raise JobCancelError('Script returned exit code %d' % ex)
except OSError:
# Popen raises OSError when the script cannot be executed;
# CalledProcessError is only raised by check_call/check_output
self.logger.error('Error while executing script: %s', script)
raise
def cancel(self):
super(ScriptJob, self).cancel()
if self.process is not None:
self.logger.info('Terminating job %s', self.title)
self.process.terminate()
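A minimal sketch of the run-and-attach pattern ScriptJob uses: stdout and stderr share one attachment file, and the exit status decides success. `run_with_attachment` is a hypothetical stand-in; the real job writes to a job attachment and raises JobCancelError on a non-zero exit.

```python
import subprocess
import sys
import tempfile

def run_with_attachment(argv):
    """Run argv with stdout and stderr redirected to one output file."""
    with tempfile.TemporaryFile(mode='w+') as output:
        process = subprocess.Popen(argv, stdout=output, stderr=output,
                                   close_fds=True)
        rc = process.wait()
        output.seek(0)
        return rc, output.read()

rc, text = run_with_attachment([sys.executable, '-c', 'print("hello")'])
```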
cc-node-v24/cloudcontrol/node/host/plugins.py 0000664 0000000 0000000 00000010567 12113401062 0021442 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl.  If not, see <http://www.gnu.org/licenses/>.
import imp
import logging
from collections import defaultdict
from cloudcontrol.common.client.exc import TagConflict
from cloudcontrol.common.client.tags import Tag
from cloudcontrol.common.jobs import Job
from cloudcontrol.common.tql.db.helpers import taggify
from cloudcontrol.node.exc import PluginError
logger = logging.getLogger(__name__)
def _tag_direct_use_error(*args, **kwargs):
raise RuntimeError('Plugin tags can\'t be called directly')
class Plugin(object):
"""Represents a loaded CloudControl plugin."""
def __init__(self, logger, tag_db, name, sha1, plugin_body):
self.logger = logger
self.name = name
self.sha1 = sha1
self.tags = []
self.tag_db = tag_db
self.methods = {}
self.events = defaultdict(list)
self._load_module(plugin_body)
self.trigger('install', self)
def _load_module(self, plugin_body):
# load the code into a module:
module = imp.new_module(self.name)
module.logger = self.logger
module.tag = self._decorator_tag
module.method = self._decorator_method
module.on = self._decorator_event
try:
exec plugin_body in module.__dict__
conflicts = self.tag_db.check_tags_conflict(*[tag.name for tag in
self.tags])
if conflicts:
raise TagConflict(
'Tags with names %s already exist' % ','.join(conflicts))
else:
self.tag_db.add_tags(self.tags)
except Exception as exc:
err_msg = 'Error during plugin installation (%s): %s' % (
self.name, exc)
logger.exception(err_msg)
# make sure all tags are stopped
self.tag_db.set_parent(None)
raise RuntimeError(err_msg)
else:
# prevents module from being garbage collected
self.module = module
def _decorator_tag(self, ttl=-1, refresh=None, name=None, background=False):
def decorator(func):
tag = Tag(func.__name__ if name is None else name,
lambda: taggify(func()), ttl=ttl, refresh=refresh,
background=background)
self.tags.append(tag)
return _tag_direct_use_error
return decorator
def _decorator_method(self, name=None):
def decorator(func):
register_name = func.__name__ if name is None else name
if register_name in self.methods:
raise PluginError('Already defined method %s' % register_name)
self.methods[register_name] = func
return func
return decorator
def _decorator_event(self, event):
def decorator(func):
self.events[event].append(func)
return func
return decorator
def trigger(self, event, *args, **kwargs):
"""Trigger an event."""
for func in self.events[event]:
func(*args, **kwargs)
def uninstall(self):
self.trigger('uninstall')
# shutdown the tags database:
self.tag_db.set_parent(None)
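The loading technique in Plugin._load_module can be sketched standalone: build a fresh module object, inject the decorators a plugin body may use, then exec the body in the module's namespace. This is simplified (the real `method` decorator above is a factory taking an optional name); `load_plugin` is illustrative only.

```python
import types

def load_plugin(name, body):
    """Exec a plugin body inside a fresh module, collecting its methods."""
    methods = {}
    module = types.ModuleType(name)

    def method(func):
        # simplified: the real decorator is a factory with a name argument
        methods[func.__name__] = func
        return func

    module.method = method
    # Python 2 spelling in the file above: exec body in module.__dict__
    exec(body, module.__dict__)
    return module, methods

body = """
@method
def ping():
    return 'pong'
"""
module, methods = load_plugin('demo', body)
```

Keeping a reference to the module (as `self.module` does above) prevents it from being garbage collected while its functions are still registered.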
class PluginMethodJob(Job):
"""Job that runs a plugin method."""
def job(self, plugin_name, method_name, method, method_kwargs):
kwargs_str = ', '.join(('%s=%s' % (k, v) for k, v in method_kwargs.iteritems()))
self.title = 'Plugin: %s.%s(%s)' % (plugin_name, method_name, kwargs_str)
try:
returned = method(self, **method_kwargs)
except Exception:
self.logger.exception('Error while executing method')
raise
if returned is not None:
self.attachment('output').write(returned)
cc-node-v24/cloudcontrol/node/host/tags.py 0000664 0000000 0000000 00000010251 12113401062 0020705 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl.  If not, see <http://www.gnu.org/licenses/>.
"""This module acts as a little framework for defining tags in a simple way.
Just define a string or a function and it will be introspected and used as a
tag value.
"""
import re
import os as os_
import platform as platform_
from multiprocessing import cpu_count
from socket import getfqdn
import psutil
from cloudcontrol.common.client.tags import ttl, refresh, background
@ttl(3600 * 24) # one day
@refresh(30)
@background
def h():
"""Hostname tag."""
return getfqdn()
# CPU related tags
def arch():
"""Hardware CPU architecture."""
return {
u'i386': u'x86',
u'i486': u'x86',
u'i586': u'x86',
u'i686': u'x86',
u'x86_64': u'x64',
}.get(platform_.machine(), u'unknown')
def cpu():
"""Number of CPUs (cores) on the host."""
try:
return unicode(cpu_count())
except NotImplementedError:
return None
@ttl(10)
@refresh(2)
def cpuuse():
"""CPU usage in percentage."""
return u'%.1f' % psutil.cpu_percent()
# memory related tags
def mem():
"""Total physical memory available on system."""
return unicode(psutil.avail_phymem() + psutil.used_phymem())
@ttl(5)
@refresh(10)
def memfree():
"""Available physical memory on system."""
return unicode(psutil.avail_phymem())
@ttl(5)
@refresh(10)
def memused():
"""Used physical memory on system."""
return unicode(psutil.used_phymem())
@ttl(5)
@refresh(10)
def membuffers():
"""Memory used by kernel buffers."""
return unicode(psutil.phymem_buffers())
@ttl(5)
@refresh(10)
def memcache():
"""Memory used by the page cache."""
return unicode(psutil.cached_phymem())
# disks related tags
def disk():
"""List of disk devices on the host."""
disk_pattern = re.compile(r'[sh]d[a-z]+')
return u' '.join(d for d in os_.listdir(
'/sys/block/') if disk_pattern.match(d))
# other hardware related tags
def chaserial():
"""Blade chassis serial number."""
return open('/sys/class/dmi/id/chassis_serial').read().strip() or None
def chaasset():
"""Blade chassis asset tag."""
return open('/sys/class/dmi/id/chassis_asset_tag').read().strip() or None
def hmodel():
"""Host hardware model."""
return open('/sys/class/dmi/id/product_name').read().strip() or None
def hserial():
"""Host hardware serial number."""
return open('/sys/class/dmi/id/product_serial').read().strip() or None
def hvendor():
"""Host hardware vendor."""
return open('/sys/class/dmi/id/sys_vendor').read().strip() or None
def hbios():
"""Host BIOS version."""
return u'%s (%s)' % (
open('/sys/class/dmi/id/bios_version').read().strip() or None,
open('/sys/class/dmi/id/bios_date').read().strip() or None,
)
# Operating system related tags
def os():
"""Operating system (linux/windows)."""
return unicode(platform_.system().lower())
def platform():
"""Python platform.platform() info."""
return unicode(platform_.platform())
def uname():
"""Same output as the uname command (see Python os.uname)."""
return u' '.join(os_.uname()) or None
@ttl(5)
@refresh(5)
def uptime():
"""Uptime of the system in seconds."""
return open('/proc/uptime').read().split()[0].split(u'.')[0] or None
@ttl(5)
@refresh(5)
def load():
"""Average of the number of processes in the run queue over the last 1, 5
and 15 minutes."""
load_ = None
try:
load_ = u' '.join(unicode(l) for l in os_.getloadavg())
except OSError:
pass
return load_
def plugins(handler):
return ' '.join(handler.plugins) or None
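The functions in this module are discovered by introspection (tag_inspector in cc-common) rather than registered by hand. A minimal sketch of that idea: collect the public module-level functions of a module as name-to-callable tags. `inspect_tags` is a hypothetical stand-in for the real tag_inspector, demonstrated here on the stdlib `platform` module.

```python
import inspect
import platform

def inspect_tags(module):
    """Collect public module-level functions as name -> callable tags."""
    return dict(
        (name, func)
        for name, func in inspect.getmembers(module, inspect.isfunction)
        if not name.startswith('_')
    )

tags = inspect_tags(platform)
```

Adding a tag is then just defining a new function in the module; no registration table to keep in sync.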
cc-node-v24/cloudcontrol/node/hypervisor/ 0000775 0000000 0000000 00000000000 12113401062 0020633 5 ustar 00root root 0000000 0000000 cc-node-v24/cloudcontrol/node/hypervisor/__init__.py 0000664 0000000 0000000 00000054225 12113401062 0022754 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl.  If not, see <http://www.gnu.org/licenses/>.
import logging
import socket
from StringIO import StringIO
from xml.etree import cElementTree as et
import libvirt
from sjrpc.utils import threadless, pass_connection
from cloudcontrol.common.client.tags import Tag, tag_inspector
from cloudcontrol.common.client.plugins import (
rpc_handler,
rpc_handler_decorator_factory,
get_rpc_handlers,
)
from cloudcontrol.node.host import Handler as HostHandler
from cloudcontrol.node.hypervisor import tags
from cloudcontrol.node.hypervisor.kvm import KVM, LiveMigration
from cloudcontrol.node.exc import (
UndefinedDomain, DRBDError, PoolStorageError
)
from cloudcontrol.node.hypervisor.jobs import (
ImportVolume, ExportVolume, TCPTunnel, DRBD,
)
logger = logging.getLogger(__name__)
libvirt_handler_marker = '_libvirt_rpc_handler'
libvirt_handler = rpc_handler_decorator_factory(libvirt_handler_marker)
# FIXME find a way to refactor Handler and Hypervisor class
class Handler(HostHandler):
def __init__(self, *args, **kwargs):
"""
:param loop: MainLoop instance
:param hypervisor_name: hypervisor name
"""
self.hypervisor_name = kwargs.pop('hypervisor_name')
HostHandler.__init__(self, *args, **kwargs)
#: keep index of asynchronous calls
self.async_calls = dict()
self.timer = self.main.evloop.timer(.0, 5., self.virt_connect_cb)
self.hypervisor = None
self._virt_connected = False
# list of libvirt related RPC handlers
self.virt_handlers = get_rpc_handlers(self, libvirt_handler_marker)
# register tags
self.tag_db.add_tags(tag_inspector(tags, self))
@property
def virt_connected(self):
return self._virt_connected
@virt_connected.setter
def virt_connected(self, value):
self._virt_connected = value
# update tags
for tag in ('vir_status', 'sto', 'nvm', 'vmpaused', 'vmstarted',
'vmstopped', 'hvver', 'libvirtver', 'hv'):
self.tag_db['__main__'][tag].update_value()
def start(self):
self.timer.start()
HostHandler.start(self)
def stop(self):
self.timer.stop()
if self.hypervisor is not None:
self.hypervisor.stop()
HostHandler.stop(self)
def virt_connect_cb(self, *args):
# initialize hypervisor instance
try:
self.hypervisor = KVM(
name=self.hypervisor_name,
handler=self,
)
except libvirt.libvirtError:
logger.exception('Error while connecting to libvirt')
return
self.virt_connected = True
# register hypervisor storage tags
for name, storage in self.hypervisor.storage.storages.iteritems():
self.tag_db.add_tags((
Tag('sto%s_state' % name, lambda sto: sto.state, 5, 5, storage),
Tag('sto%s_size' % name,
lambda sto: sto.capacity, 5, 5, storage),
Tag('sto%s_free' % name,
lambda sto: sto.available, 5, 5, storage),
Tag('sto%s_used' % name,
lambda sto: sto.capacity - sto.available, 5, 5, storage),
Tag('sto%s_type' % name, lambda sto: sto.type, 5, 5, storage),
Tag('sto%s_vol' % name,
lambda sto: ' '.join(sto.volumes) if sto.volumes else None,
5, 5, storage),
))
# register domains
for dom in self.hypervisor.domains.itervalues():
self.tag_db.add_sub_object(dom.name, dom.tags.itervalues(), 'vm')
# we must refresh those tags only when domains tags are registered to
# have the calculated values
for tag in ('cpualloc', 'cpurunning', 'memalloc', 'memrunning'):
self.tag_db['__main__'][tag].update_value()
# register libvirt handlers
self.rpc_handler.update(self.virt_handlers)
for k, v in self.virt_handlers.iteritems():
self.main.reset_handler(k, v)
# if everything went fine, unregister the timer
self.timer.stop()
def virt_connect_restart(self):
"""Restart libvirt connection.
This method might be called when libvirt connection is lost.
"""
if not self.virt_connected:
return
logger.error('Connection to libvirt lost, trying to restart')
# update connection state
self.virt_connected = False
# refresh those tags
for tag in ('cpualloc', 'cpurunning', 'memalloc', 'memrunning'):
self.tag_db['__main__'][tag].update_value()
# unregister tags that will be re registered later
for storage in self.hypervisor.storage.storages:
self.tag_db.remove_tags((
'sto%s_state' % storage,
'sto%s_size' % storage,
'sto%s_free' % storage,
'sto%s_used' % storage,
'sto%s_type' % storage,
'sto%s_vol' % storage,
))
# unregister sub objects (for the same reason)
for sub_id in self.tag_db.keys():
if sub_id == '__main__':
continue
self.tag_db.remove_sub_object(sub_id)
# stop and delete hypervisor instance
self.hypervisor.stop()
self.hypervisor = None
# remove handlers related to libvirt
for handler in self.virt_handlers:
del self.rpc_handler[handler]
self.main.remove_handler(handler)
# launch connection timer
self.timer.start()
def iter_vms(self, vm_names):
"""Utility function to iterate over VM objects using their names."""
if vm_names is None:
return
get_domain = self.hypervisor.domains.get
for name in vm_names:
dom = get_domain(name)
if dom is not None:
yield dom
@libvirt_handler
def vm_define(self, data, format='xml'):
logger.debug('VM define')
if format != 'xml':
raise NotImplementedError('Format not supported')
return self.hypervisor.vm_define(data)
@libvirt_handler
def vm_undefine(self, name):
logger.debug('VM undefine %s', name)
vm = self.hypervisor.domains.get(name)
if vm is not None:
vm.undefine()
@libvirt_handler
def vm_export(self, name, format='xml'):
logger.debug('VM export %s', name)
if format != 'xml':
raise NotImplementedError('Format not supported')
vm = self.hypervisor.domains.get(name)
if vm is None:
return
return vm.lv_dom.XMLDesc(0)
@libvirt_handler
def vm_stop(self, name):
logger.debug('VM stop %s', name)
try:
self.hypervisor.domains[name].stop()
except libvirt.libvirtError:
logger.exception('Error while stopping VM %s', name)
raise
except KeyError:
msg = 'Cannot stop VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_destroy(self, name):
logger.debug('VM destroy %s', name)
try:
self.hypervisor.domains[name].destroy()
except libvirt.libvirtError as exc:
# Libvirt raises exception 'domain is not running' even if domain
# is running, might be a bug in libvirt
if 'domain is not running' not in str(exc) or (
self.hypervisor.domains[name].state != 'running'):
logger.exception('Error while destroying VM %s', name)
raise
except KeyError:
msg = 'Cannot destroy VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_start(self, name, pause=False):
"""
:param str name: VM name to start
:param bool pause: start VM in pause
"""
logger.debug('VM start %s', name)
try:
self.hypervisor.domains[name].start(pause)
except libvirt.libvirtError:
logger.exception('Error while starting VM %s', name)
raise
except KeyError:
msg = 'Cannot start VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_suspend(self, name):
logger.debug('VM suspend %s', name)
try:
self.hypervisor.domains[name].suspend()
except libvirt.libvirtError:
logger.exception('Error while suspending VM %s', name)
raise
except KeyError:
msg = 'Cannot suspend VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_resume(self, name):
logger.debug('VM resume %s', name)
try:
self.hypervisor.domains[name].resume()
except libvirt.libvirtError:
logger.exception('Error while resuming VM %s', name)
raise
except KeyError:
msg = 'Cannot resume VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_migrate_tunneled(self, name, tun_res, migtun_res, unsafe=False,
timeout=60.):
"""Live migrate VM through TCP tunnel.
:param name: VM name to migrate
:param tun_res: result of tunnel_setup handler
:param migtun_res: result of tunnel setup handler
:param bool unsafe: unsafe migration
:param float timeout: timeout in seconds for the libvirt migration
(prevents libvirt from trying to acquire the domain lock forever)
"""
logger.debug('VM live migrate %s', name)
try:
# this is the port used by our libvirt in the cc-node (client
# libvirt) to connect to the remote libvirtd
remote_virt_port = tun_res['port']
except KeyError:
logger.error('Badly formatted argument tun_res for live'
' migration')
raise
try:
# this is the port used by local libvirtd to connect to the remote
# libvirtd (see http://libvirt.org/migration.html)
remote_virt_port2 = migtun_res['port']
except KeyError:
logger.error('Badly formatted argument migtun_res for live'
' migration')
raise
try:
vm = self.hypervisor.domains[name]
except KeyError:
logger.exception('Cannot find domain %s on hypervisor for live'
' migration', name)
raise
migration = LiveMigration(self.main, vm, remote_virt_port,
remote_virt_port2, timeout, unsafe)
try:
migration.wait()
except Exception:
logger.exception('Error during live migration for vm %s', name)
logger.debug('Exit status %d', migration.return_status)
raise
logger.info('Successfully migrated VM %s', name)
@threadless
@pass_connection
@libvirt_handler
def vm_open_console(self, conn, name):
"""
:param conn: sjRPC connection instance
:param name: VM name
"""
vm = self.hypervisor.domains[name]
# create connection to the VM console
try:
endpoint = vm.open_console()
except socket.error:
# cannot create socketpair
logger.error('Cannot create connection to VM console')
raise
except Exception:
logger.exception('Error while trying to open console for domain %s',
name)
raise
def on_shutdown(tun):
"""Method of Tunnel protocol close callback."""
vm.close_console()
# connect as tunnel endpoint
proto = conn.create_tunnel(endpoint=endpoint, on_shutdown=on_shutdown)
return proto.label
@libvirt_handler
def vm_disable_virtio_cache(self, name):
"""Set virtio cache to none on VM disks.
:param name: VM name
"""
vm = self.hypervisor.domains[name]
# get VM XML
try:
xml = vm.lv_dom.XMLDesc(0)
except libvirt.libvirtError:
logger.exception('Error while getting domain XML from libvirt, %s',
vm.name)
raise
xml_tree = et.ElementTree()
xml_tree.parse(StringIO(xml))
for disk in xml_tree.findall('devices/disk'):
# check that disk is virtio
target = disk.find('target')
if target is None or target.get('bus') != 'virtio':
continue
# modify cache attr
driver = disk.find('driver')
assert driver is not None
driver.set('cache', 'none')
logger.debug('Set cache attribute for disk %s of VM %s',
target.get('dev'), name)
# write back the XML tree
out = StringIO()
xml_tree.write(out) # check encoding is fine
try:
self.hypervisor.vir_con.defineXML(out.getvalue())
except libvirt.libvirtError:
logger.exception('Cannot update XML file for domain %s', name)
raise
@libvirt_handler
def vm_set_autostart(self, name, autostart=True):
"""Set autostart on VM.
:param name: VM name
:param bool autostart: autostart value to set
"""
vm = self.hypervisor.domains[name]
vm.lv_dom.setAutostart(int(bool(autostart)))
# update autostart value now instead of 10 seconds lag
vm.tags['autostart'].update_value()
@libvirt_handler
def vol_create(self, pool, name, size):
logger.debug('Volume create %s, pool %s, size %s', name, pool, size)
try:
self.hypervisor.storage.create_volume(pool, name, size)
except Exception:
logger.exception('Error while creating volume')
raise
@libvirt_handler
def vol_delete(self, pool, name):
logger.debug('Volume delete %s, pool %s', name, pool)
try:
self.hypervisor.storage.delete_volume(pool, name)
except Exception:
logger.exception('Error while deleting volume')
raise
@libvirt_handler
def vol_import(self, pool, name):
"""
:param pool: pool name where the volume is
:param name: name of the volume
"""
logger.debug('Volume import pool = %s, volume = %s', pool, name)
try:
pool = self.hypervisor.storage.get_storage(pool)
if pool is None:
raise PoolStorageError('Pool storage does not exist')
volume = pool.volumes.get(name)
if volume is None:
raise PoolStorageError('Volume does not exist')
# create the job
job = self.main.job_manager.create(ImportVolume, volume)
job.start()
except Exception:
logger.exception('Error while starting import job')
raise
return dict(id=job.id, port=job.port)
@libvirt_handler
def vol_import_wait(self, job_id):
"""Block until completion of the given job id."""
job = self.main.job_manager.get(job_id)
logger.debug('Waiting for import job to terminate')
job.wait()
logger.debug('Import job terminated')
return dict(id=job.id, log='', checksum=job.checksum)
@libvirt_handler
def vol_import_cancel(self, job_id):
"""Cancel import job."""
logger.debug('Cancel import job')
job = self.main.job_manager.get(job_id)
self.main.job_manager.cancel(job_id)
# wait for job to end
job.join() # we don't call wait as it is already called in
# vol_import_wait handler
@libvirt_handler
def vol_export(self, pool, name, raddr, rport):
"""
:param pool: pool name where the volume is
:param name: name of the volume
:param raddr: IP address of the destination to send the volume to
:param rport: TCP port of the destination
"""
pool = self.hypervisor.storage.get_storage(pool)
if pool is None:
raise PoolStorageError('Pool storage does not exist')
volume = pool.volumes.get(name)
if volume is None:
raise PoolStorageError('Volume does not exist')
try:
job = self.main.job_manager.create(ExportVolume, volume, raddr, rport)
job.start()
job.wait()
except Exception:
logger.exception('Error while exporting volume')
raise
logger.debug('Export volume successful')
return dict(id=job.id, log='', checksum=job.checksum)
@threadless
@rpc_handler
def tun_setup(self, local=True):
"""Set up local tunnel and listen on a random port.
:param local: indicate if we should listen on localhost or all
interfaces
"""
logger.debug('Tunnel setup: local = %s', local)
# create job
job = self.main.job_manager.create(TCPTunnel)
job.setup_listen('127.0.0.1' if local else '0.0.0.0')
return dict(
jid=job.id,
key='FIXME',
port=job.port,
)
@threadless
@rpc_handler
def tun_connect(self, res, remote_res, remote_ip):
"""Connect tunnel to the other end.
:param res: previous result of `tun_setup` handler
:param remote_res: other end result of `tun_setup` handler
:param remote_ip: where to connect
"""
logger.debug('Tunnel connect %s %s', res['jid'], remote_ip)
job = self.main.job_manager.get(res['jid'])
job.setup_connect((remote_ip, remote_res['port']))
job.start()
@threadless
@rpc_handler
def tun_connect_hv(self, res, migration=False):
"""Connect tunnel to local libvirt Unix socket.
:param res: previous result of `tun_setup` handler
"""
logger.debug('Tunnel connect hypervisor %s', res['jid'])
job = self.main.job_manager.get(res['jid'])
job.setup_connect('/var/run/libvirt/libvirt-sock')
job.start()
@threadless
@rpc_handler
def tun_destroy(self, res):
"""Close given tunnel.
:param res: previous result as given by `tun_setup` handler
"""
logger.debug('Tunnel destroy %s', res['jid'])
job = self.main.job_manager.get(res['jid'])
self.main.job_manager.cancel(job.id)
job.wait()
@libvirt_handler
def drbd_setup(self, pool, name):
"""Create DRBD volumes.
:param pool: storage pool
:param name: storage volume name
"""
pool = self.hypervisor.storage.get_storage(pool)
if pool is None:
raise DRBDError('Cannot setup DRBD: pool storage does not exist')
elif pool.type != 'logical':
raise DRBDError('Cannot setup DRBD: pool storage is not LVM')
volume = pool.volumes.get(name)
if volume is None:
raise DRBDError('Cannot setup DRBD: volume does not exist')
try:
job = self.main.job_manager.create(DRBD, self.hypervisor.storage,
pool, volume)
except Exception:
logger.exception('Error while creating DRBD job')
raise
job.setup()
logger.debug('DRBD setup successful')
return dict(
jid=job.id,
port=job.drbd_port,
)
@libvirt_handler
def drbd_connect(self, res, remote_res, remote_ip):
"""Set up DRBD in connect mode (wait for connection and try to connect
to the remote peer).
:param res: previous result of `drbd_setup` handler
:param remote_res: result of remote `drbd_setup` handler
:param remote_ip: IP of remote peer
"""
job = self.main.job_manager.get(res['jid'])
job.connect(remote_ip, remote_res['port'])
job.wait_connection()
@libvirt_handler
def drbd_role(self, res, primary):
"""Set up DRBD role.
:param res: previous result of `drbd_setup` handler
:param bool primary: if True, set up in primary mode else secondary
"""
job = self.main.job_manager.get(res['jid'])
if primary:
job.switch_primary()
else:
job.switch_secondary()
@libvirt_handler
def drbd_takeover(self, res, state):
"""Set up DRBD device as the VM disk. FIXME
:param res: previous result of `drbd_setup` handler
:param state: FIXME
"""
job = self.main.job_manager.get(res['jid'])
job.takeover()
@libvirt_handler
def drbd_sync_status(self, res):
"""Return synchronization status of a current DRBD job.
:param res: previous result of `drbd_setup` handler
"""
status = self.main.job_manager.get(res['jid']).status()
result = dict(
done=status['disk'] == 'UpToDate',
completion=status['percent'],
)
logger.debug('DRBD status %s', result)
return result
@libvirt_handler
def drbd_shutdown(self, res):
"""Destroy DRBD related block devices.
:param res: previous result of `drbd_setup` handler
"""
logger.debug('DRBD shutdown')
job = self.main.job_manager.get(res['jid'])
job.cleanup()
# remove job from job_manager list
self.main.job_manager.notify(job)
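The XML rewrite performed by vm_disable_virtio_cache can be sketched standalone: for every virtio disk in the domain XML, force the driver cache attribute to "none". The domain XML below is a trimmed, hypothetical example, not a complete libvirt definition.

```python
import xml.etree.ElementTree as et

DOMAIN_XML = """<domain>
<devices>
<disk type="block" device="disk">
<driver name="qemu" cache="writeback"/>
<target dev="vda" bus="virtio"/>
</disk>
</devices>
</domain>"""

root = et.fromstring(DOMAIN_XML)
for disk in root.findall('devices/disk'):
    # only touch disks on the virtio bus
    target = disk.find('target')
    if target is None or target.get('bus') != 'virtio':
        continue
    driver = disk.find('driver')
    if driver is not None:
        driver.set('cache', 'none')

new_xml = et.tostring(root, encoding='unicode')
```

The handler above then feeds the rewritten XML back through `defineXML` so the change takes effect on the next domain start.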
cc-node-v24/cloudcontrol/node/hypervisor/domains/ 0000775 0000000 0000000 00000000000 12113401062 0022265 5 ustar 00root root 0000000 0000000 cc-node-v24/cloudcontrol/node/hypervisor/domains/__init__.py 0000664 0000000 0000000 00000034357 12113401062 0024412 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl.  If not, see <http://www.gnu.org/licenses/>.
import errno
import logging
import socket
import weakref
from StringIO import StringIO
from xml.etree import cElementTree as et
from collections import namedtuple
from itertools import izip, count
import pyev
import libvirt
from cloudcontrol.common.client.tags import Tag, tag_inspector
from cloudcontrol.node.hypervisor import lib as _libvirt
from cloudcontrol.node.hypervisor.lib import DOMAIN_STATES as STATE
from cloudcontrol.node.hypervisor.domains import vm_tags
from cloudcontrol.node.utils import SocketBuffer
from cloudcontrol.node.exc import ConsoleAlreadyOpened, ConsoleError
logger = logging.getLogger(__name__)
NetworkInterface = namedtuple('NetworkInterface', ('source', 'mac', 'model'))
class VirtualMachine(object):
"""Represent a VM instance."""
#: buffer size for console connection handling
BUFFER_LEN = 1024
def __init__(self, dom, hypervisor):
"""
:param dom: libvirt domain instance
:param hypervisor: hypervisor where the VM is
"""
self.hypervisor = weakref.proxy(hypervisor)
#: UUID string of domain
self.uuid = dom.UUIDString()
self.name = dom.name()
#: state of VM: started, stopped, paused
self._state = STATE[dom.info()[0]]
#: tags for this VM
# FIXME use a tag db instance
self.tags = dict((t.name, t) for t in tag_inspector(vm_tags, self))
#: Driver cache behavior for each VM storage, see
#: http://libvirt.org/formatdomain.html#elementsDisks
self.cache_behaviour = dict()
# define dynamic tags
for i, v in izip(count(), self.iter_disks()):
for t in (
Tag('disk%s_size' % i, v.capacity, 10),
Tag('disk%s_path' % i, v.path, 10),
Tag('disk%s_pool' % i, v.storage, 10), # FIXME: change
Tag('disk%s_vol' % i, v.name, 10),
Tag('disk%s_cache' % i,
lambda v=v: self.cache_behaviour.get(v.path), 10)
):
self.tags[t.name] = t
for i, nic in izip(count(), self.iter_nics()):
for t in (
Tag('nic%s_mac' % i, nic.mac),
Tag('nic%s_source' % i, nic.source),
Tag('nic%s_model' % i, nic.model),
):
self.tags[t.name] = t
#: keep record of CPU stats (libev timestamp, cpu time)
self.cpu_stats = (hypervisor.handler.main.evloop.now(), dom.info()[4])
# attributes related to console handling
self.stream = None
self.sock = None # socketpair endpoint
self.read_watcher = None # ev watcher for sock
self.write_watcher = None # ev watcher for sock
self.from_tunnel = None # buffer
self.from_stream = None # buffer
self.stream_handling = 0 # libvirt stream event mask
self.redefine_on_stop = False # for XML update (see KVM class)
@property
def state(self):
return self._state
@state.setter
def state(self, value):
self._state = value
self.tags['status'].update_value()
self.tags['vncport'].update_value()
@property
def lv_dom(self):
"""Libvirt domain instance."""
return self.hypervisor.vir_con.lookupByUUIDString(self.uuid)
def start(self, pause=False):
flags = 0
if pause:
flags |= libvirt.VIR_DOMAIN_START_PAUSED
self.lv_dom.createWithFlags(flags)
def stop(self):
self.lv_dom.shutdown()
def suspend(self):
self.lv_dom.suspend()
def resume(self):
self.lv_dom.resume()
def destroy(self):
self.lv_dom.destroy()
def undefine(self):
self.lv_dom.undefine()
@property
def disks(self):
return list(self.iter_disks())
def iter_disks(self):
for d in et.ElementTree().parse(
StringIO(self.lv_dom.XMLDesc(0))
).findall('devices/disk'):
if d.get('device') != 'disk':
continue
type_ = d.get('type')
if type_ not in ('file', 'block'):
continue
path = d.find('source').get(dict(file='file', block='dev')[type_])
# update cache behaviour
driver = d.find('driver')
if driver is None:
driver = {}
self.cache_behaviour[path] = driver.get('cache', 'default')
volume = self.hypervisor.storage.get_volume(path)
if volume is None:
continue
yield volume
@property
def nics(self):
return list(self.iter_nics())
def iter_nics(self):
for nic in et.ElementTree().parse(
StringIO(self.lv_dom.XMLDesc(0))
).findall('devices/interface'):
if nic.get('type') == 'bridge':
try:
mac = nic.find('mac').get('address')
except AttributeError:
mac = None
try:
model = nic.find('model').get('type')
except AttributeError:
model = None
try:
source = nic.find('source').get('bridge')
except AttributeError:
source = None
yield NetworkInterface(
mac=mac,
source=source,
model=model,
)
def open_console(self):
if self.stream is not None:
raise ConsoleAlreadyOpened('Console for this VM is already'
' opened')
        if str(
            self.hypervisor.handler.tag_db['__main__']['libvirtver'].value
        ).startswith('8'):
raise ConsoleError(
'Cannot open console, not compatible with this version of libvirt')
logger.info('Opening console stream on VM %s', self.name)
try:
self.stream = self.hypervisor.vir_con.newStream(
libvirt.VIR_STREAM_NONBLOCK)
except libvirt.libvirtError:
logger.error('Cannot create new stream for console %s', self.name)
self.close_console()
raise
self.stream_handling = libvirt.VIR_STREAM_EVENT_READABLE | (
libvirt.VIR_STREAM_EVENT_ERROR | libvirt.VIR_STREAM_EVENT_HANGUP)
try:
self.lv_dom.openConsole(None, self.stream, 0)
self.stream.eventAddCallback(self.stream_handling,
self.virt_console_stream_cb,
None)
except libvirt.libvirtError:
logger.error('Cannot open console on domain %s', self.name)
self.close_console()
raise
try:
self.sock, tunnel_endpoint = socket.socketpair()
except socket.error:
logger.error('Cannot create socket pair for console on domain %s',
self.name)
self.close_console()
raise
try:
self.sock.setblocking(0)
except socket.error:
logger.error('Cannot set socket to non blocking for console on'
' domain %s', self.name)
self.close_console()
raise
self.read_watcher = self.hypervisor.handler.main.evloop.io(
self.sock, pyev.EV_READ, self.read_from_tun_cb)
self.read_watcher.start()
self.write_watcher = self.hypervisor.handler.main.evloop.io(
self.sock, pyev.EV_WRITE, self.write_to_tun_cb)
# self.write_watcher.start()
self.from_tunnel = SocketBuffer(4096)
self.from_stream = SocketBuffer(4096)
return tunnel_endpoint
def virt_console_stream_cb(self, stream, events, opaque):
"""Handles read/write from/to libvirt stream."""
if events & libvirt.VIR_EVENT_HANDLE_ERROR or (
events & libvirt.VIR_EVENT_HANDLE_HANGUP):
# error/hangup
# logger.debug('Received error on stream')
self.close_console()
return
if events & libvirt.VIR_EVENT_HANDLE_WRITABLE:
# logger.debug('Write to stream')
# logger.debug('Event %s', self.stream_handling)
while True:
try:
to_send = self.from_tunnel.popleft()
except IndexError:
# update libvirt event mask
self.stream_handling ^= libvirt.VIR_STREAM_EVENT_WRITABLE
self.stream.eventUpdateCallback(self.stream_handling)
break
send_buffer = to_send
total_sent = 0
while True:
try:
written = self.stream.send(send_buffer)
except:
# libvirt send error
logger.exception('Error while writing to stream')
self.close_console()
return
if written == -2: # equivalent to EAGAIN
self.from_tunnel.appendleft(to_send[total_sent:])
break
elif written == len(send_buffer):
break
total_sent += written
send_buffer = buffer(to_send, total_sent)
if not self.from_tunnel.is_full():
self.read_watcher.start()
if events & libvirt.VIR_EVENT_HANDLE_READABLE:
# logger.debug('Read from stream')
# logger.debug('Event %s', self.stream_handling)
while True:
try:
incoming = self.stream.recv(self.BUFFER_LEN)
                except:
                    # libvirt recv error: incoming would be unbound below
                    logger.exception('Error while reading from stream')
                    self.close_console()
                    return
if incoming == -2:
# equivalent to EAGAIN
break
elif not incoming:
# EOF
self.close_console()
return
self.from_stream.append(incoming)
if self.from_stream.is_full():
# update libvirt event mask
self.stream_handling ^= libvirt.VIR_STREAM_EVENT_READABLE
self.stream.eventUpdateCallback(self.stream_handling)
break
if not self.from_stream.is_empty():
self.write_watcher.start()
def read_from_tun_cb(self, watcher, revents):
"""Read data from tunnel and save into buffer."""
# logger.debug('Read from tunnel')
# logger.debug('Event %s', self.stream_handling)
while True:
try:
incoming = self.sock.recv(self.BUFFER_LEN)
except socket.error as exc:
if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
break
logger.exception('Error reading on socket for vm console')
self.close_console()
return
if not incoming:
# EOF (we could wait before closing console stream)
self.close_console()
return
self.from_tunnel.append(incoming)
if self.from_tunnel.is_full():
self.read_watcher.stop()
break
if not self.from_tunnel.is_empty():
# update libvirt event callback
self.stream_handling |= libvirt.VIR_STREAM_EVENT_WRITABLE
self.stream.eventUpdateCallback(self.stream_handling)
def write_to_tun_cb(self, watcher, revents):
"""Write data from buffer to tunnel."""
# logger.debug('Write to tunnel')
# logger.debug('Event %s', self.stream_handling)
while True:
try:
to_send = self.from_stream.popleft()
except IndexError:
self.write_watcher.stop()
break
send_buffer = to_send
total_sent = 0
while True:
try:
written = self.sock.send(send_buffer)
except socket.error as exc:
if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                        self.from_stream.appendleft(to_send[total_sent:])
break
logger.exception('Error writing on socket for vm console')
self.close_console()
return
if written == len(send_buffer):
break
total_sent += written
send_buffer = buffer(to_send, total_sent)
if not self.from_stream.is_full():
# update libvirt event callback
self.stream_handling |= libvirt.VIR_STREAM_EVENT_READABLE
self.stream.eventUpdateCallback(self.stream_handling)
def close_console(self):
logger.info('Closing console stream on VM %s', self.name)
if self.stream is not None:
try:
self.stream.eventRemoveCallback()
except Exception:
logger.error('Error while removing callback on stream')
try:
self.stream.finish()
except Exception:
                logger.error('Cannot finish console stream')
self.stream = None
if self.sock is not None:
try:
self.sock.close()
except socket.error:
logger.error('Cannot close socket')
self.sock = None
if self.read_watcher is not None:
self.read_watcher.stop()
self.read_watcher = None
if self.write_watcher is not None:
self.write_watcher.stop()
self.write_watcher = None
self.from_tunnel = None
self.from_stream = None
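The partial-write loops in `virt_console_stream_cb` and `write_to_tun_cb` share one pattern: track `total_sent` and re-slice the buffer until the kernel (or stream) accepts everything. A minimal standalone sketch of that pattern, using `memoryview` instead of the `buffer` builtin; `send_all` is a hypothetical helper, not used by this module:

```python
def send_all(send, data):
    """Drive a possibly-partial *send* callable until *data* is fully written.

    Mirrors the total_sent/send_buffer loop in the console callbacks above;
    *send* has socket.send semantics and may accept only a prefix of its
    argument, returning the number of bytes it took.
    """
    view = memoryview(data)
    total_sent = 0
    while total_sent < len(data):
        # re-slice from total_sent so a short write resumes where it stopped
        total_sent += send(view[total_sent:])
    return total_sent
```

The real callbacks additionally re-queue the unsent tail on EAGAIN (`written == -2` for libvirt streams) instead of looping, so the event loop can wake them up when the fd is writable again.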
# ---- cc-node-v24/cloudcontrol/node/hypervisor/domains/vm_tags.py ----
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import logging
from functools import wraps
from xml.etree import cElementTree as et
from StringIO import StringIO
import libvirt
from cloudcontrol.common.client.tags import ttl, refresh
logger = logging.getLogger(__name__)
def _vir_tag(func):
    """Catch libvirt-related exceptions.
    Decorator for tag declarations that interact with libvirt.
    """
@wraps(func)
def decorated(dom):
if not dom.hypervisor.handler.virt_connected:
return
try:
return func(dom)
except libvirt.libvirtError as exc:
if 'Domain not found' in str(exc):
# sometimes, libvirt tells us too late when a domain is removed
# we just ignore the error and remove the domain
dom.hypervisor.vm_unregister(dom.name)
return
logger.exception('Unexpected libvirt error')
dom.hypervisor.handler.virt_connect_restart()
return decorated
def uuid(dom):
"""Unique identifier of the domain."""
return dom.uuid
def status(dom):
return dom.state
def hv(dom):
    # FIXME: what should be the value of this tag?
return dom.hypervisor.name
def htype(dom):
return dom.hypervisor.type
@_vir_tag
def arch(dom):
"""VM CPU architecture."""
try:
return dict(i686='x86', x86_64='x64')[et.ElementTree().parse(
StringIO(dom.lv_dom.XMLDesc(0))).find('os/type').get('arch')]
except Exception:
        logger.exception('Error while getting architecture tag')
def h(dom):
"""Name of the VM."""
return dom.name
@_vir_tag
def cpu(dom):
"""Number of CPU of the VM."""
return dom.lv_dom.info()[3]
@ttl(10)
@refresh(5)
@_vir_tag
def cpuuse(dom):
    """CPU usage in percent, averaged over the last 5 seconds."""
state, _, _, vcpu, cpu_time = dom.lv_dom.info()
if state != 1: # if VM is not running
return None
old_cpu_stats = dom.cpu_stats
dom.cpu_stats = (dom.hypervisor.handler.main.evloop.now(), cpu_time)
# calculate CPU percentage, code is from virt-manager in domain.py:1210
return '%.2f' % max(0., min(100., ((cpu_time - old_cpu_stats[1]) * 100.) / (
(dom.cpu_stats[0] - old_cpu_stats[0]) * 1000. * 1000. * 1000.)))
@_vir_tag
def mem(dom):
"""Memory currently allocated."""
return dom.lv_dom.info()[2] * 1024
@_vir_tag
def memmax(dom):
"""Maximum memory allocation."""
return dom.lv_dom.info()[1] * 1024
@_vir_tag
def vncport(dom):
"""VNC port for the VM console access."""
try:
port = et.ElementTree().parse(
StringIO(dom.lv_dom.XMLDesc(0))
).find('devices/graphics').get('port')
except Exception:
logger.exception('VNCPort')
raise
if 0 < int(port) < 65536:
return port
return
@ttl(10)
@refresh(10)
@_vir_tag
def disk(dom):
"""Get backend disks."""
return u' '.join(map(str, xrange(len(dom.disks)))) or None
@ttl(10)
@refresh(10)
@_vir_tag
def nic(dom):
"""VM network interfaces."""
return u' '.join(map(str, xrange(len(dom.nics)))) or None
@refresh(10)
@_vir_tag
def autostart(dom):
"""Autostart status."""
return {True: 'yes', False: 'no'}[bool(dom.lv_dom.autostart())]
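The percentage computed by `cpuuse` above can be written as a standalone helper (a sketch for illustration, not used by this module): libvirt reports guest CPU time in nanoseconds while the evloop timestamps are in seconds, hence the `1e9` factor, and the result is clamped to `[0, 100]` as in the virt-manager code the tag borrows from.

```python
def cpu_percent(prev_cpu_ns, cur_cpu_ns, prev_ts, cur_ts):
    """CPU usage in percent between two (timestamp, cpu_time) samples.

    cpu_time values are nanoseconds (libvirt dom.info()[4]); timestamps
    are wall-clock seconds.  Clamped to [0, 100] to absorb jitter.
    """
    elapsed_ns = (cur_ts - prev_ts) * 1000. * 1000. * 1000.
    return max(0., min(100., (cur_cpu_ns - prev_cpu_ns) * 100. / elapsed_ns))
```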
# ---- cc-node-v24/cloudcontrol/node/hypervisor/jobs.py ----
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import io
import os
import errno
import socket
import logging
from functools import partial
from os.path import exists as path_exists
from time import sleep
from hashlib import md5
from StringIO import StringIO
from subprocess import CalledProcessError
import sys
from xml.etree import ElementTree as et
import pyev
from cloudcontrol.node.exc import TunnelError, DRBDAllocationError, DRBDError
from cloudcontrol.node.jobs import BaseIOJob, ForkedJob
from cloudcontrol.node.utils import SocketBuffer, subproc_call, Singleton
logger = logging.getLogger(__name__)
class ImportVolume(BaseIOJob):
"""Import volume job.
"""
BUFFER_LEN = 8192 * 16
HASH = md5
def __init__(self, job_manager, volume):
BaseIOJob.__init__(self, job_manager)
self.checksum = None
self.volume = volume
# where the other node will connect
self.port = None
# fds
self.sock = None
self.client_sock = None
self.disk = None
@property
def open_fds(self):
return [fo.fileno() for fo in (self.sock, self.client_sock, self.disk)
if fo is not None]
def clean_fds(self):
if self.sock is not None:
self.sock.close()
self.sock = None
if self.client_sock is not None:
self.client_sock.close()
self.client_sock = None
if self.disk is not None:
self.disk.close()
self.disk = None
def pre_job(self):
"""
:returns: port number the socket is listening on
"""
# create socket
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error:
logger.exception('Error while creating socket for volume export')
self.clean_fds()
raise
try:
self.sock.settimeout(10.)
except socket.error:
logger.exception('Cannot set timeout on socket for volume export')
self.clean_fds()
raise
try:
self.sock.bind(('0.0.0.0', 0))
except socket.error:
logger.exception('Error while binding socket for volume export')
self.clean_fds()
raise
try:
self.sock.listen(1)
except socket.error:
logger.exception('Error while listening on socket')
self.clean_fds()
raise
# open local disk
try:
self.disk = io.open(self.volume.path, 'wb', 0)
except IOError:
logger.exception('Error while trying to open local disk')
self.clean_fds()
raise
self.port = self.sock.getsockname()[1]
return self.port
def run_job(self):
try:
self.client_sock, _ = self.sock.accept()
except socket.timeout:
            sys.stderr.write('Error in import job: client did not connect\n')
self.clean_fds()
raise
except socket.error:
sys.stderr.write('Error while accepting socket\n')
self.clean_fds()
raise
# close the listening socket
self.sock.close()
self.sock = None
checksum = self.HASH()
# start downloading disk image
while self.running:
try:
received = [] # keep a list of received buffers in order to do
# only one concatenation in the end
total_received = 0
while True:
recv_buf = self.client_sock.recv(self.BUFFER_LEN - total_received)
# sys.stderr.write('Received %d\n' % len(recv_buf))
if not recv_buf: # EOF
                        # if received is not empty, we will come back here
                        # once more and recv will return EOF again
break
total_received += len(recv_buf)
received.append(recv_buf)
if total_received == self.BUFFER_LEN:
break
except socket.error:
sys.stderr.write('Error while receiving disk image\n')
self.clean_fds()
raise
buffer_ = b''.join(received)
if not buffer_:
sys.stderr.write('Received EOF import job\n')
break
checksum.update(buffer_)
try:
written = 0
                # FIXME never write small chunks
                # in which case would disk.write not write the whole buffer?
to_send = buffer_
while True:
written += self.disk.write(to_send)
# sys.stderr.write('Written %s to disk\n' % written)
to_send = buffer(buffer_, written)
if not to_send:
break
except IOError:
sys.stderr.write('Error while writing image to disk\n')
self.clean_fds()
raise
        # at this point we may not have received the full disk, but we don't
        # treat this as an error on the import side
self.checksum = checksum.hexdigest()
# clean the fds
self.clean_fds()
sys.stderr.write('Volume import done\n')
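The `received`/`total_received` loop above coalesces short `recv()` results into one `BUFFER_LEN`-sized buffer before hashing and writing it, doing a single concatenation per cycle. A standalone sketch of that accumulation (`recv_up_to` is a hypothetical helper, not part of the job):

```python
def recv_up_to(recv, limit):
    """Accumulate up to *limit* bytes from *recv*, joining chunks once.

    *recv* has socket.recv semantics: it may return fewer bytes than asked
    for and returns b'' on EOF.  Returns b'' if EOF arrives before any data,
    and a short result if EOF arrives midway.
    """
    received = []
    total_received = 0
    while total_received < limit:
        chunk = recv(limit - total_received)
        if not chunk:  # EOF
            break
        received.append(chunk)
        total_received += len(chunk)
    # single concatenation, as in ImportVolume.run_job
    return b''.join(received)
```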
class ExportVolume(BaseIOJob):
"""Export volume job.
"""
BUFFER_LEN = 8192 * 16
HASH = md5
def __init__(self, job_manager, volume, raddr, rport):
"""
:param volume: :class:`Volume` instance
:param raddr: remote IP address
:param rport: remote TCP port
"""
BaseIOJob.__init__(self, job_manager)
# where to connect to send the volume
self.raddr = raddr
self.rport = rport
self.volume = volume
self.checksum = None
# fds
self.sock = None
self.disk = None
@property
def open_fds(self):
return [fo.fileno() for fo in (self.sock, self.disk)
if fo is not None]
def clean_fds(self):
if self.sock is not None:
self.sock.close()
self.sock = None
if self.disk is not None:
self.disk.close()
self.disk = None
def pre_job(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# connect to the remote host
try:
self.sock.connect((self.raddr, self.rport))
except socket.error as exc:
logger.exception('Error while trying to connect to remote host %s',
exc.strerror)
self.clean_fds()
raise
# open local volume
try:
self.disk = io.open(self.volume.path, 'rb', 0)
except IOError:
logger.exception('Error while opening disk for export job')
self.clean_fds()
raise
def run_job(self):
checksum = self.HASH()
# sent_count = 0
# do copy
while self.running:
try:
read = self.disk.read(self.BUFFER_LEN)
except IOError:
sys.stderr.write('Error while reading from disk\n')
self.clean_fds()
break
# read length may be less than BUFFER_LEN but we don't care as it
# will go over TCP
if not read: # end of file
# sys.stderr.write('EOF, exported %d bytes\n' % sent_count)
break
# sent_count += len(read)
# sys.stderr.write('Read %d from disk\n' % len(read))
checksum.update(read)
try:
self.sock.sendall(read)
except socket.error:
sys.stderr.write('Error while sending through socket\n')
self.clean_fds()
break
self.checksum = checksum.hexdigest()
self.clean_fds()
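Both `ImportVolume` and `ExportVolume` checksum the image incrementally, feeding each chunk to the hash as it passes through rather than holding the full disk in memory. A minimal sketch of that idea (`stream_checksum` is illustrative, not used by the jobs):

```python
from hashlib import md5

def stream_checksum(chunks, hash_factory=md5):
    """Checksum an image chunk by chunk, as the import/export jobs do.

    *chunks* is any iterable of bytes; memory use stays bounded by the
    chunk size, not the image size.
    """
    checksum = hash_factory()
    for chunk in chunks:
        checksum.update(chunk)
    return checksum.hexdigest()
```

Both ends compute the digest over the same byte stream, so comparing the two hexdigests after the transfer detects corruption.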
class TCPTunnel(ForkedJob):
"""Handles a TCP tunnel."""
BUFFER_LEN = 8096
def __init__(self, job_manager, connect=None, listen='0.0.0.0'):
"""
:param job_manager: :class:`JobManager` instance
:param connect: where to connect one end of the tunnel (a tuple, as
given to socket.connect)
:param listen: which interface to listen to for the other end of the
tunnel
"""
ForkedJob.__init__(self, job_manager)
# create a new libev loop that will run inside our child
self.ev_loop = pyev.Loop()
self.connect = connect
self.listen = listen
#: port is assigned by the kernel
self.port = None
# keep state information for both ends
self.listen_state = 'CLOSED'
self.connect_state = 'CLOSED'
#: very basic error report
self.error = None
# these are the watchers
self.source_reader = None
self.source_writer = None
self.dest_reader = None
self.dest_writer = None
        #: source_sock is the socket that listens for the remote/local end to connect
self.source_sock = None
        #: dest_sock connects to another tunnel endpoint that is already set up
self.dest_sock = None
# input buffer is used for data that is coming from source_sock and goes
# to dest_sock
self.input_buffer = SocketBuffer()
        # output_buffer is used for data that is coming from dest_sock and goes
# to source_sock
self.output_buffer = SocketBuffer()
@property
def open_fds(self):
return [fo.fileno() for fo in (self.source_sock, self.dest_sock)
if fo is not None]
def after_fork(self):
self.ev_loop.reset()
def close(self):
# as this could be called from child, don't use logger (this is for
# debug anyway)
        sys.stderr.write('Closing job %d\n' % self.id)
# stop watchers
if self.source_reader is not None:
self.source_reader.stop()
self.source_reader = None
if self.source_writer is not None:
self.source_writer.stop()
self.source_writer = None
if self.dest_reader is not None:
self.dest_reader.stop()
self.dest_reader = None
if self.dest_writer is not None:
self.dest_writer.stop()
self.dest_writer = None
# close sockets
if self.source_sock is not None:
self.source_sock.close()
self.source_sock = None
if self.dest_sock is not None:
self.dest_sock.close()
self.dest_sock = None
# clear buffers (this memory won't be needed anyway)
self.input_buffer = None
self.output_buffer = None
# reset states
self.listen_state = 'CLOSED'
self.connect_state = 'CLOSED'
def stop(self):
self.close()
def setup_listen(self, interface=None):
"""Setup source socket.
:param interface: specify which interface to listen onto
"""
        if interface is not None:
            self.listen = interface
logger.debug('Setup listening %s %d', self.listen, self.id)
try:
self.source_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error:
logger.exception('Error while creating source_sock for tunnel job'
' %d', self.id)
self.close()
raise
try:
self.source_sock.setblocking(0)
except socket.error:
logger.exception('Cannot set source_sock in blocking mode for'
' tunnel job %d', self.id)
self.close()
raise
try:
self.source_sock.bind((self.listen, 0))
except socket.error:
logger.exception('Error while binding source_sock for tunnel job'
' %d', self.id)
self.close()
raise
self.port = self.source_sock.getsockname()[1]
logger.debug('Listening on port %s', self.port)
try:
self.source_sock.listen(1)
except socket.error:
logger.exception('Error while listening on source_sock for tunnel'
' job %d', self.id)
self.close()
raise
self.listen_state = 'LISTENING'
# ready to accept
self.source_reader = self.ev_loop.io(self.source_sock,
pyev.EV_READ, self.accept_cb)
self.source_reader.start()
def setup_connect(self, endpoint=None):
"""Start connection to remote end.
:param endpoint: specify where to connect (same as connect argument in
constructor), can be specified in both places
"""
if endpoint is not None:
self.connect = endpoint
if self.connect is None:
raise TunnelError('Remote endpoint to connect to was not specified')
logger.debug('Connect to endpoint %s %d', self.connect, self.id)
try:
if isinstance(self.connect, tuple):
addr_family = socket.AF_INET
else:
addr_family = socket.AF_UNIX
self.dest_sock = socket.socket(addr_family, socket.SOCK_STREAM)
except socket.error:
logger.exception('Error while creating dest_sock for tunnel job'
' %d', self.id)
self.close()
raise
try:
self.dest_sock.setblocking(0)
except socket.error:
            logger.exception('Error while setting non-blocking mode on'
                             ' dest_sock for tunnel job %d', self.id)
raise
error = self.dest_sock.connect_ex(self.connect)
if error and error != errno.EINPROGRESS:
raise socket.error('Error during connect for tunnel job, %s' %
os.strerror(error))
self.dest_writer = self.ev_loop.io(self.dest_sock,
pyev.EV_WRITE, self.connect_cb)
self.dest_writer.start()
self.connect_state = 'CONNECTING'
def run_job(self):
sys.stderr.write('Will start ev loop in child\n')
self.ev_loop.start()
def accept_cb(self, watcher, revents):
try:
new_source, remote = self.source_sock.accept()
except socket.error as exc:
            if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
# we will come back
return
# else
self.fatal_exc('Error while accepting new connection on'
' sock_source for tunnel job')
# everything went fine
self.source_sock.close() # we won't accept connections
self.source_sock = new_source
# set new socket non blocking
try:
self.source_sock.setblocking(0)
except socket.error as exc:
self.fatal_exc('Cannot set source socket in non blocking for'
' tunnel job: %s', exc.strerror)
self.source_reader.stop()
self.source_reader = self.ev_loop.io(new_source, pyev.EV_READ,
self.read_cb)
self.source_writer = self.ev_loop.io(new_source, pyev.EV_WRITE,
self.write_cb)
sys.stderr.write('Successfully accepted remote client %s for tunnel'
' job %d\n' % (remote, self.id))
self.listen_state = 'CONNECTED'
if self.connect_state == 'CONNECTED':
# start the watchers only if both ends are ready to accept data
self.source_reader.start()
self.dest_reader.start()
def connect_cb(self, watcher, revents):
# check that connection was a success
error = self.dest_sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if error:
self.fatal('Error during connect for tunnel job, %s\n' %
os.strerror(error))
# else we setup watcher with proper events
self.dest_reader = self.ev_loop.io(self.dest_sock, pyev.EV_READ,
self.read_cb)
self.dest_writer.stop()
self.dest_writer = self.ev_loop.io(self.dest_sock, pyev.EV_WRITE,
self.write_cb)
sys.stderr.write('Successfully connected to remote endpoint %s %d\n' %
(self.connect, self.id))
self.connect_state = 'CONNECTED'
if self.listen_state == 'CONNECTED':
# start the watchers only if both ends are ready to accept data
self.source_reader.start()
self.dest_reader.start()
def read_cb(self, watcher, revents):
if watcher == self.dest_reader:
# sys.stderr.write('Read event on dest %s\n' % self.id)
sock = self.dest_sock
buffer_ = self.output_buffer
other_watcher = self.source_writer
else:
# sys.stderr.write('Read event on source %s\n' % self.id)
sock = self.source_sock
buffer_ = self.input_buffer
other_watcher = self.dest_writer
# sys.stderr.write('Will loop into event\n')
while True:
try:
incoming = sock.recv(self.BUFFER_LEN)
except socket.error as exc:
if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
# sys.stderr.write('EAGAIN\n')
break
# else: unexpected error
self.fatal_exc('Unexpected error while reading on socket'
' for tunnel job, %s\n', exc.strerror)
if not incoming:
# EOF
# sys.stderr.write('EOF\n')
self.close()
return
# sys.stderr.write('Read %d bytes\n' % len(incoming))
buffer_.append(incoming)
if buffer_.is_full():
# sys.stderr.write('Buffer is full\n')
watcher.stop()
break
# we did read some bytes that we could write to the other end
if not buffer_.is_empty():
# sys.stderr.write('Starting other watcher\n')
other_watcher.start()
# sys.stderr.write('Read event done\n')
def write_cb(self, watcher, revents):
if watcher == self.dest_writer:
            # sys.stderr.write('Write event on dest %s\n' % self.id)
sock = self.dest_sock
buffer_ = self.input_buffer
other_watcher = self.source_reader
else:
# sys.stderr.write('Write event on source %s\n' % self.id)
sock = self.source_sock
buffer_ = self.output_buffer
other_watcher = self.dest_reader
while True:
try:
to_send = buffer_.popleft()
except IndexError:
# buffer is empty, we should stop write event
# sys.stderr.write('Buffer is empty\n')
watcher.stop()
break
send_buffer = to_send
total_sent = 0
while True:
try:
written = sock.send(send_buffer)
except socket.error as exc:
if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
buffer_.appendleft(to_send[total_sent:])
# sys.stderr.write('EAGAIN\n')
break
# else: unexpected error
                    self.fatal_exc('Unexpected error while writing on socket'
                                   ' for tunnel job, %s', exc.strerror)
# sys.stderr.write('Written %d bytes\n' % written)
if written == len(send_buffer):
break
# else
total_sent += written
send_buffer = buffer(to_send, total_sent)
# if we can read on the other end
if not buffer_.is_full():
# sys.stderr.write('Starting other watcher\n')
other_watcher.start()
        # sys.stderr.write('Processed write event\n')
class DRBDAllocator(object):
"""Keeps a list of allocated DRBD devices."""
__metaclass__ = Singleton
RMMOD = '/sbin/rmmod'
MODPROBE = '/sbin/modprobe'
#: maximum number of DRBD devices
MINOR_MAX = 100
def __init__(self, main_loop):
self.volumes = set()
self.subproc_call = partial(subproc_call, main_loop)
self.reload_kernel_module()
def new_volume(self):
for i in xrange(self.MINOR_MAX):
if i not in self.volumes:
self.volumes.add(i)
break
else:
raise DRBDAllocationError('Cannot allocate DRBD volume')
return i
def remove_volume(self, id_):
self.volumes.remove(id_)
def reload_kernel_module(self):
# FIXME find an other way to set parameters to drbd module
# try to remove kernel module
try:
self.subproc_call([self.RMMOD, 'drbd'])
except CalledProcessError:
# this is not an error if drbd module wasn't loaded
if 'drbd' in open('/proc/modules').read():
logger.error('Cannot remove drbd kernel module')
raise
# load kernel module with proper parameters
try:
            # we use a greater minor_count than the default, which seems too
            # small.  we set usermode_helper to /bin/true because by default
            # the module calls some drbd helpers that return a non-zero value
            # and make the synchronisation halt.
self.subproc_call([self.MODPROBE, 'drbd',
'minor_count=%d' % self.MINOR_MAX,
'usermode_helper=/bin/true'])
except CalledProcessError:
logger.error('Cannot load drbd kernel module')
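`DRBDAllocator.new_volume` above is a first-free allocator over the minor numbers `[0, MINOR_MAX)`. The same logic in standalone form (a sketch; it raises `RuntimeError` where the allocator raises `DRBDAllocationError`):

```python
def allocate_minor(used, minor_max):
    """Pick the first free minor in [0, minor_max) and mark it used.

    *used* is the set of already-allocated minors, mutated in place, as
    DRBDAllocator.volumes is.
    """
    for i in range(minor_max):
        if i not in used:
            used.add(i)
            return i
    # for/else: every slot was taken
    raise RuntimeError('Cannot allocate DRBD volume')
```

Freeing a minor is just `used.remove(i)`, which mirrors `remove_volume`.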
class DRBD(object):
"""Manage DRBD job."""
DMSETUP = '/sbin/dmsetup'
DRBDSETUP = '/sbin/drbdsetup'
DRBDMETA = '/sbin/drbdmeta'
DRBD_TIMEOUT = '30'
DRBD_RATE = '50000'
def __init__(self, job_manager, storage_index, lvm_pool, lvm_volume):
"""
:param job_manager: :class:`JobManager` instance
:param storage_index: :class:`StorageIndex` instance
:param lvm_pool: :class:`Storage` instance
:param lvm_volume: :class:`Volume` instance
"""
#: job id
self.id = job_manager.job_id.next()
self.subproc_call = partial(subproc_call, job_manager.main)
self.allocator = DRBDAllocator(job_manager.main)
# define a set of states
self.state = 'INIT'
self.storage = storage_index
self.pool = lvm_pool
self.volume = lvm_volume
self.meta_volume = None
#: DRBD id as returned by DRBDAllocator
self.drbd_id = None
self.drbd_port = None
#: DRBD device full path
self.drbd_path = None
self.drbd_table = None
self.drbd_status = dict(conn=None)
#: name of DM copy of LV
self.dm_table = None
self.dm_copy = '%s-%s.copy' % (
'vg', self.volume.name.replace('-', '--'))
# each step is executed in the RPC call thread, thus exception are
# propagated directly to the cc-server
def stop(self):
pass
def cleanup(self):
# reset DM to initial state
try:
table = self.subproc_call([self.DMSETUP, 'table', self.volume.path])
except CalledProcessError:
logger.error('Error while getting table of VM LV')
else:
if table != self.dm_table:
try:
self.subproc_call([self.DMSETUP, 'load', self.volume.path],
self.dm_table)
self.subproc_call([self.DMSETUP, 'suspend', self.volume.path])
self.subproc_call([self.DMSETUP, 'resume', self.volume.path])
except CalledProcessError:
logger.error('Error while loading back VM LV table')
# FIXME this is kind of critical, we should tell the user to
# call a Gaetant
# stop drbd volume
# if path_exists(self.drbd_path):
if self.drbd_id is not None:
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'disconnect'])
except CalledProcessError:
logger.error('Error while disconnecting DRBD device %s',
self.drbd_path)
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'secondary'])
except CalledProcessError:
logger.error('Error while switching DRBD device to secondary'
' (%s)', self.drbd_path)
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'detach'])
except CalledProcessError:
logger.error('Error while detaching DRBD device %s',
self.drbd_path)
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'down'])
except CalledProcessError:
logger.error('Error while bringing down DRBD device %s',
self.drbd_path)
self.allocator.remove_volume(self.drbd_id)
self.drbd_id = None
self.drbd_port = None
self.drbd_path = None
self.drbd_table = None
self.drbd_status = dict(conn=None)
# remove drbd meta volume
if self.meta_volume is not None:
try:
self.storage.delete_volume(
self.pool.name,
self.volume.name + '.drbdmeta',
)
except: # FIXME
logger.exception('Error while removing DRBD metadata LV')
self.meta_volume = None
# remove copy DM
if path_exists('/dev/mapper/' + self.dm_copy):
try:
self.subproc_call([self.DMSETUP, 'remove', self.dm_copy])
except CalledProcessError:
logger.error('Error while removing DM copy')
self.dm_table = None
# set mapper
def setup(self):
logger.debug('Create DRBD meta device')
self.meta_volume = self.storage.create_volume(
self.pool.name,
self.volume.name + '.drbdmeta',
# see
# http://www.drbd.org/users-guide/ch-internals.html#s-meta-data-size
# for external metadata size calculation
max(self.volume.capacity / 32768 + 4 * 2 ** 20, 128 * 2 ** 20),
)
logger.debug('Create a copy DM of the LV')
# get LV table
try:
self.dm_table = self.subproc_call([self.DMSETUP, 'table',
'--showkeys', self.volume.path])
except CalledProcessError:
logger.error('Cannot get DM table of VM LV')
raise DRBDError('Cannot get DM table of VM LV')
# create new DM
logger.debug('Got table of LV "%s"', self.dm_table)
try:
self.subproc_call([self.DMSETUP, 'create', self.dm_copy], self.dm_table)
except CalledProcessError:
logger.error('Cannot create copy DM of LV with table "%s"',
self.dm_table)
raise
logger.debug('Setup DRBD device')
# get drbd path
self.drbd_id = self.allocator.new_volume()
self.drbd_port = 7788 + self.drbd_id # FIXME magic number
self.drbd_path = '/dev/drbd%d' % self.drbd_id
# wipe drbd metadata (just in case)
try:
self.subproc_call([self.DRBDMETA, '--force', self.drbd_path,
'v08', self.meta_volume.path, '0', 'wipe-md'])
except CalledProcessError:
pass
try:
self.subproc_call([self.DRBDMETA, '--force', self.drbd_path,
'v08', self.meta_volume.path, '0', 'create-md'])
except CalledProcessError:
logger.error('Cannot create DRBD external metadata on device')
raise DRBDError('Cannot create DRBD metadata')
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'disk',
'/dev/mapper/%s' % self.dm_copy,
self.meta_volume.path,
'0', '--create-device'])
except CalledProcessError:
logger.error('Error while creating DRBD device')
raise DRBDError('Cannot create DRBD device')
self.drbd_table = '0 %d linear %s 0' % (
self.volume.capacity / 512, # FIXME comment
self.drbd_path,
)
logger.debug('Setup DRBD done')
self.state = 'SETUP'
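The metadata volume created at the top of `setup()` uses DRBD's external-metadata sizing rule: roughly capacity/32768 plus a safety margin, floored at 128 MiB. A standalone sketch of that formula (the function name is ours, not part of the module):

```python
def drbd_meta_size(capacity):
    """Size in bytes of the LV backing DRBD external metadata.

    Mirrors the expression used in setup(): capacity / 32768 (DRBD
    needs roughly 32 KiB of metadata per GiB of data) plus a 4 MiB
    margin, with a 128 MiB floor.
    """
    return max(capacity // 32768 + 4 * 2 ** 20, 128 * 2 ** 20)

# a 10 GiB volume still falls under the 128 MiB floor
print(drbd_meta_size(10 * 2 ** 30))  # -> 134217728
# a 10 TiB volume needs more than the floor
print(drbd_meta_size(10 * 2 ** 40))  # -> 339738624
```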
def connect(self, remote_addr, remote_port):
logger.debug('Setup networking for DRBD')
# connect to remote node
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'net',
'0.0.0.0:%d' % self.drbd_port,
'%s:%d' % (remote_addr, remote_port),
'C', '-m', '-S', '10000000'])
except CalledProcessError:
logger.error('Error while setting up network facility for DRBD')
raise DRBDError('Cannot set up network for DRBD')
sleep(.5) # FIXME
logger.debug('Set up bandwidth limit')
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'syncer', '-r',
self.DRBD_RATE])
except CalledProcessError:
logger.error('Cannot set bandwidth rate limit on DRBD')
raise DRBDError('Error while setting bandwidth limit')
self.state = 'CONNECTED'
def wait_connection(self):
self.state = 'WAIT PEER CONNECT'
sleep(.5) # FIXME
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'wait-connect',
'-t', self.DRBD_TIMEOUT,
'-d', self.DRBD_TIMEOUT,
'-o', self.DRBD_TIMEOUT])
except CalledProcessError:
logger.error('Error while waiting for remote DRBD to connect,'
' timeout = %s', self.DRBD_TIMEOUT)
raise DRBDError('Error while waiting DRBD connect')
sleep(.5) # FIXME
self.state = 'CONNECTED'
def switch_primary(self):
logger.debug('Switch DRBD %s in primary mode', self.drbd_path)
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'primary', '-o'])
except CalledProcessError:
logger.error('Error while switching to primary role (%s)',
self.drbd_path)
raise DRBDError('Cannot switch to primary role')
self.state = 'CONNECTED PRIMARY'
def switch_secondary(self):
logger.debug('Switch DRBD %s in secondary mode', self.drbd_path)
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'secondary'])
except CalledProcessError:
logger.error('Error while switching to secondary role (%s)',
self.drbd_path)
raise DRBDError('Cannot switch to secondary role')
self.state = 'CONNECTED SECONDARY'
def wait_sync(self):
self.state = 'WAIT SYNC'
sleep(.5) # FIXME
logger.debug('Wait sync %s', self.drbd_path)
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'wait-sync'])
except CalledProcessError:
logger.error('Error while waiting for synchronisation of DRBD'
' device (%s)', self.drbd_path)
raise DRBDError('Wait sync error')
self.state = 'SYNC DONE'
def disconnect(self):
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'disconnect'])
except CalledProcessError:
logger.error('Error while disconnecting DRBD device %s',
self.drbd_path)
raise DRBDError('Cannot disconnect device')
self.state = 'DISCONNECTED'
def status(self):
"""DRBD status."""
try:
out = self.subproc_call([self.DRBDSETUP, self.drbd_path, 'status'])
except CalledProcessError:
logger.error('Error while getting DRBD status (%s)', self.drbd_path)
raise DRBDError('Status: error while executing DRBD status')
try:
status = et.ElementTree().parse(StringIO(out))
except:
logger.error('Error while parsing status command output for DRBD'
' device %s', self.drbd_path)
raise DRBDError('Status: cannot parse output')
self.drbd_status = dict(
conn=status.get('cs'),
disk=status.get('ds1'),
rdisk=status.get('ds2'),
role=status.get('ro1'),
rrole=status.get('ro2'),
percent=status.get('resynced_percent', None),
)
return self.drbd_status
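On DRBD 8.x, `drbdsetup <dev> status` can emit the device state as a single XML element, which is what `status()` above parses. A standalone sketch with a made-up sample (the attribute names match what `status()` reads; the values are invented for the example):

```python
from io import StringIO
from xml.etree import ElementTree as et

# illustrative drbdsetup "status" XML output; values are invented
out = ('<resource minor="0" name="r0" cs="SyncSource" ro1="Primary" '
       'ro2="Secondary" ds1="UpToDate" ds2="Inconsistent" '
       'resynced_percent="42.1" />')

status = et.ElementTree().parse(StringIO(out))
drbd_status = dict(
    conn=status.get('cs'),
    disk=status.get('ds1'),
    rdisk=status.get('ds2'),
    role=status.get('ro1'),
    rrole=status.get('ro2'),
    percent=status.get('resynced_percent', None),
)
print(drbd_status['conn'])     # SyncSource
print(drbd_status['percent'])  # 42.1
```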
def takeover(self):
"""Set up DRBD device as VM backing device."""
logger.debug('DRBD takeover %s', self.drbd_path)
assert self.drbd_table is not None
try:
self.subproc_call([self.DMSETUP, 'load', self.volume.path],
self.drbd_table)
except CalledProcessError:
logger.error('Error while loading new table for VM LV')
raise DRBDError('Takeover: cannot load DM table')
try:
self.subproc_call([self.DMSETUP, 'suspend', self.volume.path])
except CalledProcessError:
logger.error('Error while suspending VM LV')
raise DRBDError('Takeover: cannot suspend DM')
try:
self.subproc_call([self.DMSETUP, 'resume', self.volume.path])
except CalledProcessError:
logger.error('Error while resuming VM LV')
raise DRBDError('Takeover: cannot resume DM')
# ===== cc-node-v24/cloudcontrol/node/hypervisor/kvm.py =====
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
"""KVM hypervisor support."""
import re
import os
import sys
import signal
import logging
import weakref
import threading
import traceback
import libvirt
from cloudcontrol.common.client.utils import main_thread
from cloudcontrol.node.hypervisor.lib import (
DOMAIN_STATES, EVENTS,
EventLoop as VirEventLoop,
StorageIndex,
)
from cloudcontrol.node.hypervisor.domains import VirtualMachine
from cloudcontrol.node.utils import close_fds, set_signal_map, num_to_sig
from cloudcontrol.node.exc import VMMigrationError
logger = logging.getLogger(__name__)
BAD_VM_NAME = re.compile(r'^job-(0|([1-9]\d*))$')
# FIXME create abstract base class for any hypervisor
class KVM(object):
"""Container for all hypervisor related state."""
def __init__(self, name, handler):
"""
:param str name: name of hypervisor instance
:param Handler handler: hypervisor handler
"""
self.handler = weakref.proxy(handler)
#: hv attributes
self.name = name
self.type = u'kvm'
# register libvirt error handler
libvirt.registerErrorHandler(self.vir_error_cb, None)
# libvirt event loop abstraction
self.vir_event_loop = VirEventLoop(self.handler.main.evloop)
self.vir_con = libvirt.open('qemu:///system') # currently only support KVM
        # find out storage pools
self.storage = StorageIndex(handler, self.vir_con)
logger.debug('Storages: %s', self.storage.paths)
#: domains: vms, containers...
self.domains = dict()
# find defined domains
for dom_name in self.vir_con.listDefinedDomains():
dom = self.vir_con.lookupByName(dom_name)
self.domains[dom.name()] = VirtualMachine(dom, self)
# find started domains
for dom_id in self.vir_con.listDomainsID():
dom = self.vir_con.lookupByID(dom_id)
self.domains[dom.name()] = VirtualMachine(dom, self)
logger.debug('Domains: %s', self.domains)
self.vir_con.domainEventRegister(self.vir_cb, None)
def stop(self):
self.vir_event_loop.stop()
# unregister callback
try:
self.vir_con.domainEventDeregister(self.vir_cb)
except libvirt.libvirtError:
# in case the libvirt connection is broken, it will raise the error
pass
ret = self.vir_con.close()
logger.debug('Libvirt still handling %s ref connections', ret)
def vir_error_cb(self, ctxt, err):
"""Libvirt error callback.
        See http://libvirt.org/errors.html for more information.
        :param ctxt: arbitrary context data (not needed because the context
            is given by self)
:param err: libvirt error code
"""
logger.error('Libvirt error %s', err)
def vir_cb(self, conn, dom, event, detail, opaque):
"""Callback for libvirt event loop."""
logger.debug('Received event %s on domain %s, detail %s', event,
dom.name(), detail)
event = EVENTS[event]
if event == 'Added':
            # prevents name conflicts with other cc-node objects
if BAD_VM_NAME.match(dom.name()):
                logger.error('Cannot register VM %s as its name would'
                             ' conflict with other cc-node objects',
                             dom.name())
return
# update Storage pools in case VM has volumes that were created
self.storage.update()
# if vm is redefined while running we need to refresh its devices
# when stopped
redefine_on_stop = False
if dom.name() in self.domains:
                # sometimes libvirt sends us the same event multiple times
# this can be the result of a change in the domain configuration
# we first remove the old domain
vm = self.domains.pop(dom.name())
self.handler.tag_db.remove_sub_object(vm.name)
if vm.state not in ('stopped', 'crashed'):
# if the vm was updated while it was "on", then the
# modifications will not be reflected since we construct the
# object/tags from running XML
redefine_on_stop = True
logger.debug('Domain %s recreated', dom.name())
self.vm_register(dom, redefine_on_stop)
elif event == 'Removed':
vm_name = dom.name()
self.vm_unregister(vm_name)
logger.info('Removed domain %s', vm_name)
elif event in ('Started', 'Suspended', 'Resumed', 'Stopped', 'Saved',
'Restored'):
vm = self.domains.get(dom.name())
            # sometimes libvirt sends a start event before a created event,
            # so be careful
if vm is not None:
try:
state = DOMAIN_STATES[dom.info()[0]]
except libvirt.libvirtError as exc:
# checks that domain was not previously removed
# seems to happen only in libvirt 0.8.8
if 'Domain not found' in str(exc):
self.vm_unregister(dom.name())
else:
raise
else:
logger.info('Domain change state from %s to %s', vm.state,
state)
if event == 'Stopped' and vm.redefine_on_stop:
# if the vm was changed while it was running, then we
# need to recreate it now as stated above
self.vm_unregister(vm.name)
self.vm_register(dom)
else:
vm.state = state
self.update_domain_count()
def vm_register(self, dom, redefine_on_stop=False):
"""Register a VM to the hypervisor object.
:param dom: libvirt domain instance
:param redefine_on_stop: if we need to reread the domain XML on stop
"""
vm = VirtualMachine(dom, self)
logger.info('Created domain %s', vm.name)
vm.redefine_on_stop = redefine_on_stop
self.domains[vm.name] = vm
self.handler.tag_db.add_sub_object(vm.name, vm.tags.itervalues(), 'vm')
self.update_domain_count()
def vm_unregister(self, name):
"""Unregister a VM from the cc-server and remove it from the index."""
try:
vm = self.domains.pop(name)
except KeyError:
# domain already removed, see hypervisor/domains/vm_tags.py
            # sometimes libvirt sends us the remove event too late
# we still update storage and tag attributes
pass
else:
self.handler.tag_db.remove_sub_object(vm.name)
# update Storage pools in case VM had volumes that were deleted
self.storage.update()
self.update_domain_count()
def update_domain_count(self):
"""Update domain state count tags."""
# update domain state counts
for tag in ('nvm', 'vmpaused', 'vmstarted', 'vmstopped', 'cpualloc',
'cpurunning', 'memalloc', 'memrunning'):
self.handler.tag_db['__main__'][tag].update_value()
def vm_define(self, xml_desc):
"""Create a VM on the Hypervisor
:param str xml_desc: XML description in libvirt format
:return: VM name created
"""
try:
return self.vir_con.defineXML(xml_desc).name()
except libvirt.libvirtError:
logger.exception('Error while creating domain')
# reraise exception for the cc-server
raise
def _count_domain(self, filter=lambda d: True):
count = 0
for dom in self.domains.itervalues():
if filter(dom):
count += 1
return count
@property
def vm_started(self):
"""Number of VMs started."""
return self._count_domain(lambda d: d.state == 'running')
@property
def vm_stopped(self):
"""Number of VMs stopped."""
return self._count_domain(lambda d: d.state == 'stopped')
@property
def vm_paused(self):
"""Number of VMs paused."""
return self._count_domain(lambda d: d.state == 'paused')
@property
def vm_total(self):
"""Total number of VMs on the hypervisor."""
return self._count_domain()
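The `vm_*` properties above all delegate to `_count_domain`, which is just a filtered count over the domain index. A minimal sketch with stand-in domain objects (the `Dom` class and helper name are ours):

```python
class Dom(object):
    """Stand-in for VirtualMachine: only the state attribute matters here."""
    def __init__(self, state):
        self.state = state

def count_domains(domains, predicate=lambda d: True):
    # same logic as KVM._count_domain: count values matching the predicate
    return sum(1 for d in domains.values() if predicate(d))

domains = {'a': Dom('running'), 'b': Dom('stopped'), 'c': Dom('running')}
print(count_domains(domains, lambda d: d.state == 'running'))  # 2
print(count_domains(domains))                                  # 3
```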
class LiveMigration(object):
def __init__(self, main_loop, vm, node2virt_port, virt2virt_port, timeout,
unsafe=False):
"""Performs live migration in a forked process.
:param main_loop: instance of MainLoop
:param vm: instance of VM to migrate
        :param node2virt_port: port for ccnode -> remote libvirt
        :param virt2virt_port: port for local libvirt -> remote libvirt
:param float timeout: timeout for libvirt migration (prevents libvirt
from trying to acquire domain lock forever)
:param bool unsafe: for Libvirt >= 0.9.11, see
http://libvirt.org/html/libvirt-libvirt.html#virDomainMigrateFlags
"""
self.main = main_loop
self.vm = vm
self.node2virt_port = node2virt_port
self.virt2virt_port = virt2virt_port
self.timeout = timeout
self.unsafe = unsafe
#: child pid
self.pid = None
self.error_msg = None
self.return_status = None
# event for caller thread to wait migration termination
self.event = threading.Event()
self.do_fork()
def create_watchers(self):
self.timeout_watcher = self.main.evloop.timer(self.timeout, 0.,
self.timeout_cb)
self.child_watcher = self.main.evloop.child(self.pid, False,
self.child_cb)
self.timeout_watcher.start()
self.child_watcher.start()
def child_cb(self, watcher, revents):
self.pid = None
self.return_status = watcher.rstatus
watcher.stop()
if self.timeout_watcher.active:
self.timeout_watcher.stop()
self.child_watcher = None
logger.debug('Status: %s', self.return_status)
# test if killed, then set msg
if os.WIFSIGNALED(self.return_status):
signo = os.WTERMSIG(self.return_status)
if signo == signal.SIGKILL:
self.error_msg = 'Migration timeout for vm %s' % self.vm.name
else:
self.error_msg = 'Migration failed for vm %s, (%s)' % (
self.vm.name, num_to_sig(signo))
else:
# test status
status = os.WEXITSTATUS(self.return_status)
if status == 1:
self.error_msg = (
'Migration failed for vm %s, due to libvirt error'
% self.vm.name)
elif status == 4:
self.error_msg = 'Cannot open new connection to libvirt'
elif status == 5:
self.error_msg = 'Cannot open connection to remote libvirt'
elif status != 0:
self.error_msg = 'Migration failed for vm %s (%d)' % (
self.vm.name, status)
self.event.set()
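`child_cb` above decodes the raw `wait()` status word with the POSIX `os.WIF*` macros: death by signal (SIGKILL meaning our own timeout killer fired) versus a plain exit code. A standalone sketch of that decoding (the function name is ours; assumes a POSIX status encoding, i.e. exit code `n` is `n << 8` and signal `s` is the low bits):

```python
import os
import signal

def describe_status(status, vm_name):
    # mirrors child_cb: signal death vs. exit-code mapping
    if os.WIFSIGNALED(status):
        signo = os.WTERMSIG(status)
        if signo == signal.SIGKILL:
            # our timeout watcher SIGKILLs the child
            return 'Migration timeout for vm %s' % vm_name
        return 'Migration failed for vm %s (signal %d)' % (vm_name, signo)
    code = os.WEXITSTATUS(status)
    if code == 0:
        return 'Migration succeeded for vm %s' % vm_name
    return 'Migration failed for vm %s (%d)' % (vm_name, code)

print(describe_status(1 << 8, 'vm0'))          # exit code 1
print(describe_status(signal.SIGKILL, 'vm0'))  # killed -> timeout
```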
def timeout_cb(self, watcher, revents):
# kill the young
logger.debug('Killing child migration process')
os.kill(self.pid, signal.SIGKILL)
@main_thread
def do_fork(self):
        # We fork and open a new connection to libvirt because the libvirt
        # Python binding sometimes does not release CPython's GIL while
        # performing an operation, which blocks all other node operations.
        # The only workaround we have found so far is to fork and use a
        # dedicated libvirt connection for the migration: the migration
        # itself is handled by the child while other threads keep running.
try:
pid = os.fork()
except OSError:
logger.error('Cannot fork before running live migration')
raise
if pid == 0:
# child
try:
self.child_work()
except:
# whatever the matter is we MUST NOT return to libev or sjRPC
                sys.stderr.write('Uncaught error during migration\n')
                traceback.print_exc(file=sys.stderr)
finally:
os._exit(42)
self.pid = pid
self.create_watchers()
def child_work(self):
# migration is performed here
sys.stderr.write('Hello from child !\n')
sys.stderr.write('Debug is %s\n' % self.main.config.debug)
try:
close_fds(debug=self.main.config.debug)
set_signal_map({
signal.SIGTERM: lambda *args: os._exit(1),
signal.SIGUSR1: signal.SIG_IGN,
signal.SIGINT: signal.SIG_IGN,
# FIXME need more signal ?
})
except:
sys.stderr.write('Error while performing post fork work\n')
traceback.print_exc(file=sys.stderr)
# create a new libvirt connection dedicated to migration
sys.stderr.write('Open new connection to libvirt\n')
try:
new_con = libvirt.open('qemu:///system')
domain = new_con.lookupByUUIDString(self.vm.uuid)
except libvirt.libvirtError:
sys.stderr.write('Cannot connect to libvirt\n')
os._exit(4)
except:
# error
            traceback.print_exc(file=sys.stderr)
os._exit(2)
sys.stderr.write('Open destination libvirt connection\n')
try:
dest_virt_con = libvirt.open(
'qemu+tcp://127.0.0.1:%d/system' % self.node2virt_port)
except libvirt.libvirtError:
            sys.stderr.write('Cannot connect to remote libvirt for live'
                             ' migration of vm %s\n' % self.vm.name)
os._exit(5)
except:
# error
traceback.print_exc(file=sys.stderr)
os._exit(2)
try:
if self.unsafe:
# VIR_MIGRATE_UNSAFE is not defined for libvirt < 0.9.11
append_flags = getattr(libvirt, 'VIR_MIGRATE_UNSAFE', 0)
else:
append_flags = 0
sys.stderr.write('Do migrate\n')
domain.migrate(
dest_virt_con,
libvirt.VIR_MIGRATE_LIVE | libvirt.VIR_MIGRATE_PEER2PEER |
libvirt.VIR_MIGRATE_TUNNELLED |
libvirt.VIR_MIGRATE_PERSIST_DEST |
libvirt.VIR_MIGRATE_UNDEFINE_SOURCE |
append_flags,
None,
'qemu+tcp://127.0.0.1:%d/system' % self.virt2virt_port,
0,
)
except libvirt.libvirtError:
sys.stderr.write('libvirt error during migration\n')
traceback.print_exc(file=sys.stderr)
os._exit(1)
except:
# whatever the matter is we MUST NOT return to libev or sjRPC
sys.stderr.write('error during migration\n')
traceback.print_exc(file=sys.stderr)
os._exit(2)
else:
os._exit(0)
finally:
new_con.close()
dest_virt_con.close()
def wait(self):
self.event.wait()
if self.return_status != 0:
raise VMMigrationError(self.error_msg)
# ===== cc-node-v24/cloudcontrol/node/hypervisor/lib.py =====
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
"""Helpers for libvirt."""
import logging
from itertools import chain, imap, count
from StringIO import StringIO
from xml.etree import cElementTree as et
import pyev
import libvirt
from cloudcontrol.common.client.tags import Tag
from cloudcontrol.node.utils import Singleton
from cloudcontrol.node.exc import PoolStorageError
logger = logging.getLogger(__name__)
#: corresponding name for enum state of libvirt domains
# see http://libvirt.org/html/libvirt-libvirt.html#virDomainState
DOMAIN_STATES = (
'stopped', # 0 no state
'running', # 1 running
'blocked', # 2 blocked
'paused', # 3 paused
'running', # 4 shutdown
    'stopped', # 5 shutoff
'crashed', # 6 crashed
'suspended', # 7 suspended
)
STORAGE_STATES = (
'inactive',
'building',
'running',
'degraded',
'inaccessible',
'???', # 5
)
#: libvirt events
EVENTS = (
'Added',
'Removed',
'Started',
'Suspended',
'Resumed',
'Stopped',
'Saved',
'Restored',
)
# following event loop implementation was inspired by libvirt python example
# but updated to work with libev
class LoopHandler(object):
"""This class contains the data we need to track for a single file handle.
"""
def __init__(self, loop, handle, fd, events, cb, opaque):
self.handle = handle
self.fd = fd
self._events = self.virt_to_ev(events)
self._cb = cb
self.opaque = opaque
self.watcher = loop.io(self.fd, self._events, self.ev_cb,
None, pyev.EV_MAXPRI)
def ev_to_virt(self, events):
"""Convert libev events into libvirt one."""
result = 0
if events & pyev.EV_READ:
result |= libvirt.VIR_EVENT_HANDLE_READABLE
if events & pyev.EV_WRITE:
result |= libvirt.VIR_EVENT_HANDLE_WRITABLE
return result
def virt_to_ev(self, events):
"""Convert libvirt event to libev one."""
result = 0
if events & (libvirt.VIR_EVENT_HANDLE_READABLE |
libvirt.VIR_EVENT_HANDLE_ERROR |
libvirt.VIR_EVENT_HANDLE_HANGUP):
result |= pyev.EV_READ
if events & libvirt.VIR_EVENT_HANDLE_WRITABLE:
result |= pyev.EV_WRITE
return result
def _set(self):
self.watcher.stop()
if self._events != 0:
self.watcher.set(self.fd, self._events)
self.watcher.start()
@property
def events(self):
return self._events
@events.setter
def events(self, events):
self._events = self.virt_to_ev(events)
self._set()
def start(self):
self.watcher.start()
def stop(self):
self.watcher.stop()
def ev_cb(self, watcher, revents):
# convert events
events = self.ev_to_virt(revents)
self._cb(self.handle, self.watcher.fd, events, self.opaque[0],
self.opaque[1])
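The `ev_to_virt`/`virt_to_ev` methods of `LoopHandler` above translate between the two libraries' event bitmasks, folding libvirt's error/hangup conditions into readability for libev. A self-contained sketch (the constant values mirror libvirt's and pyev's on Linux, but treat them as assumptions of this example):

```python
# stand-in constants; actual values come from pyev and libvirt
EV_READ, EV_WRITE = 1, 2
VIR_READABLE, VIR_WRITABLE, VIR_ERROR, VIR_HANGUP = 1, 2, 4, 8

def ev_to_virt(events):
    """Convert libev events into libvirt ones."""
    result = 0
    if events & EV_READ:
        result |= VIR_READABLE
    if events & EV_WRITE:
        result |= VIR_WRITABLE
    return result

def virt_to_ev(events):
    """Convert libvirt events into libev ones."""
    result = 0
    # error/hangup conditions are reported as readability to libev
    if events & (VIR_READABLE | VIR_ERROR | VIR_HANGUP):
        result |= EV_READ
    if events & VIR_WRITABLE:
        result |= EV_WRITE
    return result

print(virt_to_ev(VIR_READABLE | VIR_WRITABLE))  # 3 (EV_READ | EV_WRITE)
print(ev_to_virt(EV_READ))                      # 1
```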
class LoopTimer(object):
"""This class contains the data we need to track for a single periodic
timer.
"""
def __init__(self, loop, timer, interval, cb, opaque):
self.timer = timer
self._interval = float(interval)
self._cb = cb
self.opaque = opaque
self.watcher = None
self.loop = loop
self._set()
def _set(self):
self.stop()
if self._interval >= 0.: # libvirt sends us interval == -1
self.watcher = self.loop.timer(self._interval, self._interval,
self.ev_cb, None, pyev.EV_MAXPRI)
else:
self.watcher = None
self.start()
@property
def interval(self):
return self._interval
@interval.setter
def interval(self, value):
self._interval = float(value)
self._set()
def start(self):
if self.watcher is not None:
self.watcher.start()
def stop(self):
if self.watcher is not None:
self.watcher.stop()
def ev_cb(self, *args):
self._cb(self.timer, self.opaque[0], self.opaque[1])
class EventLoop(object):
"""This class is used as an interface between the libvirt event handling and
the main pyev loop.
It cannot be used from other threads.
"""
    # singleton usage is very important because libvirt.virEventRegisterImpl
    # would segfault on libvirt 0.8.X
__metaclass__ = Singleton
def __init__(self, loop):
"""
:param loop: pyev loop instance
"""
self.loop = loop
self.handle_id = count()
self.timer_id = count()
self.handles = dict()
self.timers = dict()
# This tells libvirt what event loop implementation it
# should use
libvirt.virEventRegisterImpl(
self.add_handle,
self.update_handle,
self.remove_handle,
self.add_timer,
self.update_timer,
self.remove_timer,
)
def add_handle(self, fd, events, cb, opaque):
"""Registers a new file handle 'fd', monitoring for 'events' (libvirt
event constants), firing the callback cb() when an event occurs.
        Returns a unique integer identifier for this handle, that should be
        used to later update/remove it.
Note: unlike in the libvirt example, we don't use an interrupt trick as
we run everything in the same thread, furthermore, calling start watcher
method from a different thread could be dangerous.
"""
handle_id = self.handle_id.next()
h = LoopHandler(self.loop, handle_id, fd, events, cb, opaque)
h.start()
self.handles[handle_id] = h
# logger.debug('Add handle %d fd %d events %d', handle_id, fd, events)
return handle_id
def add_timer(self, interval, cb, opaque):
"""Registers a new timer with periodic expiry at 'interval' ms,
firing cb() each time the timer expires. If 'interval' is -1,
then the timer is registered, but not enabled.
        Returns a unique integer identifier for this timer, that should be
        used to later update/remove it.
Note: same note as for :py:meth:`add_handle` applies here
"""
timer_id = self.timer_id.next()
h = LoopTimer(self.loop, timer_id, interval, cb, opaque)
h.start()
self.timers[timer_id] = h
# logger.debug('Add timer %d interval %d', timer_id, interval)
return timer_id
def update_handle(self, handle_id, events):
"""Change the set of events to be monitored on the file handle.
"""
h = self.handles.get(handle_id)
if h:
h.events = events
# logger.debug('Update handle %d fd %d events %d', handle_id, h.fd, events)
def update_timer(self, timer_id, interval):
"""Change the periodic frequency of the timer.
"""
t = self.timers.get(timer_id)
if t:
t.interval = interval
def remove_handle(self, handle_id):
"""Stop monitoring for events on the file handle.
"""
h = self.handles.pop(handle_id, None)
if h:
h.stop()
# logger.debug('Remove handle %d', handle_id)
def remove_timer(self, timer_id):
"""Stop firing the periodic timer.
"""
t = self.timers.pop(timer_id, None)
if t:
t.stop()
# logger.debug('Remove timer %d', timer_id)
def stop(self):
for handl in chain(self.handles.itervalues(), self.timers.itervalues()):
handl.stop()
self.handles = dict()
self.timers = dict()
class StorageIndex(object):
"""Keep an index of all storage volume paths."""
def __init__(self, handler, lv_con):
"""
:param handler: Hypervisor handler instance
:param lv_con: Libvirt connection
"""
self.handler = handler
self.lv_con = lv_con
self.storages = dict(
(s.name, s) for s in imap(
Storage,
imap(
lv_con.storagePoolLookupByName,
chain(
lv_con.listDefinedStoragePools(),
lv_con.listStoragePools(),
),
),
),
)
self.paths = None
self.update_path_index()
def update(self):
"""Update storage pools and volumes."""
# go through all storage pools and check if it is already in the index
for lv_storage in imap(
self.lv_con.storagePoolLookupByName,
chain(
self.lv_con.listDefinedStoragePools(),
self.lv_con.listStoragePools(),
),
):
if lv_storage.name() in self.storages:
# update
self.storages[lv_storage.name()].update()
else:
# add storage pool
s = Storage(lv_storage)
self.storages[s.name] = s
# add tags
self.handler.tag_db.add_tags((
Tag('sto%s_state' % s.name, lambda: s.state, 5, 5),
Tag('sto%s_size' % s.name, lambda: s.capacity, 5, 5),
Tag('sto%s_free' % s.name, lambda: s.available, 5, 5),
Tag('sto%s_used' % s.name,
lambda: s.capacity - s.available, 5, 5),
Tag('sto%s_type' % s.name, lambda: s.type, 5, 5),
))
self.update_path_index()
def update_path_index(self):
self.paths = dict(
(v.path, v) for v in chain.from_iterable(imap(
lambda s: s.volumes.itervalues(),
self.storages.itervalues(),
)),
)
def get_volume(self, path):
return self.paths.get(path)
def get_storage(self, name):
return self.storages.get(name)
def create_volume(self, pool_name, volume_name, capacity):
"""Create a new volume in the storage pool.
        :param str pool_name: name of the storage pool
        :param str volume_name: name for the volume
        :param int capacity: size of the volume in bytes
"""
# get volume
logger.debug('asked pool %s', pool_name)
logger.debug('Pool state %s', self.storages)
try:
pool = self.storages[pool_name]
except KeyError:
raise PoolStorageError('Invalid pool name')
if pool is None:
raise Exception('Storage pool not found')
try:
            new_volume = pool.lv_storage.createXML("""
                <volume>
                  <name>%s</name>
                  <capacity>%d</capacity>
                </volume>
                """ % (volume_name, capacity), 0)
except libvirt.libvirtError:
logger.exception('Error while creating volume')
raise
new_volume = Volume(new_volume)
# if success add the volume to the index
self.paths[new_volume.path] = new_volume
# and also to its storage pool
self.storages[new_volume.storage].volumes[new_volume.name] = new_volume
return new_volume
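`create_volume` above builds the libvirt storage-volume XML by string interpolation; building the same minimal document (just `name` and `capacity`) with ElementTree sidesteps escaping problems. A sketch only, not the module's actual code (the helper name is ours):

```python
from xml.etree import ElementTree as et

def volume_xml(volume_name, capacity):
    # minimal libvirt storage-volume description, as passed to createXML
    root = et.Element('volume')
    et.SubElement(root, 'name').text = volume_name
    et.SubElement(root, 'capacity').text = str(capacity)
    return et.tostring(root, encoding='unicode')

xml = volume_xml('vm0.drbdmeta', 128 * 2 ** 20)
print(xml)
# <volume><name>vm0.drbdmeta</name><capacity>134217728</capacity></volume>
```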
def delete_volume(self, pool_name, volume_name):
        """Delete a volume in the given storage pool.
:param str pool_name: name for the storage pool
:param str volume_name: name for the volume
"""
# get volume
try:
pool = self.storages[pool_name]
except KeyError:
raise PoolStorageError('Invalid pool name')
try:
volume = pool.volumes[volume_name]
except KeyError:
raise PoolStorageError('Invalid volume name')
# delete from index
del self.paths[volume.path]
del self.storages[pool_name].volumes[volume_name]
# delete volume
try:
volume.lv_volume.delete(0)
except libvirt.libvirtError:
logger.exception('Error while deleting volume')
raise
class Storage(object):
"""Storage abstraction."""
def __init__(self, lv_storage):
"""
:param lv_storage: Libvirt pool storage instance
"""
self.uuid = lv_storage.UUID()
self.name = lv_storage.name()
self.lv_storage = lv_storage
self.state, self.capacity = None, None
self.allocation, self.available = None, None
self.type = et.ElementTree().parse(
StringIO(lv_storage.XMLDesc(0))).get('type')
self.volumes = dict((v.name, v) for v in imap(
Volume,
(lv_storage.storageVolLookupByName(n) for n in
lv_storage.listVolumes()),
))
self.update_attr()
def update(self):
self.update_attr()
# update volumes
for vol_name in self.lv_storage.listVolumes():
if vol_name in self.volumes:
# update volume
self.volumes[vol_name].update()
else:
# add volume
v = Volume(self.lv_storage.storageVolLookupByName(vol_name))
self.volumes[v.name] = v
def update_attr(self):
self.state, self.capacity, self.allocation, self.available = self.lv_storage.info()
self.state = STORAGE_STATES[self.state]
self.type = et.ElementTree().parse(
StringIO(self.lv_storage.XMLDesc(0))).get('type')
class Volume(object):
"""Volume abstraction."""
def __init__(self, lv_volume):
"""
:param lv_volume: Libvirt volume instance
"""
self.storage = lv_volume.storagePoolLookupByVolume().name()
self.path = lv_volume.path()
self.name = lv_volume.name()
self.capacity, self.allocation = None, None
self.lv_volume = lv_volume
self.update()
def update(self):
self.capacity, self.allocation = self.lv_volume.info()[1:]
# ===== cc-node-v24/cloudcontrol/node/hypervisor/tags.py =====
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import logging
from functools import wraps
import libvirt
from cloudcontrol.node.utils import and_
logger = logging.getLogger(__name__)
def _virt_tag(func):
"""Catches libvirt related exception.
Decorator used for tag declarations that interacts with libvirt.
"""
@wraps(func)
def decorated(handl):
if not handl.virt_connected:
return
try:
return func(handl)
except libvirt.libvirtError:
logger.exception('Unexpected libvirt error')
handl.vir_con_restart()
return decorated
def _check_virt_connected(func):
    """Check if libvirt is connected before calculating the tag."""
@wraps(func)
def decorated(handl):
if not handl.virt_connected:
return
return func(handl)
return decorated
def vir_status(handl):
"""Local libvirt connection status."""
return {True: 'connected', False: 'disconnected'}[handl.virt_connected]
# hypervisor related tags
def htype():
"""Hypervisor type."""
# FIXME for other support
return u'kvm'
@_check_virt_connected
def hv(handl):
"""Hypervisor name."""
    # What is the point of this tag? Isn't the information already
    # available in the name and id?
return handl.hypervisor.name
def hvm():
"""Hardware virtualization enable."""
# see
# http://www.linux-kvm.org/page/FAQ#How_can_I_tell_if_I_have_Intel_VT_or_AMD-V.3F
# or
# http://www.cyberciti.biz/faq/linux-xen-vmware-kvm-intel-vt-amd-v-support/
# if we are in a xen hypervisor we won't see vt in /proc/cpuinfo
result = {True: u'yes', False: u'no'}
if htype() == u'kvm':
        # find out in /proc/cpuinfo whether all CPUs have virtualisation
        # enabled
        return result[and_(
            bool(set((
                'vmx',  # Intel VT
                'svm',  # AMD-V
            )) & set(
                l.split(': ')[-1].split()
            )) for l in open('/proc/cpuinfo') if l.startswith('flags')
        )]
return None
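The `hvm()` tag above decides hardware-virtualisation support by checking every `flags` line of `/proc/cpuinfo` for the `vmx` (Intel VT) or `svm` (AMD-V) flag. The same test on a text blob instead of the real file (the helper name and sample text are ours):

```python
def cpus_have_hvm(cpuinfo_text):
    """True if every 'flags' line advertises vmx (Intel) or svm (AMD)."""
    hvm_flags = set(('vmx', 'svm'))
    flag_lines = [l for l in cpuinfo_text.splitlines()
                  if l.startswith('flags')]
    # same set-intersection test as hvm(), over all CPUs
    return bool(flag_lines) and all(
        bool(hvm_flags & set(l.split(': ')[-1].split()))
        for l in flag_lines)

sample = """processor : 0
flags : fpu vme de pse msr vmx ssse3
processor : 1
flags : fpu vme de pse msr vmx ssse3
"""
print(cpus_have_hvm(sample))                    # True
print(cpus_have_hvm('flags : fpu vme de pse'))  # False
```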
@_virt_tag
def hvver(handl):
"""Hypervisor version."""
return handl.hypervisor.vir_con.getVersion()
@_virt_tag
def libvirtver(handl):
"""Version of running libvirt."""
return handl.hypervisor.vir_con.getLibVersion()
# jobs
def rjobs():
"""Number of currently running jobs."""
# storage pools
@_check_virt_connected
def sto(handl):
"""Storage pool names."""
return u' '.join(handl.hypervisor.storage.storages.iterkeys())
# Vm related tags
@_check_virt_connected
def nvm(handl):
"""Number of VMS in the current hypervisor."""
return handl.hypervisor.vm_total
@_check_virt_connected
def vmpaused(handl):
"""Count of VMs paused."""
return handl.hypervisor.vm_paused
@_check_virt_connected
def vmstarted(handl):
"""Count of VMs started."""
return handl.hypervisor.vm_started
@_check_virt_connected
def vmstopped(handl):
"""Count of VMs Stopped."""
return handl.hypervisor.vm_stopped
@_check_virt_connected
def cpurunning(handl):
"""CPU total used by running VMs on the hypervisor."""
return sum(int(vm.tags['cpu'].value) for vm in
handl.hypervisor.domains.itervalues() if vm.tags['cpu'].value and
vm.state == 'running')
@_check_virt_connected
def cpualloc(handl):
"""CPU total used by all VMs on the hypervisor."""
return sum(int(vm.tags['cpu'].value) for vm in
handl.hypervisor.domains.itervalues() if vm.tags['cpu'].value)
@_check_virt_connected
def memrunning(handl):
"""Memory used by running VMs on the hypervisor."""
return sum(int(vm.tags['mem'].value) for vm in
handl.hypervisor.domains.itervalues() if vm.tags['mem'].value and
vm.state == 'running')
@_check_virt_connected
def memalloc(handl):
"""Memory used by all VMs on the hypervisor."""
return sum(int(vm.tags['mem'].value) for vm in
handl.hypervisor.domains.itervalues() if vm.tags['mem'].value)
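The tag functions above all follow the same decorator pattern: a callback takes the handler and is wrapped so it short-circuits to None while libvirt is disconnected. A self-contained sketch of that pattern with a stub handler instead of a real libvirt connection (all names here are hypothetical):

```python
from functools import wraps

def check_connected(func):
    """Skip the tag callback entirely while the handler is disconnected."""
    @wraps(func)
    def decorated(handler):
        if not handler.virt_connected:
            return None
        return func(handler)
    return decorated

class StubHandler(object):
    """Stand-in for the real handler; only the attributes used here."""
    virt_connected = True
    hypervisor_name = 'hv01'

@check_connected
def hv_name(handler):
    """Hypervisor name tag."""
    return handler.hypervisor_name

handler = StubHandler()
print(hv_name(handler))        # hv01
handler.virt_connected = False
print(hv_name(handler))        # None
```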
# ---- cc-node-v24/cloudcontrol/node/jobs.py ----
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import errno
import logging
import os
import resource
import signal
import subprocess
import sys
import traceback
from threading import Thread, Event
from StringIO import StringIO
from itertools import count
from cloudcontrol.node.exc import JobError
from cloudcontrol.node.utils import num_to_sig, close_fds
logger = logging.getLogger(__name__)
class JobManager(object):
def __init__(self, main_loop):
"""
:param main_loop: :class:`MainLoop` instance
"""
self.job_id = count()
self.main = main_loop
#: keep an index of all jobs
self.jobs = {}
def job_start(self):
pass
def job_stop(self):
pass
def notify(self, job):
"""Called when a job is done."""
# by now only remove the job
self.remove(job.id)
def cancel(self, job_id):
"""Cancel a job."""
self.jobs[job_id].stop()
def remove(self, job_id):
try:
return self.jobs.pop(job_id)
except KeyError:
logger.error('Job %s does not exist', job_id)
def create(self, job_constructor, *args, **kwargs):
"""Create a new job and populate job id."""
job = job_constructor(self, *args, **kwargs)
self.jobs[job.id] = job
return job
def get(self, job_id):
return self.jobs[job_id]
def start(self):
pass
def stop(self):
logger.debug('Stopping all currently running jobs')
for job in self.jobs.itervalues():
try:
job.stop()
except Exception:
pass
class BaseThreadedJob(Thread):
"""Job running in a background thread.
Handles job notification to the job manager.
"""
def __init__(self, job_manager):
Thread.__init__(self)
#: report progress in %
self.progress = 0.
self.job_manager = job_manager
#: job id
self.id = job_manager.job_id.next()
self.running = False
def pre_job(self):
"""Job preparation that is called when doing start, it can raise
exceptions to report error to the caller. In the latter case, it will
also removes itself from the job list.
"""
pass
def run_job(self):
"""Overide this method to define what your job do."""
raise NotImplementedError
def run(self):
try:
self.run_job()
except Exception:
logger.exception('Unhandled error during job %s', self.id)
finally:
self.running = False
def notify(self):
self.job_manager.notify(self)
def start(self):
"""Start job in a background thread."""
# first we run pre_job as it could raise an exception
try:
self.pre_job()
except Exception:
# in case of error we must remove the job from the manager
self.notify()
raise
# then we start the watcher when it's safe
self.running = True
Thread.start(self) # thread will signal when it's done using async
def wait(self):
"""For jobs running in a background, this method MUST be called in order
to remove the job from the list in the end.
"""
self.join()
self.notify()
def start_current(self):
"""Start job in current thread."""
try:
self.pre_job()
self.running = True
self.run_job()
# we could log exceptions here but rather do it inside the run_job
# method
finally:
self.notify()
def stop(self):
self.running = False
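The threaded job lifecycle above (create via the manager, start, then wait, which joins the thread and notifies the manager so the job is dropped from its index) can be sketched with a minimal manager and job. This is an illustrative reduction, not the real `JobManager`/`BaseThreadedJob` API:

```python
import threading
from itertools import count

class MiniManager(object):
    """Toy job manager: hands out ids and drops jobs once notified."""
    def __init__(self):
        self.job_id = count()
        self.jobs = {}
        self.done = []
    def create(self, ctor, *args):
        job = ctor(self, *args)
        self.jobs[job.id] = job
        return job
    def notify(self, job):
        # called when a job finished; remove it from the index
        self.done.append(self.jobs.pop(job.id))

class AnswerJob(threading.Thread):
    """Toy job computing a value in a background thread."""
    def __init__(self, manager):
        threading.Thread.__init__(self)
        self.manager = manager
        self.id = next(manager.job_id)
        self.result = None
    def run(self):
        self.result = 6 * 7  # the actual work
    def wait(self):
        # join the thread, then tell the manager we are done
        self.join()
        self.manager.notify(self)

manager = MiniManager()
job = manager.create(AnswerJob)
job.start()
job.wait()
print(job.result)  # 42
```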
class ForkedJob(object):
"""Job that executes in a fork.
When inheriting, you must define an open_fds property listing the file
descriptors that must be kept in the child and closed in the parent.
.. warning::
logging should not be used in the child as this would cause a deadlock
to occur, see http://bugs.python.org/issue6721
"""
def __init__(self, job_manager):
"""
:param job_manager: :class:`JobManager` instance
"""
self.job_manager = job_manager
#: job id
self.id = job_manager.job_id.next()
self.running = False
# event for other thread to wait for job termination
self.job_done = Event()
self.fork_pid = None
# internal event used to wait for forked process termination
self.fork_die = Event()
# libev child watcher
self.fork_watcher = None
# return status of forked process
self.return_status = None
def create_fork_watcher(self):
self.fork_watcher = self.job_manager.main.evloop.child(self.fork_pid, False, self.child_cb)
self.fork_watcher.start()
def child_cb(self, watcher, revents):
self.return_status = watcher.rstatus
self.fork_die.set()
watcher.stop()
self.fork_watcher = None
def pre_job(self):
"""This method represents any preparation job that must be done in the
parent before the fork.
"""
pass
def run_job(self):
"""This represent the work that will be done in the forked child.
This method MUST be redefined in subclasses.
"""
pass
def after_fork(self):
"""This method will be called just after fork in the child.
It does nothing by default and can be redefined in subclasses.
"""
pass
def fatal(self, fmt, *args, **kwargs):
"""Write error message in stderr and exit.
:param str fmt: format string
:param \*args: arguments for format string
:param \*\*kwargs: can contain ('status', :int:) -> exit status of process
"""
try:
status = int(kwargs.get('status', 1))
except (ValueError, TypeError):
sys.stderr.write('Bad status argument %s' % kwargs.get('status'))
os._exit(42)
try:
fmt = fmt % args
except (ValueError, TypeError):
sys.stderr.write('Bad formatting for string: %s' % fmt)
os._exit(42)
try:
sys.stderr.write(fmt)
except IOError:
os._exit(42)
os._exit(status)
def fatal_exc(self, fmt, *args, **kwargs):
"""Write error message and traceback and exit.
:param str fmt: format string
:param \*args: arguments for format string
:param \*\*kwargs: can contain ('status', :int:) -> exit status of process
"""
tb = StringIO()
tb.write('\n')
traceback.print_exc(file=tb)
tb.write('\n')
fmt += '%s'
args = args + (tb.getvalue(),)
self.fatal(fmt, *args, status=kwargs.get('status', 1))
def reset_signal_mask(self):
signal.signal(signal.SIGTERM, lambda *args: os._exit(1))
signal.signal(signal.SIGUSR1, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)
def run(self):
"""This method performs all the hard work by doing the actual fork.
It catches all possible exceptions in the child, as an escaping
exception would go back up the stack and do nasty things with the
libev loop or sjRPC.
Thus you do not need to capture all exceptions in your code,
furthermore, if you need to exit from the child, you'd better use the
`os._exit` function.
"""
try:
self.fork_pid = os.fork()
except OSError as exc:
logger.error('Cannot fork (job %s): %s', self.id, exc.strerror)
raise
self.running = True
if self.fork_pid == 0:
# child
# just hope to not receive any signals in between since there is no
# way to block signals in python :(
try:
self.reset_signal_mask()
close_fds(exclude_fds=self.open_fds,
debug=self.job_manager.main.config.debug)
self.after_fork()
except:
traceback.print_exc()
os._exit(1)
try:
self.run_job()
except:
sys.stderr.write('Error during job %s\n' % self.id)
traceback.print_exc()
os._exit(1)
else:
sys.stderr.write('Job execution went well %s\n' % self.id)
os._exit(0)
else:
self.create_fork_watcher()
# close child fds
for fd in self.open_fds:
try:
os.close(fd)
except OSError as exc:
if exc.errno == errno.EBADF:
# FIXME this is weird but it seems to happen sometimes
logger.debug('Error while closing fd %s in parent,'
' EBADF (job %s)', fd, self.id)
continue
logger.error('Error while closing fds in parent: %s',
exc.strerror)
raise
def start(self):
"""This will start the job by executing :py:meth:`pre_job` method and
:py:meth:`run`.
"""
self.pre_job()
self.job_manager.main.call_in_main_thread(self.run)
def stop(self):
"""This would be called to stop the job.
"""
if self.fork_die.is_set():
return
try:
os.kill(self.fork_pid, signal.SIGKILL)
except OSError as exc:
if exc.errno == errno.ESRCH:
logger.debug('Child already killed')
return
logger.error('Cannot kill child for IO job: %s', exc.strerror)
raise
def notify(self):
self.job_manager.notify(self)
def wait(self):
"""This will wait for the fork to end and raise exception depending on
child return status.
.. warning::
This method MUST be called.
"""
if self.fork_pid is None:
return
try:
self.fork_die.wait()
if self.return_status != 0:
if self.return_status & 0xff == signal.SIGKILL:
logger.error('Job was killed')
else:
raise JobError('Exception during job, returned %s, signal'
' %s' % (
self.return_status >> 8,
num_to_sig(self.return_status & 0xff)))
finally:
self.fork_pid = None
self.job_done.set()
self.notify()
def join(self):
"""This provides an API similar to `threading.Thread.join
`_
, you can wait
for the job termination from multiple points in your program but one of
these and only one MUST be `wait` method.
"""
self.job_done.wait()
class BaseIOJob(ForkedJob):
"""Fork job that set ionice on the child."""
#: level of io nice that will be set (see :manpage:`ionice(1)`)
IO_NICE = 7
def after_fork(self):
try:
subprocess.check_call(['ionice', '-n%d' % self.IO_NICE,
'-p%d' % os.getpid()], close_fds=True)
except subprocess.CalledProcessError as exc:
sys.stderr.write('Cannot set ionice, return code %s\n' % exc.returncode)
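The `wait` method of `ForkedJob` above decodes the raw `waitpid` status by bit-shifting; the `os` module exposes the same decoding through `WIFSIGNALED`/`WTERMSIG`/`WEXITSTATUS`. A short POSIX-only sketch comparing the two conventions, with statuses built by hand rather than taken from a real child:

```python
import os
import signal

def decode(status):
    """Return ('exited', code) or ('signaled', signum) for a waitpid status."""
    if os.WIFSIGNALED(status):
        # child was killed by a signal: number is in the low bits
        return 'signaled', os.WTERMSIG(status)
    # normal exit: the exit code lives in the high byte
    return 'exited', os.WEXITSTATUS(status)

print(decode(3 << 8))               # ('exited', 3)
print(decode(int(signal.SIGKILL)))  # ('signaled', 9)
```

This is why checking only `status >> 8` misses children killed by a signal: their status has an empty high byte.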
# ---- cc-node-v24/cloudcontrol/node/node.py ----
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import os
import logging
import logging.config
from functools import wraps
from cloudcontrol.common.client.loop import RPCStartHandler, MainLoop
from cloudcontrol.common.client.tags import Tag
from cloudcontrol.node import __version__
from cloudcontrol.node.config import NodeConfigParser, configure_logging
from cloudcontrol.node.jobs import JobManager
from cloudcontrol.node.exc import ForbiddenHandler
logger = logging.getLogger(__name__)
def _rights_check(handler_name):
"""Method instance decorator for checking permissions before executing an
RPC handler.
"""
def decorator(func):
@wraps(func)
def decorated(*args, **kwargs):
if handler_name in func.im_self.main.config.forbidden_handlers:
logger.error('Remote tried to call forbidden handler "%s"',
handler_name)
raise ForbiddenHandler('Forbidden handler "%s"' % handler_name)
return func(*args, **kwargs)
return decorated
return decorator
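The `_rights_check` factory above takes the handler name at decoration time and rejects the call if that name is in the forbidden set. A standalone sketch of the same pattern, using a plain module-level set instead of the node configuration (all names here are illustrative):

```python
from functools import wraps

def rights_check(handler_name, forbidden):
    """Reject the call when handler_name is in the forbidden set."""
    def decorator(func):
        @wraps(func)
        def decorated(*args, **kwargs):
            if handler_name in forbidden:
                raise PermissionError('Forbidden handler %r' % handler_name)
            return func(*args, **kwargs)
        return decorated
    return decorator

forbidden = {'execute'}  # stand-in for config.forbidden_handlers

@rights_check('ping', forbidden)
def ping():
    return 'pong'

@rights_check('execute', forbidden)
def execute(cmd):
    return 'ran %s' % cmd

print(ping())  # pong
try:
    execute('ls')
except PermissionError as exc:
    print(exc)
```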
class NodeRPCStartHandler(RPCStartHandler):
def handle_authentication_response(self, response=None):
# set handler according to which role was returned by the cc-server
if response == self.loop.role and response is not None:
# we don't need to reload the plugins
# but we need to register the objects and tags
self.loop.tag_db.rpc_register()
elif response == u'host':
# close previous plugins if needed
if self.loop.role is not None:
self.loop.close_plugins()
logger.debug('Role host assigned')
from cloudcontrol.node.host import Handler as HostHandler
self.loop.main_plugin = HostHandler(loop=self.loop)
self.loop.role = 'host'
# (re)-register the tags of the main loop
self.loop.tag_db.rpc_register()
self.loop.register_plugin(self.loop.main_plugin)
elif response == u'hv':
# close previous plugins if needed
if self.loop.role is not None:
self.loop.close_plugins()
logger.debug('Role hypervisor assigned')
# set libvirt environment variables
os.environ['LIBVIRT_DEBUG'] = '4'
# os.environ['LIBVIRT_LOG_FILTERS'] = ''
os.environ['LIBVIRT_LOG_OUTPUT'] = '4:stderr'
# we don't import those modules at the top because some dependencies
# may not be installed
from cloudcontrol.node.hypervisor import Handler as HypervisorHandler
self.loop.main_plugin = HypervisorHandler(
hypervisor_name=self.loop.config.server_user,
loop=self.loop,
)
self.loop.role = 'hv'
# (re)-register the tags of the main loop
self.loop.tag_db.rpc_register()
self.loop.register_plugin(self.loop.main_plugin)
else:
logger.error('Failed authentication, role returned: %s', response)
self._goto(self.handle_error)
return
self._goto(self.handle_done)
class NodeLoop(MainLoop):
CONFIG_CLASS = NodeConfigParser
CONNECT_CLASS = NodeRPCStartHandler
DEFAULT_TAGS = (Tag(u'version', __version__),)
def __init__(self, config_path):
MainLoop.__init__(self, config_path)
self.job_manager = JobManager(self)
def reset_handler(self, name, handl):
# we decorate each handler for permissions to be checked before each
# invocation
MainLoop.reset_handler(self, name, _rights_check(name)(handl))
def configure_logging(self):
configure_logging(self.config.logging_level, self.config.logging_output)
def stop(self, watcher=None, revents=None):
MainLoop.stop(self, watcher, revents)
# stop running jobs
self.job_manager.stop()
# ---- cc-node-v24/cloudcontrol/node/utils.py ----
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import os
import gc
import sys
import types
import errno
import signal
import pickle
import logging
import resource
import threading
import traceback
import subprocess
from collections import deque
from functools import wraps
from subprocess import _eintr_retry_call
import pyev
from cloudcontrol.common.client.utils import main_thread
logger = logging.getLogger(__name__)
def and_(iter_):
"""Do an and logic condition over the iterable element.
:param iterable iter: meat for condition
"""
for i in iter_:
if not i:
return False
return True
def _main_thread(func):
"""EvPopen constructor decorator."""
@wraps(func)
def decorated(self, main_loop, *args, **kwargs):
return main_loop.call_in_main_thread(func, self, main_loop, *args, **kwargs)
return decorated
class EvPopen(subprocess.Popen):
@_main_thread
def __init__(self, main_loop, *args, **kwargs):
"""Class that acts as `subprocess.Popen` but uses libev child handling.
:param main_loop: `NodeLoop` instance
:param \*args: arguments for :py:class:`subprocess.Popen`
:param \*\*kwargs: keyword arguments for :py:class:`subprocess.Popen`
"""
self.main = main_loop
# this could raise but don't worry about zombies, they will be collected
# by libev
subprocess.Popen.__init__(self, *args, **kwargs)
# check stdout, stderr fileno and create watchers if needed
self.stdout_watcher = self.stderr_watcher = None
self.child_watcher = self.main.evloop.child(self.pid, False,
self.child_cb)
self.child_watcher.start()
self._stdout_output = list()
self._stderr_output = list()
# take an optional event for other threads to wait for process
# termination
self.stdout_done = threading.Event()
self.stderr_done = threading.Event()
self.process_done = threading.Event()
@main_thread
def create_std_watchers(self):
if self.stdout is not None:
self.stdout_watcher = self.main.evloop.io(self.stdout,
pyev.EV_READ,
self.stdout_cb)
self.stdout_watcher.start()
else:
self.stdout_done.set()
if self.stderr is not None and (self.stdout is None or self.stderr.fileno() != self.stdout.fileno()):
self.stderr_watcher = self.main.evloop.io(self.stderr,
pyev.EV_READ,
self.stderr_cb)
self.stderr_watcher.start()
else:
self.stderr_done.set()
def stdout_cb(self, watcher, revents):
data = os.read(watcher.fd, 1024)
if data:
self._stdout_output.append(data)
else:
self.stdout_watcher.stop()
self.stdout_watcher = None
self.stdout.close()
self.stdout = None
self.stdout_done.set()
def stderr_cb(self, watcher, revents):
data = os.read(watcher.fd, 1024)
if data:
self._stderr_output.append(data)
else:
self.stderr_watcher.stop()
self.stderr_watcher = None
self.stderr.close()
self.stderr = None
self.stderr_done.set()
def child_cb(self, watcher, revents):
self._handle_exitstatus(self.child_watcher.rstatus)
self.child_watcher.stop()
self.child_watcher = None
self.process_done.set()
# overiding parent methods
def _internal_poll(self, *args, **kwargs):
# ignore all parameters
return self.returncode
def _communicate(self, stdin=None):
self.create_std_watchers()
if stdin:
if self.stdin is None:
logger.warning('Ignoring stdin input for %s', self)
else:
fd = self.stdin.fileno()
while True:
count = os.write(fd, stdin)
if count == len(stdin):
self.stdin.close()
self.stdin = None
break
else:
stdin = stdin[count:]
self.stdout_done.wait()
self.stderr_done.wait()
# FIXME handle universal newlines
self.process_done.wait()
return tuple(map(u''.join, (self._stdout_output, self._stderr_output)))
# This is basically a copy-paste from stdlib subprocess module to
# prevent calling waitpid which would race with libev and would raise
# ECHILD
def _execute_child(self, args, executable, preexec_fn, close_fds,
cwd, env, universal_newlines,
startupinfo, creationflags, shell,
p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite):
"""Execute program (POSIX version)"""
if isinstance(args, types.StringTypes):
args = [args]
else:
args = list(args)
if shell:
args = ["/bin/sh", "-c"] + args
if executable is None:
executable = args[0]
# For transferring possible exec failure from child to parent
# The first char specifies the exception type: 0 means
# OSError, 1 means some other error.
errpipe_read, errpipe_write = os.pipe()
try:
try:
self._set_cloexec_flag(errpipe_write)
gc_was_enabled = gc.isenabled()
# Disable gc to avoid bug where gc -> file_dealloc ->
# write to stderr -> hang. http://bugs.python.org/issue1336
gc.disable()
try:
self.pid = os.fork()
except:
if gc_was_enabled:
gc.enable()
raise
self._child_created = True
if self.pid == 0:
# Child
try:
# Close parent's pipe ends
if p2cwrite is not None:
os.close(p2cwrite)
if c2pread is not None:
os.close(c2pread)
if errread is not None:
os.close(errread)
os.close(errpipe_read)
# Dup fds for child
if p2cread is not None:
os.dup2(p2cread, 0)
if c2pwrite is not None:
os.dup2(c2pwrite, 1)
if errwrite is not None:
os.dup2(errwrite, 2)
# Close pipe fds. Make sure we don't close the same
# fd more than once, or standard fds.
if p2cread is not None and p2cread not in (0,):
os.close(p2cread)
if c2pwrite is not None and c2pwrite not in (p2cread, 1):
os.close(c2pwrite)
if errwrite is not None and errwrite not in (p2cread, c2pwrite, 2):
os.close(errwrite)
# Close all other fds, if asked for
if close_fds:
self._close_fds(but=errpipe_write)
if cwd is not None:
os.chdir(cwd)
if preexec_fn:
preexec_fn()
if env is None:
os.execvp(executable, args)
else:
os.execvpe(executable, args, env)
except:
exc_type, exc_value, tb = sys.exc_info()
# Save the traceback and attach it to the exception object
exc_lines = traceback.format_exception(exc_type,
exc_value,
tb)
exc_value.child_traceback = ''.join(exc_lines)
os.write(errpipe_write, pickle.dumps(exc_value))
# This exitcode won't be reported to applications, so it
# really doesn't matter what we return.
os._exit(255)
# Parent
if gc_was_enabled:
gc.enable()
finally:
# be sure the FD is closed no matter what
os.close(errpipe_write)
if p2cread is not None and p2cwrite is not None:
os.close(p2cread)
if c2pwrite is not None and c2pread is not None:
os.close(c2pwrite)
if errwrite is not None and errread is not None:
os.close(errwrite)
# Wait for exec to fail or succeed; possibly raising exception
# Exception limited to 1M
data = _eintr_retry_call(os.read, errpipe_read, 1048576)
finally:
# be sure the FD is closed no matter what
os.close(errpipe_read)
if data != "":
# _eintr_retry_call(os.waitpid, self.pid, 0)
child_exception = pickle.loads(data)
for fd in (p2cwrite, c2pread, errread):
if fd is not None:
os.close(fd)
raise child_exception
def wait(self):
self.process_done.wait()
return self.returncode
# end overiding
def close(self):
# stop std* watchers
if self.stdout_watcher is not None:
self.stdout_watcher.stop()
self.stdout_watcher = None
if self.stderr_watcher is not None:
self.stderr_watcher.stop()
self.stderr_watcher = None
# close std* file objects if needed
if self.stdin is not None:
self.stdin.close()
self.stdin = None
if self.stdout is not None:
self.stdout.close()
self.stdout = None
if self.stderr is not None:
self.stderr.close()
self.stderr = None
if self.child_watcher is not None:
self.child_watcher.stop()
self.child_watcher = None
if self.returncode is None:
# we must kill the child
self.kill()
# libev handles zombies
def subproc_call(main_loop, args, stdin=None):
"""
:param args: arguments for subprocess call
:param stdin: stdin data as string
"""
proc = EvPopen(main_loop, args, bufsize=4096, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
close_fds=True)
result, _ = proc.communicate(stdin)
if proc.returncode != 0:
raise subprocess.CalledProcessError(proc.returncode,
'Error while executing command')
return result
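`_communicate` above feeds stdin with a partial-write loop: `os.write` may accept fewer bytes than offered, so the remaining slice must be retried. The same loop over an `os.pipe` pair, as a self-contained sketch (helper name is illustrative; payloads are kept small so the pipe buffer never blocks):

```python
import os

def write_all(fd, data):
    """Keep calling os.write until every byte has been accepted."""
    while data:
        written = os.write(fd, data)
        data = data[written:]  # drop only what was actually written

r, w = os.pipe()
write_all(w, b'hello pipe')
os.close(w)
print(os.read(r, 1024))  # b'hello pipe'
os.close(r)
```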
class SocketBuffer(deque):
"""Holds bytes in a list.
This class does not enforce the maximum size itself; instead it helps by
tracking the current byte count automatically.
"""
def __init__(self, max_len=8 * 64 * 1024):
deque.__init__(self)
self.max_len = max_len
self.current_len = 0
def append(self, x):
deque.append(self, x)
self.current_len += len(x)
def appendleft(self, x):
deque.appendleft(self, x)
self.current_len += len(x)
def clear(self):
deque.clear(self)
self.current_len = 0
def extend(self, iterable):
raise NotImplementedError
def extendleft(self, iterable):
raise NotImplementedError
def pop(self):
elt = deque.pop(self)
self.current_len -= len(elt)
return elt
def popleft(self):
elt = deque.popleft(self)
self.current_len -= len(elt)
return elt
def remove(self, value):
raise NotImplementedError
def reverse(self):
raise NotImplementedError
def rotate(self, n):
raise NotImplementedError
def is_full(self):
return self.current_len >= self.max_len
def is_empty(self):
return self.current_len == 0
class Singleton(type):
"""Singleton metaclass."""
def __init__(cls, name, bases, dict):
super(Singleton, cls).__init__(name, bases, dict)
cls._instance = None
def __call__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instance
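The metaclass above caches the first instance of each class in `_instance` and returns it on every later call. A self-contained modern-Python sketch of the same singleton-by-metaclass idea (the `Config` class is a hypothetical example, not part of the project):

```python
class Singleton(type):
    """Metaclass that caches the first instance of each class."""
    def __init__(cls, name, bases, namespace):
        super(Singleton, cls).__init__(name, bases, namespace)
        cls._instance = None
    def __call__(cls, *args, **kwargs):
        # construct once, then keep returning the cached instance
        if cls._instance is None:
            cls._instance = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instance

class Config(metaclass=Singleton):
    def __init__(self):
        self.values = {}

a = Config()
b = Config()
print(a is b)  # True
```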
def close_fds(exclude_fds=None, debug=False):
"""Close all fds uneeded fds in child when using fork.
:param exclude_fds: list of file descriptors that should not be closed (0,
1, 2 must not be set here, see debug)
:param bool debug: indicates if std in/out should be left open (usually for
debugging purposes)
"""
# get max fd
limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if limit == resource.RLIM_INFINITY:
max_fd = 2048
else:
max_fd = limit
if exclude_fds is None:
exclude_fds = []
if debug:
exclude_fds += [0, 1, 2] # debug
for fd in xrange(max_fd, -1, -1):
if fd in exclude_fds:
continue
try:
os.close(fd)
except OSError as exc:
if exc.errno != errno.EBADF:
raise
# wasn't open
if not debug:
sys.stdin = open(os.devnull)
sys.stdout = open(os.devnull, 'w')
sys.stderr = open(os.devnull, 'w')
assert sys.stdin.fileno() == 0
assert sys.stdout.fileno() == 1
assert sys.stderr.fileno() == 2
def set_signal_map(map_):
"""Set signal map in fork children.
:param mapping map_: (signal code, handler)...
:returns: old handlers as dict
"""
previous_handlers = dict()
for sig, handler in map_.iteritems():
previous_handlers[sig] = signal.signal(sig, handler)
return previous_handlers
sig_names = dict((k, v) for v, k in signal.__dict__.iteritems() if
v.startswith('SIG'))
def num_to_sig(num):
"""Returns signal name.
:param num: signal number
"""
return sig_names.get(num, 'Unknown signal')
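The signal-name lookup above scans `signal.__dict__` for `SIG*` attributes; in Python 3 the same number-to-name mapping is available directly from the `signal.Signals` enum. A sketch of the equivalent helper:

```python
import signal

# map signal numbers to their symbolic names (aliases collapse to one name)
sig_names = {s.value: s.name for s in signal.Signals}

def num_to_sig(num):
    """Return the symbolic name for a signal number."""
    return sig_names.get(num, 'Unknown signal')

print(num_to_sig(int(signal.SIGTERM)))  # SIGTERM
print(num_to_sig(999))                  # Unknown signal
```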
# ---- cc-node-v24/debian/cc-node.default ----
# /etc/default/cc-node
# CloudControl node daemon options
# Set to true to enable the cc-node daemon
ENABLED=true
cc-node-v24/debian/cc-node.dirs 0000664 0000000 0000000 00000000026 12113401062 0016400 0 ustar 00root root 0000000 0000000 /var/lib/cc-node/jobs
# ---- cc-node-v24/debian/cc-node.init ----
#! /bin/sh
### BEGIN INIT INFO
# Provides: cc-node
# Required-Start: $local_fs $remote_fs $network $syslog
# Required-Stop: $local_fs $remote_fs $network $syslog
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: start the CloudControl node
# Description: starts the CloudControl node using start-stop-daemon
### END INIT INFO
# Author: Antoine Millet
# Thibault VINCENT
PATH=/sbin:/usr/sbin:/bin:/usr/bin
DESC="CloudControl node"
NAME=cc-node
DAEMON=/usr/bin/cc-node
PIDFILE=/var/run/$NAME.pid
DAEMON_OPTS="-d -p $PIDFILE"
# Defaults:
USER=root
GROUP=root
# Exit if the package is not installed
[ -x "$DAEMON" ] || exit 0
# Load various rcS variables
. /lib/init/vars.sh
# Override the VERBOSE variable so we always have feedback messages
VERBOSE=yes
# Define LSB log_* functions.
# Depend on lsb-base (>= 3.2-14) to ensure that this file is present
# and status_of_proc is working.
. /lib/lsb/init-functions
# Read and parse configuration variable file if it is present
[ -r /etc/default/$NAME ] && {
. /etc/default/$NAME
# Do not start if service is disabled
if [ "$ENABLED" != "true" ] ; then
echo "$DESC disabled, see /etc/default/$NAME"
exit 0
fi
}
#
# Function that starts the daemon/service
#
do_start()
{
start-stop-daemon --start --quiet --name $NAME --test \
--exec $(readlink -f $(which python)) > /dev/null || return 2
start-stop-daemon --start --quiet --pidfile $PIDFILE --chuid $USER:$GROUP \
--exec $DAEMON -- $DAEMON_OPTS || return 1
}
#
# Function that stops the daemon/service
#
do_stop()
{
start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 \
--pidfile $PIDFILE --name $NAME
RETVAL="$?"
[ "$RETVAL" = 2 ] && return 2
# Wait for children to finish too if this is a daemon that forks
# and if the daemon is only ever run from this initscript.
# If the above conditions are not satisfied then add some other code
# that waits for the process to drop all resources that could be
# needed by services started subsequently. A last resort is to
# sleep for some time.
start-stop-daemon --stop --quiet --oknodo --retry=0/30/KILL/5 \
--exec $DAEMON
[ "$?" = 2 ] && return 2
# Many daemons don't delete their pidfiles when they exit.
rm -f $PIDFILE
return "$RETVAL"
}
#
# Function that sends a SIGHUP to the daemon/service
#
do_reload() {
start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE \
--name $NAME
return 0
}
case "$1" in
start)
[ "$VERBOSE" != no ] && log_daemon_msg "Starting $DESC" "$NAME"
do_start
case "$?" in
0|1) [ "$VERBOSE" != no ] && log_end_msg 0 ;;
2) [ "$VERBOSE" != no ] && log_end_msg 1 ;;
esac
;;
stop)
[ "$VERBOSE" != no ] && log_daemon_msg "Stopping $DESC" "$NAME"
do_stop
case "$?" in
0|1) [ "$VERBOSE" != no ] && log_end_msg 0 ;;
2) [ "$VERBOSE" != no ] && log_end_msg 1 ;;
esac
;;
status)
status_of_proc "$DAEMON" "$NAME" && exit 0 || exit $?
;;
restart|force-reload)
#
# If the "reload" option is implemented then remove the
# 'force-reload' alias
#
[ "$VERBOSE" != no ] && log_daemon_msg "Restarting $DESC" "$NAME"
do_stop
case "$?" in
0|1)
do_start
case "$?" in
0) [ "$VERBOSE" != no ] && log_end_msg 0 ;;
1) [ "$VERBOSE" != no ] && log_end_msg 1 ;; # Old process is still running
*) [ "$VERBOSE" != no ] && log_end_msg 1 ;; # Failed to start
esac
;;
*)
# Failed to stop
[ "$VERBOSE" != no ] && log_end_msg 1
;;
esac
;;
*)
echo "Usage: $SCRIPTNAME {start|stop|status|restart|force-reload}" >&2
exit 3
;;
esac
:
:
# ---- cc-node-v24/debian/cc-node.install ----
usr/lib/python2.*/*/cloudcontrol/node
usr/lib/python2.*/*/cc_node-*
usr/bin/cc-node
etc/cc-node.conf
cc-node-v24/debian/cc-node.postinst 0000664 0000000 0000000 00000002434 12113401062 0017327 0 ustar 00root root 0000000 0000000 #!/bin/sh
set -e
CONF="/etc/cc-node.conf"
if [ -f "$CONF" ]; then
# secure the config file
chmod 0640 "$CONF"
# replace login by hostname if unset
if grep '\$\$LOGIN\$\$' "$CONF" >/dev/null; then
login=$(hostname)
echo "*** CC-Node login : ${login}"
sed -e "s/\\\$\\\$LOGIN\\\$\\\$/${login}/g" -i "$CONF"
fi
# generate a random password if unset
if grep '\$\$PASSWORD\$\$' "$CONF" >/dev/null; then
password=$(cat /dev/urandom | tr -dc A-Za-z0-9 | head -c 12)
echo "*** CC-Node password : ${password}"
sed -e "s/\\\$\\\$PASSWORD\\\$\\\$/${password}/g"\
-i "$CONF"
fi
fi
# hardcode debhelpers to start the daemon after pycentral
# Automatically added by dh_pycentral
rm -f /var/lib/pycentral/cc-node.pkgremove
if which pycentral >/dev/null 2>&1; then
pycentral pkginstall cc-node
if grep -qs '^cc-node$' /var/lib/pycentral/delayed-pkgs; then
sed -i '/^cc-node$/d' /var/lib/pycentral/delayed-pkgs
fi
fi
# End automatically added section
# Workaround to restart node after pycentral
if [ -x "/etc/init.d/cc-node" ]; then
update-rc.d cc-node defaults >/dev/null
if [ -x "`which invoke-rc.d 2>/dev/null`" ]; then
invoke-rc.d cc-node start || exit $?
else
/etc/init.d/cc-node start || exit $?
fi
fi
exit 0
cc-node-v24/debian/changelog 0000664 0000000 0000000 00000011460 12113401062 0016063 0 ustar 00root root 0000000 0000000 cc-node (24) unstable; urgency=low
* New upstream release
* Added permission for RPC handlers
* Fix bug when a domain is updated while running: when restarted, update
information about the domain
* Added back option to disallow remote execution
-- Anaël Beutot Wed, 27 Feb 2013 13:53:56 +0100
cc-node (23) unstable; urgency=low
* Update dependency on cc-common version 5
* Fix network interface list on VMs
* Fix some errors with invalid plugins persistence file
* Fix sto_vol tag conflict when libvirt connection is lost
-- Anaël Beutot Mon, 17 Dec 2012 14:38:34 +0100
cc-node (22) unstable; urgency=low
* Update dependency on cc-common version 3
* Added tags: membuffers, memcaches, sto_vol
* VMs can be started in pause state
-- Anael Beutot Tue, 20 Nov 2012 16:03:32 +0100
cc-node (21) unstable; urgency=low
* Fix console handler
-- Anael Beutot Tue, 06 Nov 2012 17:22:58 +0100
cc-node (20) unstable; urgency=low
* Fix plugin loading at startup
* Better error reporting for remote command execution
* Fix unicode/syslog issue
* Check libvirtd version before opening virtio console
* Upgraded to packaging source format quilt v3.0
* Fix ForkedJob (concerns migrations)
-- Anael Beutot Wed, 24 Oct 2012 17:24:20 +0200
cc-node (19) unstable; urgency=low
* Complete rewrite
* Better libvirt support (0.8.8 to 0.9.12)
* Only KVM support by now
* Remote shell
* Remote console (for VMs)
* Tag registration
* Support for remote script execution
* Support for user defined plugins
-- Anael Beutot Thu, 23 Aug 2012 17:20:46 +0200
cc-node (17) unstable; urgency=low
* Fix running in host mode without python-libvirt
-- Thibault VINCENT Fri, 24 Jun 2011 11:51:33 +0200
cc-node (16) unstable; urgency=low
* Xen fix
-- Thibault VINCENT Mon, 06 Jun 2011 11:03:38 +0200
cc-node (15) unstable; urgency=low
* Use Unix socket to bind tunnels to libvirt (default config compliance)
-- Thibault VINCENT Wed, 01 Jun 2011 16:41:30 +0200
cc-node (14) unstable; urgency=low
* New: live DRBD migration for Xen/KVM
* Fix: various as usual ...
-- Thibault VINCENT Wed, 01 Jun 2011 11:36:31 +0200
cc-node (11) unstable; urgency=low
* New: cold migration
* New: chassis tag
* New: VNC tag
* Fix: spurious VM unregistration
* Fix: wrong memory reported for Xen host and Dom0
* Fix: missing storage pool/volume data when value is zero
* Fix: missing Xen VM arch tag
* And other code enhancement
-- Thibault VINCENT Tue, 08 Mar 2011 17:21:21 +0100
cc-node (10-1) unstable; urgency=low
* Just a new version
-- Thibault VINCENT Thu, 10 Feb 2011 18:14:22 +0100
cc-node (9) unstable; urgency=low
* Lots of fixes and improvements
* Faster with less contention
* Now returns real TTL, etc...
-- Thibault VINCENT Thu, 03 Feb 2011 15:11:37 +0100
cc-node (8) unstable; urgency=low
* Huge rewrite of libvirt wrapper and server handler
* Add new hardware-related tags
* Basecode ready for Xen integration
-- Thibault VINCENT Mon, 10 Jan 2011 18:49:28 +0100
cc-node (7) unstable; urgency=low
* New tags implemented for HV and VM
-- Thibault VINCENT Thu, 30 Dec 2010 13:28:19 +0100
cc-node (6) unstable; urgency=low
* Bug fix
* Proper release with newer init scripts
-- Thibault VINCENT Mon, 27 Dec 2010 17:12:40 +0100
cc-node (5-2) unstable; urgency=low
* Fix configuration option for HV detection
-- Thibault VINCENT Mon, 27 Dec 2010 14:10:52 +0100
cc-node (5-1) unstable; urgency=low
* Support for non-hypervisor nodes
* New tags implemented
* First migration features
-- Thibault VINCENT Mon, 27 Dec 2010 12:18:42 +0100
cc-node (4-1) unstable; urgency=low
* Fixed bugs
-- Antoine Millet Wed, 22 Dec 2010 17:50:48 +0100
cc-node (3-1) unstable; urgency=low
* Added $HOSTNAME variable on login field of config file.
* Fixed bugs
-- Antoine Millet Wed, 22 Dec 2010 16:47:14 +0100
cc-node (2-1) unstable; urgency=low
* New configuration in configuration file
* Changed name cc-hypervisor to cc-node
* Tags handled in list_vm
-- Antoine Millet Fri, 17 Dec 2010 11:43:42 +0100
cc-node (1-1) unstable; urgency=low
* Initial release.
-- Antoine Millet Mon, 06 Dec 2010 16:42:27 +0100
cc-node-v24/debian/compat 0000664 0000000 0000000 00000000002 12113401062 0015405 0 ustar 00root root 0000000 0000000 7
cc-node-v24/debian/control 0000664 0000000 0000000 00000001647 12113401062 0015622 0 ustar 00root root 0000000 0000000 Source: cc-node
Section: python
Priority: optional
Maintainer: Anaël Beutot
Build-Depends: debhelper (>= 7),
python-central (>= 0.6),
cdbs (>= 0.4.50),
python-setuptools,
python
XS-Python-Version: >= 2.6
Standards-Version: 3.9.1
Package: cc-node
Architecture: all
Depends: ${misc:Depends},
${python:Depends},
python-sjrpc ( >= 18 ),
python-psutil,
python-daemon,
cc-common ( >= 5 ),
python-pyev ( >= 0.8.1 )
XB-Python-Version: ${python:Versions}
Description: CloudControl node
This package provides the node component of CloudControl.
Package: cc-node-hypervisor
Architecture: all
Depends: ${misc:Depends}, cc-node (= ${binary:Version}), python-libvirt, drbd8-utils
Description: CC-node meta package for use with KVM hypervisor.
This package provides the node component of CloudControl with hypervisor capabilities.
cc-node-v24/debian/copyright 0000664 0000000 0000000 00000000174 12113401062 0016144 0 ustar 00root root 0000000 0000000 Files: *
Copyright: © 2012 Smartjog
License: LGPL-3
See /usr/share/common-licenses/LGPL-3 for a full copy of the license.
cc-node-v24/debian/rules 0000775 0000000 0000000 00000000573 12113401062 0015274 0 ustar 00root root 0000000 0000000 #!/usr/bin/make -f
# -*- makefile -*-
DEB_PYTHON_SYSTEM=pycentral
# Debhelper must be included before python-distutils to use
# dh_python / dh_pycentral / dh_pysupport
include /usr/share/cdbs/1/rules/debhelper.mk
include /usr/share/cdbs/1/class/python-distutils.mk
PYTHON_PACKAGES := cc-node
$(patsubst %,binary-install/%,$(PYTHON_PACKAGES))::
dh_pycentral -p$(cdbs_curpkg)
cc-node-v24/debian/source/ 0000775 0000000 0000000 00000000000 12113401062 0015507 5 ustar 00root root 0000000 0000000 cc-node-v24/debian/source/format 0000664 0000000 0000000 00000000015 12113401062 0016716 0 ustar 00root root 0000000 0000000 3.0 (native)
cc-node-v24/docs/ 0000775 0000000 0000000 00000000000 12113401062 0013715 5 ustar 00root root 0000000 0000000 cc-node-v24/docs/dev/ 0000775 0000000 0000000 00000000000 12113401062 0014473 5 ustar 00root root 0000000 0000000 cc-node-v24/docs/dev/Makefile 0000664 0000000 0000000 00000011032 12113401062 0016130 0 ustar 00root root 0000000 0000000 # Makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = build
# Internal variables.
PAPEROPT_a4 = -D latex_paper_size=a4
PAPEROPT_letter = -D latex_paper_size=letter
ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) source
.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest
help:
@echo "Please use \`make <target>' where <target> is one of"
@echo " html to make standalone HTML files"
@echo " dirhtml to make HTML files named index.html in directories"
@echo " singlehtml to make a single large HTML file"
@echo " pickle to make pickle files"
@echo " json to make JSON files"
@echo " htmlhelp to make HTML files and a HTML help project"
@echo " qthelp to make HTML files and a qthelp project"
@echo " devhelp to make HTML files and a Devhelp project"
@echo " epub to make an epub"
@echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
@echo " latexpdf to make LaTeX files and run them through pdflatex"
@echo " text to make text files"
@echo " man to make manual pages"
@echo " changes to make an overview of all changed/added/deprecated items"
@echo " linkcheck to check all external links for integrity"
@echo " doctest to run all doctests embedded in the documentation (if enabled)"
clean:
-rm -rf $(BUILDDIR)/*
html:
$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
dirhtml:
$(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml."
singlehtml:
$(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml
@echo
@echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml."
pickle:
$(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle
@echo
@echo "Build finished; now you can process the pickle files."
json:
$(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json
@echo
@echo "Build finished; now you can process the JSON files."
htmlhelp:
$(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp
@echo
@echo "Build finished; now you can run HTML Help Workshop with the" \
".hhp project file in $(BUILDDIR)/htmlhelp."
qthelp:
$(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp
@echo
@echo "Build finished; now you can run "qcollectiongenerator" with the" \
".qhcp project file in $(BUILDDIR)/qthelp, like this:"
@echo "# qcollectiongenerator $(BUILDDIR)/qthelp/CloudControlnode.qhcp"
@echo "To view the help file:"
@echo "# assistant -collectionFile $(BUILDDIR)/qthelp/CloudControlnode.qhc"
devhelp:
$(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp
@echo
@echo "Build finished."
@echo "To view the help file:"
@echo "# mkdir -p $$HOME/.local/share/devhelp/CloudControlnode"
@echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/CloudControlnode"
@echo "# devhelp"
epub:
$(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub
@echo
@echo "Build finished. The epub file is in $(BUILDDIR)/epub."
latex:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo
@echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex."
@echo "Run \`make' in that directory to run these through (pdf)latex" \
"(use \`make latexpdf' here to do that automatically)."
latexpdf:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo "Running LaTeX files through pdflatex..."
make -C $(BUILDDIR)/latex all-pdf
@echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
text:
$(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text
@echo
@echo "Build finished. The text files are in $(BUILDDIR)/text."
man:
$(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man
@echo
@echo "Build finished. The manual pages are in $(BUILDDIR)/man."
changes:
$(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes
@echo
@echo "The overview file is in $(BUILDDIR)/changes."
linkcheck:
$(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck
@echo
@echo "Link check complete; look for any errors in the above output " \
"or in $(BUILDDIR)/linkcheck/output.txt."
doctest:
$(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest
@echo "Testing of doctests in the sources finished, look at the " \
"results in $(BUILDDIR)/doctest/output.txt."
cc-node-v24/docs/dev/source/ 0000775 0000000 0000000 00000000000 12113401062 0015773 5 ustar 00root root 0000000 0000000 cc-node-v24/docs/dev/source/architecture.rst 0000664 0000000 0000000 00000001535 12113401062 0021213 0 ustar 00root root 0000000 0000000 Architecture
============
Start-up process
----------------
See ``bin/cc-node``.
Summary of the steps:
Binary:
* First parse command line options and configuration file
* Switch to a daemon context
* Instantiate and launch a :class:`cloudcontrol.node.node.MainLoop`
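The three steps above can be sketched as follows; ``MainLoop`` and the helpers here are stand-ins illustrating the sequence, not the real ``cloudcontrol`` API:

```python
# Hypothetical sketch of the bin/cc-node start-up sequence; all names
# are stand-ins, only the ordering of the steps comes from the docs.
calls = []

def parse_config():
    # step 1: parse command line options and configuration file
    calls.append("parse")
    return {}

class DaemonContext(object):
    # step 2: switch to a daemon context
    def __enter__(self):
        calls.append("daemonize")
        return self
    def __exit__(self, *exc):
        return False

class MainLoop(object):
    # step 3: instantiate and launch the main loop
    def __init__(self, config):
        self.config = config
    def start(self):
        calls.append("loop")

config = parse_config()
with DaemonContext():
    MainLoop(config).start()
print(calls)
```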
Organisation of modules/packages
--------------------------------
.. tree ../../ccnode | grep -v \.pyc | grep -v \\.\\.
.. code-block:: text
|-- config.py
|-- exc.py
|-- host
| |-- __init__.py
| |-- tags.py
|-- hypervisor
| |-- domains
| | |-- __init__.py
| | |-- vm_tags.py
| |-- __init__.py
| |-- jobs.py
| |-- lib.py
| |-- tags.py
|-- __init__.py
|-- jobs.py
|-- node.py
|-- plugins.py
|-- tags.py
|-- utils.py
TODO
Node
----
.. automodule:: cloudcontrol.node.node
:members:
cc-node-v24/docs/dev/source/cold_migration.rst 0000664 0000000 0000000 00000000060 12113401062 0021513 0 ustar 00root root 0000000 0000000 Cold migration steps
====================
TODO
cc-node-v24/docs/dev/source/conf.py 0000664 0000000 0000000 00000016125 12113401062 0017277 0 ustar 00root root 0000000 0000000 # -*- coding: utf-8 -*-
#
# Cloud Control node documentation build configuration file, created by
# sphinx-quickstart on Thu Sep 15 15:48:00 2011.
#
# This file is execfile()d with the current directory set to its containing dir.
#
# Note that not all possible configuration values are present in this
# autogenerated file.
#
# All configuration values have a default; values that are commented out
# serve to show the default.
import sys, os
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#sys.path.insert(0, os.path.abspath('.'))
# -- General configuration -----------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be extensions
# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest', 'sphinx.ext.coverage', 'sphinx.ext.viewcode']
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix of source filenames.
source_suffix = '.rst'
# The encoding of source files.
#source_encoding = 'utf-8-sig'
# The master toctree document.
master_doc = 'index'
# General information about the project.
project = u'Cloud Control node'
copyright = u'2011, Smartjog'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = '18'
# The full version, including alpha/beta/rc tags.
release = '18'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#language = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
#today = ''
# Else, today_fmt is used as the format for a strftime call.
#today_fmt = '%B %d, %Y'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
exclude_patterns = []
# The reST default role (used for this markup: `text`) to use for all documents.
#default_role = None
# If true, '()' will be appended to :func: etc. cross-reference text.
#add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
#add_module_names = True
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
#show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# A list of ignored prefixes for module index sorting.
#modindex_common_prefix = []
# -- Options for HTML output ---------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
html_theme = 'default'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#html_theme_options = {}
# Add any paths that contain custom themes here, relative to this directory.
#html_theme_path = []
# The name for this set of Sphinx documents. If None, it defaults to
# "<project> v<release> documentation".
#html_title = None
# A shorter title for the navigation bar. Default is the same as html_title.
#html_short_title = None
# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
#html_logo = None
# The name of an image file (within the static path) to use as favicon of the
# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
# pixels large.
#html_favicon = None
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
# using the given strftime format.
#html_last_updated_fmt = '%b %d, %Y'
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
#html_use_smartypants = True
# Custom sidebar templates, maps document names to template names.
#html_sidebars = {}
# Additional templates that should be rendered to pages, maps page names to
# template names.
#html_additional_pages = {}
# If false, no module index is generated.
#html_domain_indices = True
# If false, no index is generated.
#html_use_index = True
# If true, the index is split into individual pages for each letter.
#html_split_index = False
# If true, links to the reST sources are added to the pages.
#html_show_sourcelink = True
# If true, "Created using Sphinx" is shown in the HTML footer. Default is True.
#html_show_sphinx = True
# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True.
#html_show_copyright = True
# If true, an OpenSearch description file will be output, and all pages will
# contain a <link> tag referring to it. The value of this option must be the
# base URL from which the finished HTML is served.
#html_use_opensearch = ''
# This is the file name suffix for HTML files (e.g. ".xhtml").
#html_file_suffix = None
# Output file base name for HTML help builder.
htmlhelp_basename = 'CloudControlnodedoc'
# -- Options for LaTeX output --------------------------------------------------
# The paper size ('letter' or 'a4').
#latex_paper_size = 'letter'
# The font size ('10pt', '11pt' or '12pt').
#latex_font_size = '10pt'
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author, documentclass [howto/manual]).
latex_documents = [
('index', 'CloudControlnode.tex', u'Cloud Control node Documentation',
u'Smartjog', 'manual'),
]
# The name of an image file (relative to this directory) to place at the top of
# the title page.
#latex_logo = None
# For "manual" documents, if this is true, then toplevel headings are parts,
# not chapters.
#latex_use_parts = False
# If true, show page references after internal links.
#latex_show_pagerefs = False
# If true, show URL addresses after external links.
#latex_show_urls = False
# Additional stuff for the LaTeX preamble.
#latex_preamble = ''
# Documents to append as an appendix to all manuals.
#latex_appendices = []
# If false, no module index is generated.
#latex_domain_indices = True
# -- Options for manual page output --------------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
('index', 'cloudcontrolnode', u'Cloud Control node Documentation',
[u'Smartjog'], 1)
]
# -- Options for autodoc ------------------------------------------------------
autoclass_content = 'both'
autodoc_member_order = 'bysource'
cc-node-v24/docs/dev/source/exc.rst 0000664 0000000 0000000 00000000113 12113401062 0017277 0 ustar 00root root 0000000 0000000 Exceptions
==========
.. automodule:: cloudcontrol.node.exc
:members:
cc-node-v24/docs/dev/source/host.rst 0000664 0000000 0000000 00000000236 12113401062 0017503 0 ustar 00root root 0000000 0000000 Host
====
Handler
-------
.. autoclass:: cloudcontrol.node.host.Handler
:members:
Tags
----
.. automodule:: cloudcontrol.node.host.tags
:members:
cc-node-v24/docs/dev/source/hot_migration.rst 0000664 0000000 0000000 00000045511 12113401062 0021376 0 ustar 00root root 0000000 0000000 Live migration steps
====================
See http://libvirt.org/migration.html for migration details using libvirt.
This page explains the live migration scenario between two hypervisors, hv1 and
hv2.
We move a VM named "TestVM" from hv1 to hv2.
The VM has a disk backed by an LVM volume named "/dev/vg/TestVM".
Shell commands are given so that each step can be reproduced.
it-test-16.lab.fr.lan is the source hypervisor (where the VM is)
it-test-17.lab.fr.lan is the destination hypervisor (where we migrate the VM)
[...] in the command results means some lines were removed.
TODO schematics
1 - Pause VM
------------
We must pause the VM while we manipulate the device mapper; otherwise I/O
errors would occur::
virsh suspend TestVM
2 - Set up DRBD
---------------
Reload kernel module with proper parameters::
it-test-16.lab.fr.lan ~ 2 # modprobe drbd minor_count=100 usermode_helper=/bin/true
it-test-16.lab.fr.lan ~ 0 #
it-test-17.lab.fr.lan ~ 2 # modprobe drbd minor_count=100 usermode_helper=/bin/true
it-test-17.lab.fr.lan ~ 0 #
Create DRBD device metadata (external).
See http://www.drbd.org/users-guide-emb/ch-internals.html#s-external-meta-data
for more information.
We find the metadata size we need::
it-test-16.lab.fr.lan ~ 130 # ipython
Python 2.7.3rc2 (default, Apr 22 2012, 22:30:17)
Type "copyright", "credits" or "license" for more information.
IPython 0.12 -- An enhanced Interactive Python.
? -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help -> Python's own help system.
object? -> Details about 'object', use 'object??' for extra details.
In [1]: import libvirt
In [2]: c = libvirt.open('qemu:///system')
In [4]: c.listDomainsID()
Out[4]: [6, 7]
In [7]: d = c.lookupByID(7)
In [8]: d.name()
Out[8]: 'TestVMDisk-small-16'
In [9]: d.XMLDesc(0)
Out[9]: "<domain ...> [...] </domain>"
In [10]: s = c.storagePoolLookupByName('vg')
In [13]: s.listVolumes()
Out[13]: ['TestVMDisk-16', 'TestVMDisk-small-16', 'TestVMDisk-17']
In [15]: v = c.storageVolLookupByPath('/dev/vg/TestVMDisk-small-16')
In [16]: v.name()
Out[16]: 'TestVMDisk-small-16'
In [18]: _, capacity, _ = v.info()
In [19]: capacity
Out[19]: 209715200L
In [20]: drbd_meta_size = max(capacity / 32768 + 4 * 2 ** 20, 128 * 2 ** 20)
In [21]: drbd_meta_size
Out[21]: 134217728
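The size computation from the session above can be wrapped in a small helper; this is a sketch of the formula as used here (the function name is ours, not part of cc-node):

```python
def drbd_meta_size(capacity_bytes):
    # External DRBD metadata: roughly one bitmap bit per 4 KiB of data
    # (capacity / 32768 bytes) plus 4 MiB of slack for the activity log,
    # with a floor of 128 MiB, as in the ipython session above.
    return max(capacity_bytes // 32768 + 4 * 2 ** 20, 128 * 2 ** 20)

# 200 MiB volume from the transcript
print(drbd_meta_size(209715200))
```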
Create the LV for the VM on the destination hypervisor::
it-test-17.lab.fr.lan ~ 0 # lvcreate -n TestVMDisk-small-16 -L 200m vg
Logical volume "TestVMDisk-small-16" created
it-test-17.lab.fr.lan ~ 0 # lvs
LV VG Attr LSize Origin Snap% Move Log Copy% Convert
TestVMDisk-small-16 vg -wi-a- 200.00m
We create the metadata LV with the appropriate size on both hypervisors::
it-test-16.lab.fr.lan ~ 5 # lvcreate -v -n TestVMDisk-small-16.drbdmeta -L 134217728b vg
Setting logging type to disk
Finding volume group "vg"
Archiving volume group "vg" metadata (seqno 4).
Creating logical volume TestVMDisk-small-16.drbdmeta
Creating volume group backup "/etc/lvm/backup/vg" (seqno 5).
Found volume group "vg"
activation/volume_list configuration setting not defined, checking only host tags for vg/TestVMDisk-small-16.drbdmeta
Creating vg-TestVMDisk--small--16.drbdmeta
Loading vg-TestVMDisk--small--16.drbdmeta table (253:3)
Resuming vg-TestVMDisk--small--16.drbdmeta (253:3)
Clearing start of logical volume "TestVMDisk-small-16.drbdmeta"
Creating volume group backup "/etc/lvm/backup/vg" (seqno 5).
Logical volume "TestVMDisk-small-16.drbdmeta" created
it-test-16.lab.fr.lan ~ 0 # lvs
LV VG Attr LSize Origin Snap% Move Log Copy% Convert
[...]
TestVMDisk-small-16 vg -wi-ao 200.00m
TestVMDisk-small-16.drbdmeta vg -wi-a- 128.00m
it-test-17.lab.fr.lan ~ 3 # lvcreate -v -n TestVMDisk-small-16.drbdmeta -L 134217728b vg
Setting logging type to disk
Finding volume group "vg"
Archiving volume group "vg" metadata (seqno 2).
Creating logical volume TestVMDisk-small-16.drbdmeta
Creating volume group backup "/etc/lvm/backup/vg" (seqno 3).
Found volume group "vg"
activation/volume_list configuration setting not defined, checking only host tags for vg/TestVMDisk-small-16.drbdmeta
Creating vg-TestVMDisk--small--16.drbdmeta
Loading vg-TestVMDisk--small--16.drbdmeta table (253:1)
Resuming vg-TestVMDisk--small--16.drbdmeta (253:1)
Clearing start of logical volume "TestVMDisk-small-16.drbdmeta"
Creating volume group backup "/etc/lvm/backup/vg" (seqno 3).
Logical volume "TestVMDisk-small-16.drbdmeta" created
Create a device mapper with a different name (.copy) that points to the LV::
it-test-16.lab.fr.lan ~ 0 # dmsetup table --showkeys /dev/vg/TestVMDisk-small-16
0 409600 linear 9:126 8300544
it-test-16.lab.fr.lan ~ 0 # dmsetup table --showkeys /dev/vg/TestVMDisk-small-16 | dmsetup create vg-TestVMDisk--small--16.copy
it-test-17.lab.fr.lan ~ 0 # dmsetup table --showkeys /dev/vg/TestVMDisk-small-16
0 409600 linear 9:126 2048
it-test-17.lab.fr.lan ~ 0 # dmsetup table --showkeys /dev/vg/TestVMDisk-small-16 | dmsetup create vg-TestVMDisk--small--16.copy
it-test-17.lab.fr.lan ~ 0 # dmsetup table --showkeys /dev/vg/TestVMDisk-small-16
0 409600 linear 9:126 2048
it-test-17.lab.fr.lan ~ 0 # dmsetup table --showkeys
vg-TestVMDisk--small--16: 0 409600 linear 9:126 2048
vg-TestVMDisk--small--16.copy: 0 409600 linear 9:126 2048
vg-TestVMDisk--small--16.drbdmeta: 0 262144 linear 9:126 411648
Wipe and initialize drbd metadata::
it-test-16.lab.fr.lan ~ 20 # drbdmeta --force /dev/drbd0 v08 /dev/vg/TestVMDisk-small-16.drbdmeta 0 wipe-md
There appears to be no drbd meta data to wipe out?
it-test-16.lab.fr.lan ~ 0 # drbdmeta --force /dev/drbd0 v08 /dev/vg/TestVMDisk-small-16.drbdmeta 0 create-md
md_offset 0
al_offset 4096
bm_offset 36864
Found some data
==> This might destroy existing data! <==
Do you want to proceed?
*** confirmation forced via --force option ***
Writing meta data...
initializing activity log
NOT initialized bitmap
New drbd meta data block successfully created.
it-test-17.lab.fr.lan ~ 0 # drbdmeta --force /dev/drbd0 v08 /dev/vg/TestVMDisk-small-16.drbdmeta 0 wipe-md
There appears to be no drbd meta data to wipe out?
it-test-17.lab.fr.lan ~ 0 # drbdmeta --force /dev/drbd0 v08 /dev/vg/TestVMDisk-small-16.drbdmeta 0 create-md
md_offset 0
al_offset 4096
bm_offset 36864
Found some data
==> This might destroy existing data! <==
Do you want to proceed?
*** confirmation forced via --force option ***
Writing meta data...
initializing activity log
NOT initialized bitmap
New drbd meta data block successfully created.
Create DRBD device with LV (.copy) and metadata::
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 disk /dev/mapper/vg-TestVMDisk--small--16.copy /dev/vg/TestVMDisk-small-16.drbdmeta 0 --create-device
it-test-16.lab.fr.lan ~ 0 # ls /dev/drb*
/dev/drbd0
it-test-17.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 disk /dev/mapper/vg-TestVMDisk--small--16.copy /dev/vg/TestVMDisk-small-16.drbdmeta 0 --create-device
it-test-17.lab.fr.lan ~ 2 # ls /dev/drb*
/dev/drbd0
Connect DRBD together::
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 net 0.0.0.0:7788 192.168.32.209:7788 C -m -S 10000000 &
[1] 9439
[1]+ Done drbdsetup /dev/drbd0 net 0.0.0.0:7788 192.168.32.209:7788 C -m -S 10000000
it-test-16.lab.fr.lan ~ 0 # netstat -t
Active Internet connections (w/o servers)
Proto Recv-Q Send-Q Local Address Foreign Address State
[...]
tcp 0 0 it-test-16.lab.fr.:7788 it-test-17.lab.fr:53443 ESTABLISHED
tcp 0 0 it-test-16.lab.fr:54046 it-test-17.lab.fr.:7788 ESTABLISHED
it-test-17.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 net 0.0.0.0:7788 192.168.32.208:7788 C -m -S 10000000 &
[1] 8113
[...]
[1]+ Done drbdsetup /dev/drbd0 net 0.0.0.0:7788 192.168.32.208:7788 C -m -S 10000000
it-test-17.lab.fr.lan ~ 0 # netstat -t
Active Internet connections (w/o servers)
Proto Recv-Q Send-Q Local Address Foreign Address State
[...]
tcp 0 0 it-test-17.lab.fr.:7788 it-test-16.lab.fr:54046 ESTABLISHED
tcp 0 0 it-test-17.lab.fr:53443 it-test-16.lab.fr.:7788 ESTABLISHED
Set synchronization rate::
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 syncer -r 50000
it-test-17.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 syncer -r 50000
Make sure DRBD is connected to its peer::
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 wait-connect -t 60 -d 60 -o 60
it-test-16.lab.fr.lan ~ 0 #
it-test-17.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 wait-connect -t 60 -d 60 -o 60
it-test-17.lab.fr.lan ~ 0 #
Set roles for DRBD (source is primary and destination is secondary)::
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 primary -o
it-test-16.lab.fr.lan ~ 0 #
it-test-17.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 secondary
it-test-17.lab.fr.lan ~ 0 #
Wait for synchronisation to complete::
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 wait-sync
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 status
it-test-17.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 wait-sync
it-test-17.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 status
Set DRBD in primary/primary mode (see
http://www.drbd.org/users-guide/ch-admin.html#s-roles and
http://www.drbd.org/users-guide/s-enable-dual-primary.html
for more information)::
it-test-17.lab.fr.lan ~ 1 # drbdsetup /dev/drbd0 primary -o
it-test-17.lab.fr.lan ~ 0 #
Set the original device mapper to the DRBD device (i.e. takeover)::
it-test-16.lab.fr.lan ~ 0 # python -c 'print "0 %d linear /dev/drbd0 0" % (134217728 / 512)' | dmsetup load /dev/vg/TestVMDisk-small-16
it-test-16.lab.fr.lan ~ 0 # dmsetup table
vg-TestVMDisk--17: 0 8192000 linear 9:126 8710144
vg-TestVMDisk--16: 0 8298496 linear 9:126 2048
vg-TestVMDisk--small--16: 0 409600 linear 9:126 8300544
vg-TestVMDisk--small--16.copy: 0 409600 linear 9:126 8300544
vg-TestVMDisk--small--16.drbdmeta: 0 262144 linear 9:126 16902144
it-test-16.lab.fr.lan ~ 0 # dmsetup suspend /dev/vg/TestVMDisk-small-16
it-test-16.lab.fr.lan ~ 0 # dmsetup resume /dev/vg/TestVMDisk-small-16
it-test-16.lab.fr.lan ~ 0 # dmsetup table
vg-TestVMDisk--17: 0 8192000 linear 9:126 8710144
vg-TestVMDisk--16: 0 8298496 linear 9:126 2048
vg-TestVMDisk--small--16: 0 262144 linear 147:0 0
vg-TestVMDisk--small--16.copy: 0 409600 linear 9:126 8300544
vg-TestVMDisk--small--16.drbdmeta: 0 262144 linear 9:126 16902144
it-test-17.lab.fr.lan ~ 0 # python -c 'print "0 %d linear /dev/drbd0 0" % (134217728 / 512)' | dmsetup load /dev/vg/TestVMDisk-small-16
it-test-17.lab.fr.lan ~ 0 # dmsetup table
vg-TestVMDisk--small--16: 0 409600 linear 9:126 2048
vg-TestVMDisk--small--16.copy: 0 409600 linear 9:126 2048
vg-TestVMDisk--small--16.drbdmeta: 0 262144 linear 9:126 411648
it-test-17.lab.fr.lan ~ 0 # dmsetup suspend /dev/vg/TestVMDisk-small-16
it-test-17.lab.fr.lan ~ 0 # dmsetup resume /dev/vg/TestVMDisk-small-16
it-test-17.lab.fr.lan ~ 0 # dmsetup table
vg-TestVMDisk--small--16: 0 262144 linear 147:0 0
vg-TestVMDisk--small--16.copy: 0 409600 linear 9:126 2048
vg-TestVMDisk--small--16.drbdmeta: 0 262144 linear 9:126 411648
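The one-liner piped into ``dmsetup load`` above builds a device-mapper "linear" table line (``<start> <length in 512-byte sectors> linear <device> <offset>``). A small sketch of that computation (the helper name is an assumption; the values are taken from the transcript):

```python
def dm_linear_table(size_bytes, target_dev, offset_sectors=0):
    # Device-mapper "linear" table line:
    # <start-sector> <length-in-512B-sectors> linear <device> <offset>
    return "0 %d linear %s %d" % (size_bytes // 512, target_dev, offset_sectors)

# Point the LV's mapping at the DRBD device, as in the takeover step above.
print(dm_linear_table(134217728, "/dev/drbd0"))
```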
3 - Resume VM
-------------
FIXME ?
::
virsh resume TestVMDisk-small-16
4 - Wait for sync
-----------------
FIXME ?
Wait for DRBD to synchronise. Synchronisation status can be checked with the
following command::
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 status
When done, set up DRBD in primary/primary mode.
5 - Migrate
-----------
::
it-test-16.lab.fr.lan ~ 0 # ipython
Python 2.7.3rc2 (default, Apr 22 2012, 22:30:17)
Type "copyright", "credits" or "license" for more information.
IPython 0.12 -- An enhanced Interactive Python.
? -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help -> Python's own help system.
object? -> Details about 'object', use 'object??' for extra details.
In [1]: import libvirt
In [2]: c = libvirt.open('qemu:///system')
In [3]: dest_c = libvirt.open('qemu+tcp://it-test-17.lab.fr.lan:1234/system')
In [5]: c.listDomainsID()
Out[5]: [6, 7]
In [8]: flags = libvirt.VIR_MIGRATE_LIVE | libvirt.VIR_MIGRATE_PEER2PEER | libvirt.VIR_MIGRATE_TUNNELLED | libvirt.VIR_MIGRATE_PERSIST_DEST | libvirt.VIR_MIGRATE_UNDEFINE_SOURCE
In [9]: flags
Out[9]: 31
In [11]: d = c.lookupByID(7)
In [12]: d.name()
Out[12]: 'TestVMDisk-small-16'
In [20]: d.migrate(dest_c, flags, None, 'qemu+tcp://it-test-17.lab.fr.lan:1234/system', 0)
Out[20]:
In case your virtio driver is not configured with cache set to 'none' you will
end up with the following message::
In [13]: d.migrate(dest_c, flags, None, 'qemu+tcp://it-test-17.lab.fr.lan:1234/system', 0)
libvir: QEMU error : Unsafe migration: Migration may lead to data corruption if disks use cache != none
---------------------------------------------------------------------------
libvirtError Traceback (most recent call last)
/root/<ipython-input> in <module>()
----> 1 d.migrate(dest_c, flags, None, 'qemu+tcp://it-test-17.lab.fr.lan:1234/system', 0)
/usr/lib/python2.7/dist-packages/libvirt.pyc in migrate(self, dconn, flags, dname, uri, bandwidth)
820 else: dconn__o = dconn._o
821 ret = libvirtmod.virDomainMigrate(self._o, dconn__o, flags, dname, uri, bandwidth)
--> 822 if ret is None:raise libvirtError('virDomainMigrate() failed', dom=self)
823 __tmp = virDomain(self,_obj=ret)
824 return __tmp
libvirtError: Unsafe migration: Migration may lead to data corruption if disks use cache != none
You may force the migration by adding the unsafe flag to the migration flags
(requires libvirt >= 0.9.11)::
In [14]: flags |= libvirt.VIR_MIGRATE_UNSAFE
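The migration flags are plain bit flags, so the values in the session above can be sanity-checked with integer arithmetic. A sketch using the constant values as found in libvirt's ``virDomainMigrateFlags`` enum (copied here as an assumption; verify them against your libvirt version):

```python
# Assumed bit values from libvirt's virDomainMigrateFlags enum.
VIR_MIGRATE_LIVE = 1
VIR_MIGRATE_PEER2PEER = 2
VIR_MIGRATE_TUNNELLED = 4
VIR_MIGRATE_PERSIST_DEST = 8
VIR_MIGRATE_UNDEFINE_SOURCE = 16
VIR_MIGRATE_UNSAFE = 512  # available from libvirt 0.9.11

flags = (VIR_MIGRATE_LIVE
         | VIR_MIGRATE_PEER2PEER
         | VIR_MIGRATE_TUNNELLED
         | VIR_MIGRATE_PERSIST_DEST
         | VIR_MIGRATE_UNDEFINE_SOURCE)
assert flags == 31  # matches Out[9] in the session above

# forcing an unsafe migration just sets one more bit
flags |= VIR_MIGRATE_UNSAFE
assert flags == 543
```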
When done, set up DRBD in secondary-primary mode.
6 - Pause VM
------------
We must pause the VM while we manipulate the device mapper, because I/O errors
would occur otherwise.
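This can be wrapped in a small helper so the VM is always resumed, even if the table manipulation fails. A sketch (``paused`` is our name, not a cc-node API; ``dom`` is anything exposing libvirt's ``virDomain`` ``suspend()``/``resume()`` methods):

```python
from contextlib import contextmanager


@contextmanager
def paused(dom):
    """Suspend a domain for the duration of the block, then resume it.

    The resume runs in a finally clause, so the VM is not left paused if the
    device mapper manipulation raises.
    """
    dom.suspend()
    try:
        yield dom
    finally:
        dom.resume()

# Usage sketch:
#     with paused(d):
#         ...reload the device mapper tables...
```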
7 - Remove DRBD
---------------
Remove DRBD::
it-test-17.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 disconnect
it-test-17.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 detach
/dev/drbd0: State change failed: (-2) Need access to UpToDate data
it-test-17.lab.fr.lan ~ 17 # drbdsetup /dev/drbd0 down
it-test-17.lab.fr.lan ~ 0 #
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 disconnect
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 detach
/dev/drbd0: State change failed: (-2) Need access to UpToDate data
it-test-16.lab.fr.lan ~ 17 # drbdsetup /dev/drbd0 down
it-test-16.lab.fr.lan ~ 0 #
Remove DM::
it-test-17.lab.fr.lan ~ 0 # echo '0 409600 linear 9:126 8300544' | dmsetup load /dev/vg/TestVMDisk-small-16
it-test-17.lab.fr.lan ~ 0 # dmsetup table
vg-TestVMDisk--small--16: 0 262144 linear 147:0 0
vg-TestVMDisk--small--16.copy: 0 409600 linear 9:126 2048
vg-TestVMDisk--small--16.drbdmeta: 0 262144 linear 9:126 411648
it-test-17.lab.fr.lan ~ 0 # dmsetup suspend /dev/vg/TestVMDisk-small-16
it-test-17.lab.fr.lan ~ 0 # dmsetup resume /dev/vg/TestVMDisk-small-16
it-test-17.lab.fr.lan ~ 0 # dmsetup table
vg-TestVMDisk--small--16: 0 409600 linear 9:126 8300544
vg-TestVMDisk--small--16.copy: 0 409600 linear 9:126 2048
vg-TestVMDisk--small--16.drbdmeta: 0 262144 linear 9:126 411648
it-test-16.lab.fr.lan ~ 0 # echo '0 409600 linear 9:126 8300544' | dmsetup load /dev/vg/TestVMDisk-small-16
it-test-16.lab.fr.lan ~ 0 # dmsetup table
vg-TestVMDisk--17: 0 8192000 linear 9:126 8710144
vg-TestVMDisk--16: 0 8298496 linear 9:126 2048
vg-TestVMDisk--small--16: 0 262144 linear 147:0 0
vg-TestVMDisk--small--16.copy: 0 409600 linear 9:126 8300544
vg-TestVMDisk--small--16.drbdmeta: 0 262144 linear 9:126 16902144
it-test-16.lab.fr.lan ~ 0 # dmsetup suspend /dev/vg/TestVMDisk-small-16
it-test-16.lab.fr.lan ~ 0 # dmsetup resume /dev/vg/TestVMDisk-small-16
it-test-16.lab.fr.lan ~ 0 # dmsetup table
vg-TestVMDisk--17: 0 8192000 linear 9:126 8710144
vg-TestVMDisk--16: 0 8298496 linear 9:126 2048
vg-TestVMDisk--small--16: 0 409600 linear 9:126 8300544
vg-TestVMDisk--small--16.copy: 0 409600 linear 9:126 8300544
vg-TestVMDisk--small--16.drbdmeta: 0 262144 linear 9:126 16902144
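The sequence above (load the new table, then suspend/resume) is how device mapper swaps a table atomically: ``dmsetup load`` only stages the table in the device's inactive slot, and the suspend/resume pair flushes outstanding I/O and activates it. A sketch that builds the equivalent commands, using ``dmsetup load --table`` instead of a pipe (device path and table line are the ones from this example):

```python
def reload_table_cmds(dm_path, table):
    """Commands to atomically replace the table of an existing DM device."""
    return [
        # stage the new table in the device's inactive slot
        ('dmsetup', 'load', dm_path, '--table', table),
        # flush I/O and swap the staged table in
        ('dmsetup', 'suspend', dm_path),
        ('dmsetup', 'resume', dm_path),
    ]

cmds = reload_table_cmds('/dev/vg/TestVMDisk-small-16',
                         '0 409600 linear 9:126 8300544')
```

Each tuple can be handed to ``subprocess.check_call`` in order.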
Remove copy DM::
it-test-16.lab.fr.lan ~ 0 # dmsetup remove /dev/mapper/vg-TestVMDisk--small--16.copy
it-test-16.lab.fr.lan ~ 0 #
it-test-17.lab.fr.lan ~ 0 # dmsetup remove /dev/mapper/vg-TestVMDisk--small--16.copy
it-test-17.lab.fr.lan ~ 0 #
Remove metadata LV::
it-test-17.lab.fr.lan ~ 5 # lvremove /dev/vg/TestVMDisk-small-16.drbdmeta
Do you really want to remove active logical volume TestVMDisk-small-16.drbdmeta? [y/n]: y
Logical volume "TestVMDisk-small-16.drbdmeta" successfully removed
it-test-17.lab.fr.lan ~ 0 #
it-test-16.lab.fr.lan ~ 5 # lvremove /dev/vg/TestVMDisk-small-16.drbdmeta
Do you really want to remove active logical volume TestVMDisk-small-16.drbdmeta? [y/n]: y
Logical volume "TestVMDisk-small-16.drbdmeta" successfully removed
it-test-16.lab.fr.lan ~ 0 #
Remove VM LV on source hypervisor::
it-test-16.lab.fr.lan ~ 0 # lvremove /dev/vg/TestVMDisk-small-16
Do you really want to remove active logical volume TestVMDisk-small-16? [y/n]: y
Logical volume "TestVMDisk-small-16" successfully removed
it-test-16.lab.fr.lan ~ 0 #
8 - Resume VM
-------------
cc-node-v24/docs/dev/source/hypervisor.rst:

Hypervisor
==========
Hypervisor handler
------------------
.. autoclass:: cloudcontrol.node.hypervisor.Handler
    :members:
Hypervisor object
-----------------
.. autoclass:: cloudcontrol.node.hypervisor.Hypervisor
    :members:
Storage pools and volumes
-------------------------
.. autoclass:: cloudcontrol.node.hypervisor.StorageIndex
    :members:

.. autoclass:: cloudcontrol.node.hypervisor.Storage
    :members:

.. autoclass:: cloudcontrol.node.hypervisor.Volume
    :members:
cc-node-v24/docs/dev/source/index.rst:

.. Cloud Control node documentation master file, created by
   sphinx-quickstart on Thu Sep 15 15:48:00 2011.
   You can adapt this file completely to your liking, but it should at least
   contain the root `toctree` directive.
Welcome to Cloud Control node's documentation!
==============================================
Contents:
.. toctree::
   :maxdepth: 2

   architecture
   tags
   plugins
   host
   libvirt
   hypervisor
   vm
   jobs
   cold_migration
   hot_migration
   test
   exc
   utils
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
cc-node-v24/docs/dev/source/jobs.rst:

Jobs
====
Jobs manager
------------
.. autoclass:: cloudcontrol.node.jobs.JobManager
    :members:
Abstract jobs
-------------
.. autoclass:: cloudcontrol.node.jobs.BaseThreadedJob
    :members:

.. autoclass:: cloudcontrol.node.jobs.ForkedJob
    :members:

.. autoclass:: cloudcontrol.node.jobs.BaseIOJob
    :members:
Hypervisor jobs
---------------
.. automodule:: cloudcontrol.node.hypervisor.jobs
    :members:
cc-node-v24/docs/dev/source/libvirt.rst:

Libvirt
=======
.. automodule:: cloudcontrol.node.hypervisor.lib
    :members:
cc-node-v24/docs/dev/source/plugins.rst:

Plugins
=======
.. automodule:: cloudcontrol.node.plugins
    :members:
cc-node-v24/docs/dev/source/tags.rst:

Tags
====
.. automodule:: cloudcontrol.node.tags
    :members:
cc-node-v24/docs/dev/source/test.rst:

Test scenario for cc-node
=========================
Host
----
* Check all tags
* Check handlers: shutdown, execute_command
Hypervisor
----------
* Check all tags
* Check handlers: vm_start, vm_stop (+destroy), vm_suspend, vm_resume,
  vm_undefine, vm_export, vm_define
* Check that the statuses of VMs change
* Check that objects are registered/unregistered with the cc-server when VMs
  appear/disappear.
Error handling
..............
Behaviour when libvirt connection is lost:
* libvirtstatus tag is updated to disconnected
* domains are unregistered
* some tags (relative to storage pools) are unregistered
* handlers relative to VMs are removed
Behaviour when the libvirt connection is restored:
* libvirtstatus tag is updated to connected
* domains are registered
* tags (relative to storage pools) are registered
* handlers relative to VMs are added
CC-server
---------
Error handling
..............
Behaviour when connection is lost:
* Nothing changes
Behaviour when connection is restored:
* Authenticate
* Check the returned role and load another main plugin if the role changed,
  else keep the current running plugin
* Register tags and objects
Error handling
--------------
* Test multiple configurations of libvirt/cc-server failures
cc-node-v24/docs/dev/source/utils.rst:

Utils
=====
.. automodule:: cloudcontrol.node.utils
    :members:
cc-node-v24/docs/dev/source/vm.rst:

Domains
=======
KVM virtual machines
--------------------
.. automodule:: cloudcontrol.node.hypervisor.domains
    :members:
KVM tags
--------
.. automodule:: cloudcontrol.node.hypervisor.domains.vm_tags
    :members:
cc-node-v24/docs/user/Makefile:

# Makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = build
# Internal variables.
PAPEROPT_a4 = -D latex_paper_size=a4
PAPEROPT_letter = -D latex_paper_size=letter
ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) source
.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest
help:
@echo "Please use \`make <target>' where <target> is one of"
@echo " html to make standalone HTML files"
@echo " dirhtml to make HTML files named index.html in directories"
@echo " singlehtml to make a single large HTML file"
@echo " pickle to make pickle files"
@echo " json to make JSON files"
@echo " htmlhelp to make HTML files and a HTML help project"
@echo " qthelp to make HTML files and a qthelp project"
@echo " devhelp to make HTML files and a Devhelp project"
@echo " epub to make an epub"
@echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
@echo " latexpdf to make LaTeX files and run them through pdflatex"
@echo " text to make text files"
@echo " man to make manual pages"
@echo " changes to make an overview of all changed/added/deprecated items"
@echo " linkcheck to check all external links for integrity"
@echo " doctest to run all doctests embedded in the documentation (if enabled)"
clean:
-rm -rf $(BUILDDIR)/*
html:
$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
dirhtml:
$(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml."
singlehtml:
$(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml
@echo
@echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml."
pickle:
$(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle
@echo
@echo "Build finished; now you can process the pickle files."
json:
$(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json
@echo
@echo "Build finished; now you can process the JSON files."
htmlhelp:
$(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp
@echo
@echo "Build finished; now you can run HTML Help Workshop with the" \
".hhp project file in $(BUILDDIR)/htmlhelp."
qthelp:
$(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp
@echo
@echo "Build finished; now you can run "qcollectiongenerator" with the" \
".qhcp project file in $(BUILDDIR)/qthelp, like this:"
@echo "# qcollectiongenerator $(BUILDDIR)/qthelp/CloudControlnode.qhcp"
@echo "To view the help file:"
@echo "# assistant -collectionFile $(BUILDDIR)/qthelp/CloudControlnode.qhc"
devhelp:
$(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp
@echo
@echo "Build finished."
@echo "To view the help file:"
@echo "# mkdir -p $$HOME/.local/share/devhelp/CloudControlnode"
@echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/CloudControlnode"
@echo "# devhelp"
epub:
$(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub
@echo
@echo "Build finished. The epub file is in $(BUILDDIR)/epub."
latex:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo
@echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex."
@echo "Run \`make' in that directory to run these through (pdf)latex" \
"(use \`make latexpdf' here to do that automatically)."
latexpdf:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo "Running LaTeX files through pdflatex..."
make -C $(BUILDDIR)/latex all-pdf
@echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
text:
$(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text
@echo
@echo "Build finished. The text files are in $(BUILDDIR)/text."
man:
$(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man
@echo
@echo "Build finished. The manual pages are in $(BUILDDIR)/man."
changes:
$(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes
@echo
@echo "The overview file is in $(BUILDDIR)/changes."
linkcheck:
$(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck
@echo
@echo "Link check complete; look for any errors in the above output " \
"or in $(BUILDDIR)/linkcheck/output.txt."
doctest:
$(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest
@echo "Testing of doctests in the sources finished, look at the " \
"results in $(BUILDDIR)/doctest/output.txt."
cc-node-v24/docs/user/source/conf.py:

# -*- coding: utf-8 -*-
#
# Cloud Control node documentation build configuration file, created by
# sphinx-quickstart on Thu Sep 15 15:46:56 2011.
#
# This file is execfile()d with the current directory set to its containing dir.
#
# Note that not all possible configuration values are present in this
# autogenerated file.
#
# All configuration values have a default; values that are commented out
# serve to show the default.
import sys, os
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#sys.path.insert(0, os.path.abspath('.'))
# -- General configuration -----------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be extensions
# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest', 'sphinx.ext.coverage']
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix of source filenames.
source_suffix = '.rst'
# The encoding of source files.
#source_encoding = 'utf-8-sig'
# The master toctree document.
master_doc = 'index'
# General information about the project.
project = u'Cloud Control node'
copyright = u'2011, Smartjog'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = '18'
# The full version, including alpha/beta/rc tags.
release = '18'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#language = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
#today = ''
# Else, today_fmt is used as the format for a strftime call.
#today_fmt = '%B %d, %Y'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
exclude_patterns = []
# The reST default role (used for this markup: `text`) to use for all documents.
#default_role = None
# If true, '()' will be appended to :func: etc. cross-reference text.
#add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
#add_module_names = True
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
#show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# A list of ignored prefixes for module index sorting.
#modindex_common_prefix = []
# -- Options for HTML output ---------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
html_theme = 'default'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#html_theme_options = {}
# Add any paths that contain custom themes here, relative to this directory.
#html_theme_path = []
# The name for this set of Sphinx documents. If None, it defaults to
# "<project> v<release> documentation".
#html_title = None
# A shorter title for the navigation bar. Default is the same as html_title.
#html_short_title = None
# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
#html_logo = None
# The name of an image file (within the static path) to use as favicon of the
# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
# pixels large.
#html_favicon = None
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
# using the given strftime format.
#html_last_updated_fmt = '%b %d, %Y'
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
#html_use_smartypants = True
# Custom sidebar templates, maps document names to template names.
#html_sidebars = {}
# Additional templates that should be rendered to pages, maps page names to
# template names.
#html_additional_pages = {}
# If false, no module index is generated.
#html_domain_indices = True
# If false, no index is generated.
#html_use_index = True
# If true, the index is split into individual pages for each letter.
#html_split_index = False
# If true, links to the reST sources are added to the pages.
#html_show_sourcelink = True
# If true, "Created using Sphinx" is shown in the HTML footer. Default is True.
#html_show_sphinx = True
# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True.
#html_show_copyright = True
# If true, an OpenSearch description file will be output, and all pages will
# contain a <link> tag referring to it. The value of this option must be the
# base URL from which the finished HTML is served.
#html_use_opensearch = ''
# This is the file name suffix for HTML files (e.g. ".xhtml").
#html_file_suffix = None
# Output file base name for HTML help builder.
htmlhelp_basename = 'CloudControlnodedoc'
# -- Options for LaTeX output --------------------------------------------------
# The paper size ('letter' or 'a4').
#latex_paper_size = 'letter'
# The font size ('10pt', '11pt' or '12pt').
#latex_font_size = '10pt'
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author, documentclass [howto/manual]).
latex_documents = [
    ('index', 'CloudControlnode.tex', u'Cloud Control node Documentation',
     u'Smartjog', 'manual'),
]
# The name of an image file (relative to this directory) to place at the top of
# the title page.
#latex_logo = None
# For "manual" documents, if this is true, then toplevel headings are parts,
# not chapters.
#latex_use_parts = False
# If true, show page references after internal links.
#latex_show_pagerefs = False
# If true, show URL addresses after external links.
#latex_show_urls = False
# Additional stuff for the LaTeX preamble.
#latex_preamble = ''
# Documents to append as an appendix to all manuals.
#latex_appendices = []
# If false, no module index is generated.
#latex_domain_indices = True
# -- Options for manual page output --------------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
    ('index', 'cloudcontrolnode', u'Cloud Control node Documentation',
     [u'Smartjog'], 1)
]
cc-node-v24/docs/user/source/index.rst:

.. Cloud Control node documentation master file, created by
   sphinx-quickstart on Thu Sep 15 15:46:56 2011.
   You can adapt this file completely to your liking, but it should at least
   contain the root `toctree` directive.
Welcome to Cloud Control node's documentation!
==============================================
Contents:
.. toctree::
   :maxdepth: 2

   overview
   install
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
cc-node-v24/docs/user/source/install.rst:

Installation
============
Installation
------------
Repository
..........
Install from Smartjog debian repository:
.. code-block:: text
# smartjog
deb http://debian.fr.smartjog.net/debian-smartjog/ squeeze smartjog
**Cloud Control node** provides two packages named ``cc-node`` and
``cc-node-hypervisor``. The latter is just a meta-package that pulls in the
dependencies for running the **Cloud Control node** as a **hypervisor** (``KVM``).
The former contains the whole **Cloud Control node** code base and can be
installed on its own if you only need to run the node with the **host** role.
Install with your favorite package manager as root:
.. code-block:: bash
# apt-get install cc-node
or
.. code-block:: bash
# apt-get install cc-node-hypervisor
Configuration
-------------
Example configuration file:
``[ccserver]`` section:
.. literalinclude:: ../../../etc/cc-node.conf
    :lines: 1-5
For the logging section, see the Python ``logging.config`` documentation.
Full default file (using syslog):
.. literalinclude:: ../../../etc/cc-node.conf
    :lines: 1-18, 20-30, 36, 38
cc-node-v24/docs/user/source/libvirt.rst:

Libvirt hot migration compatibility
-----------------------------------
Should work:

* From 0.8.8 to 0.9.8
* From 0.9.8 to 0.9.12

Should not work:

* From 0.9.12 to 0.9.8 (because of a bad XML seclabel; even if it is set
  explicitly, it doesn't work)
* From 0.9.8 to 0.8.8 (because of XML format compatibility)
Note for all versions:

When migrating, libvirt tries to acquire a mutex on the domain, and this can
block forever (0.8.8) or time out (0.9.8 and later). Take a look at the node
logs to be sure.
In the latter case, you must restart libvirt in order for the migration to
work properly.
What to do when live migration fails
------------------------------------
Check DRBD and remove if needed:
::
# identify which DRBD volume needs to go down
it-test-15.lab.fr.lan ~ 0 # cat /proc/drbd
version: 8.3.9 (api:88/proto:86-95)
srcversion: CF228D42875CF3A43F2945A
0: cs:Connected ro:Primary/Primary ds:UpToDate/UpToDate C r-----
ns:2097152 nr:0 dw:0 dr:2097352 al:0 bm:128 lo:0 pe:0 ua:0 ap:0 ep:1 wo:f oos:0
# identify all LV (VM, copy and DRBD meta)
it-test-15.lab.fr.lan ~ 0 # dmsetup ls
vg-Test--Clone--root (253, 5)
vg-Test--Clone--root.copy (253, 2)
vg-Test--Clone--root.drbdmeta (253, 1)
vg-Etherpad--Etherpad (253, 0)
# reload VM LV table if needed
it-test-15.lab.fr.lan ~ 0 # dmsetup table vg-Test--Clone--root.copy | dmsetup load vg-Test--Clone--root
it-test-15.lab.fr.lan ~ 0 # dmsetup suspend vg-Test--Clone--root
it-test-15.lab.fr.lan ~ 0 # dmsetup resume vg-Test--Clone--root
# halt DRBD volume
it-test-15.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 disconnect
it-test-15.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 secondary
it-test-15.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 detach
it-test-15.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 down
# remove copy DM
it-test-15.lab.fr.lan ~ 0 # dmsetup remove vg-Test--Clone--root.copy
# remove DRBD meta
it-test-15.lab.fr.lan ~ 0 # lvremove /dev/vg/Test-Clone-root.drbdmeta
# /!\ on hypervisor where the VM is not present, also remove the VM LV
it-test-15.lab.fr.lan ~ 0 # lvremove /dev/vg/Test-Clone-root
# check that libvirt ended the qemu process
# refresh libvirt pools
it-test-15.lab.fr.lan ~ 0 # virsh pool-refresh vg
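The recovery steps above follow a strict order: the VM LV table must stop referencing the copy/DRBD devices before DRBD is brought down and the helper LVs are removed. A sketch encoding that order, using the device names from this example; the helper only builds the shell commands, it does not run them:

```python
def drbd_cleanup_cmds(drbd_dev, vm_lv, vg, lv_name, vm_present):
    """Ordered commands to tear down a half-finished DRBD live migration."""
    cmds = [
        # reload the VM LV table from the .copy device, then swap it in
        'dmsetup table %s.copy | dmsetup load %s' % (vm_lv, vm_lv),
        'dmsetup suspend %s' % vm_lv,
        'dmsetup resume %s' % vm_lv,
        # halt the DRBD volume
        'drbdsetup %s disconnect' % drbd_dev,
        'drbdsetup %s secondary' % drbd_dev,
        'drbdsetup %s detach' % drbd_dev,
        'drbdsetup %s down' % drbd_dev,
        # remove the helper devices
        'dmsetup remove %s.copy' % vm_lv,
        'lvremove /dev/%s/%s.drbdmeta' % (vg, lv_name),
    ]
    if not vm_present:
        # on the hypervisor that does not host the VM, drop the VM LV too
        cmds.append('lvremove /dev/%s/%s' % (vg, lv_name))
    cmds.append('virsh pool-refresh %s' % vg)
    return cmds
```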
cc-node-v24/docs/user/source/overview.rst:

Cloud Control
=============
Cloud Control Node
------------------
Objects
-------
Tags
----
TQL
---
Role
----
Host
----
Hypervisor
----------
cc-node-v24/etc/cc-node.conf:

[node]
# cc-server information
address=_CC_SERVER_HOST_
port=1984
login=_CC_SERVER_LOGIN_
password=_CC_SERVER_PASSWD_
# logging verbosity (can be specified as number or name)
# 0=error, 1=warning, 2=info, 3=debug
# verbosity = error
# when debug is on, std output/input is not closed and logging is done through
# stderr instead of syslog
# debug = off
# directory where the cc-node jobs are saved
# jobs_store_path=/var/lib/cc-node/jobs/
# file where the plugins are saved for automatic reinstallation at node startup
# plugins_store_path=/var/lib/cc-node/plugins # it's a file
# backward compatibility for execute and shutdown handler
# remote_execution = yes
[node_handler] # this section is optional
# section to allow or forbid usage of RPC handlers at local level
# format is handler_name = yes|no
# execute = yes # this will be overridden by the remote_execution option if set
# ...
cc-node-v24/examples/cc-node-debug.conf:

[node]
address=__HOST__
port=1984
login=__USER__
password=__PASSWD__
verbosity=3
debug=on
cc-node-v24/setup.py:

#!/usr/bin/env python
#coding=utf-8
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import os
from setuptools import setup, find_packages
from cloudcontrol.node import __version__
ldesc = open(os.path.join(os.path.dirname(__file__), 'README')).read()
setup(
    name='cc-node',
    version=__version__,
    description='Cloud Control Node',
    long_description=ldesc,
    author='Anaël Beutot',
    author_email='anael.beutot@smartjog.com',
    license='LGPL3',
    namespace_packages=['cloudcontrol'],
    packages=find_packages(),
    scripts=['bin/cc-node'],
    data_files=(
        ('/etc/', ('etc/cc-node.conf',)),
    ),
    classifiers=[
        'Operating System :: Unix',
        'Programming Language :: Python',
    ],
)