.gitignore:
*.pyc
doc/_build/*
*.swp
*.log
test_*.py
*.test
/ccnode/geany_run_script.sh
CHANGELOG: (empty)

COPYRIGHT:
Copyright © 2010-2012 Smartjog
LICENSE:
GNU LESSER GENERAL PUBLIC LICENSE
Version 3, 29 June 2007
Copyright (C) 2007 Free Software Foundation, Inc. <https://fsf.org/>
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
This version of the GNU Lesser General Public License incorporates
the terms and conditions of version 3 of the GNU General Public
License, supplemented by the additional permissions listed below.
0. Additional Definitions.
As used herein, "this License" refers to version 3 of the GNU Lesser
General Public License, and the "GNU GPL" refers to version 3 of the GNU
General Public License.
"The Library" refers to a covered work governed by this License,
other than an Application or a Combined Work as defined below.
An "Application" is any work that makes use of an interface provided
by the Library, but which is not otherwise based on the Library.
Defining a subclass of a class defined by the Library is deemed a mode
of using an interface provided by the Library.
A "Combined Work" is a work produced by combining or linking an
Application with the Library. The particular version of the Library
with which the Combined Work was made is also called the "Linked
Version".
The "Minimal Corresponding Source" for a Combined Work means the
Corresponding Source for the Combined Work, excluding any source code
for portions of the Combined Work that, considered in isolation, are
based on the Application, and not on the Linked Version.
The "Corresponding Application Code" for a Combined Work means the
object code and/or source code for the Application, including any data
and utility programs needed for reproducing the Combined Work from the
Application, but excluding the System Libraries of the Combined Work.
1. Exception to Section 3 of the GNU GPL.
You may convey a covered work under sections 3 and 4 of this License
without being bound by section 3 of the GNU GPL.
2. Conveying Modified Versions.
If you modify a copy of the Library, and, in your modifications, a
facility refers to a function or data to be supplied by an Application
that uses the facility (other than as an argument passed when the
facility is invoked), then you may convey a copy of the modified
version:
a) under this License, provided that you make a good faith effort to
ensure that, in the event an Application does not supply the
function or data, the facility still operates, and performs
whatever part of its purpose remains meaningful, or
b) under the GNU GPL, with none of the additional permissions of
this License applicable to that copy.
3. Object Code Incorporating Material from Library Header Files.
The object code form of an Application may incorporate material from
a header file that is part of the Library. You may convey such object
code under terms of your choice, provided that, if the incorporated
material is not limited to numerical parameters, data structure
layouts and accessors, or small macros, inline functions and templates
(ten or fewer lines in length), you do both of the following:
a) Give prominent notice with each copy of the object code that the
Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the object code with a copy of the GNU GPL and this license
document.
4. Combined Works.
You may convey a Combined Work under terms of your choice that,
taken together, effectively do not restrict modification of the
portions of the Library contained in the Combined Work and reverse
engineering for debugging such modifications, if you also do each of
the following:
a) Give prominent notice with each copy of the Combined Work that
the Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the Combined Work with a copy of the GNU GPL and this license
document.
c) For a Combined Work that displays copyright notices during
execution, include the copyright notice for the Library among
these notices, as well as a reference directing the user to the
copies of the GNU GPL and this license document.
d) Do one of the following:
0) Convey the Minimal Corresponding Source under the terms of this
License, and the Corresponding Application Code in a form
suitable for, and under terms that permit, the user to
recombine or relink the Application with a modified version of
the Linked Version to produce a modified Combined Work, in the
manner specified by section 6 of the GNU GPL for conveying
Corresponding Source.
1) Use a suitable shared library mechanism for linking with the
Library. A suitable mechanism is one that (a) uses at run time
a copy of the Library already present on the user's computer
system, and (b) will operate properly with a modified version
of the Library that is interface-compatible with the Linked
Version.
e) Provide Installation Information, but only if you would otherwise
be required to provide such information under section 6 of the
GNU GPL, and only to the extent that such information is
necessary to install and execute a modified version of the
Combined Work produced by recombining or relinking the
Application with a modified version of the Linked Version. (If
you use option 4d0, the Installation Information must accompany
the Minimal Corresponding Source and Corresponding Application
Code. If you use option 4d1, you must provide the Installation
Information in the manner specified by section 6 of the GNU GPL
for conveying Corresponding Source.)
5. Combined Libraries.
You may place library facilities that are a work based on the
Library side by side in a single library together with other library
facilities that are not Applications and are not covered by this
License, and convey such a combined library under terms of your
choice, if you do both of the following:
a) Accompany the combined library with a copy of the same work based
on the Library, uncombined with any other library facilities,
conveyed under the terms of this License.
b) Give prominent notice with the combined library that part of it
is a work based on the Library, and explaining where to find the
accompanying uncombined form of the same work.
6. Revised Versions of the GNU Lesser General Public License.
The Free Software Foundation may publish revised and/or new versions
of the GNU Lesser General Public License from time to time. Such new
versions will be similar in spirit to the present version, but may
differ in detail to address new problems or concerns.
Each version is given a distinguishing version number. If the
Library as you received it specifies that a certain numbered version
of the GNU Lesser General Public License "or any later version"
applies to it, you have the option of following the terms and
conditions either of that published version or of any later version
published by the Free Software Foundation. If the Library as you
received it does not specify a version number of the GNU Lesser
General Public License, you may choose any version of the GNU Lesser
General Public License ever published by the Free Software Foundation.
If the Library as you received it specifies that a proxy can decide
whether future versions of the GNU Lesser General Public License shall
apply, that proxy's public statement of acceptance of any version is
permanent authorization for you to choose that version for the
Library.
MANIFEST.in:
recursive-include etc *
README: (empty)

bin/cc-node:
#!/usr/bin/env python
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
# Workaround for the PEX bundle: ensure system dist-packages stays importable
import sys
if '/usr/lib/python2.7/dist-packages' not in sys.path:
sys.path.append('/usr/lib/python2.7/dist-packages')
import os
import atexit
from optparse import OptionParser
from os.path import isfile, abspath
from daemon import DaemonContext
from cloudcontrol.common.client.exc import ConfigError
from cloudcontrol.node import __version__
from cloudcontrol.node.node import NodeLoop
from cloudcontrol.node.config import NodeConfigParser, configure_logging
DEFAULT_CONFIG_FILE = '/etc/cc-node.conf'
# command line arguments...
oparser = OptionParser(version='%%prog %s' % __version__)
oparser.add_option('-d', '--daemonize', default=False, action='store_true',
help=u'run as daemon and write pid file')
oparser.add_option('-c', '--config', metavar='FILE',
default=DEFAULT_CONFIG_FILE,
help=u'configuration file ABSOLUTE path (default: %default)')
oparser.add_option('-p', '--pidfile', metavar='FILE',
help=u'pid file path for daemon mode')
options, args = oparser.parse_args()
# check argument configuration
if options.daemonize and not options.pidfile:
sys.exit(u'Please supply a pid file...')
options.config = abspath(options.config)
if not isfile(options.config):
sys.exit(u'Please supply a valid path to configuration file...')
configure_logging(1, 'console')
try:
config = NodeConfigParser(options.config)
except ConfigError as exc:
sys.exit(exc.message)
# take care of pid file if daemon
if options.daemonize:
pidfile = open(options.pidfile, 'w')
files_preserve = [pidfile]
else:
files_preserve = None
if config.debug:
stderr = sys.stderr
stdout = sys.stdout
else:
stderr = None
stdout = None
with DaemonContext(detach_process=options.daemonize,
files_preserve=files_preserve,
stderr=stderr,
stdout=stdout):
# take care of pidfile
if options.daemonize:
pidfile.write('%s' % os.getpid())
pidfile.flush()
@atexit.register
def clean_pidfile():
pidfile.seek(0)
pidfile.truncate()
pidfile.flush()
NodeLoop(options.config).start()
cloudcontrol/__init__.py:
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
__import__('pkg_resources').declare_namespace(__name__)
cloudcontrol/node/__init__.py:
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
__product__ = 'Cloud-Control Node'
__version__ = '38'
__canonical__ = 'cc-node'
cloudcontrol/node/config.py:
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import socket
import logging
import logging.config
from StringIO import StringIO
from itertools import ifilterfalse
from ConfigParser import SafeConfigParser, NoOptionError, NoSectionError
from cloudcontrol.common.client.exc import ConfigError
logger = logging.getLogger(__name__)
class _ConfigProxy(object):
"""Simple ConfigParser proxy that provide default values for get* methods.
"""
def __init__(self, config_parser):
self.config = config_parser
@staticmethod
def config_error(msg):
logger.error(msg)
raise ConfigError(msg)
def __getattr__(self, name):
if name.startswith('get'):
def getter(section, option, *default):
assert not default or len(default) == 1
try:
return getattr(self.config, name)(section, option)
except NoSectionError:
self.config_error('Section "%s" is not present in config'
' file' % section)
except NoOptionError:
if default:
return default[0]
self.config_error(
'Attribute "%s" not specified in config'
' file (section "%s")' % (option, section))
except ValueError:
self.config_error(
'Configuration attribute "%s" value is invalid'
' (section "%s")' % (option, section))
return getter
# else
return getattr(self.config, name)
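The `__getattr__` trick above wraps every `get*` method so callers can pass an optional trailing default. A minimal Python 3 sketch of the same pattern (`DefaultingProxy` is a hypothetical stand-in for `_ConfigProxy`, without the logging and `ConfigError` handling):

```python
from configparser import ConfigParser, NoOptionError, NoSectionError

class DefaultingProxy(object):
    """Proxy a ConfigParser so get*() accepts an optional default value."""
    def __init__(self, parser):
        self._parser = parser

    def __getattr__(self, name):
        if name.startswith('get'):
            def getter(section, option, *default):
                assert len(default) <= 1
                try:
                    # delegate to the real get/getint/getboolean...
                    return getattr(self._parser, name)(section, option)
                except (NoSectionError, NoOptionError):
                    if default:
                        return default[0]  # fall back to the caller's default
                    raise
            return getter
        return getattr(self._parser, name)

parser = ConfigParser()
parser.read_string("[node]\naddress = example.invalid\n")
proxy = DefaultingProxy(parser)
print(proxy.get('node', 'address'))        # -> example.invalid
print(proxy.getint('node', 'port', 1984))  # missing option -> default, 1984
```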
class NodeConfigParser(object):
"""ConfigParser for ccnode config file."""
def __init__(self, file_path):
config = _ConfigProxy(SafeConfigParser())
config.read(file_path)
# ccserver settings
self.server_host = config.get('node', 'address')
self.server_port = config.getint('node', 'port', 1984)
self.server_user = config.get('node', 'login')
self.server_passwd = config.get('node', 'password')
# node settings
try:
self.logging_level = config.getint('node', 'verbosity', 0)
except ConfigError:
try:
self.logging_level = dict(
debug=3,
info=2,
warning=1,
error=0,
)[config.get('node', 'verbosity')]
except KeyError:
_ConfigProxy.config_error(
'Configuration attribute "verbosity"'
' is invalid (section "node")')
self.debug = config.getboolean('node', 'debug', False)
self.logging_output = 'console' if self.debug else 'syslog'
# path settings
self.jobs_store_path = config.get('node', 'jobs_store_path',
'/var/lib/cc-node/jobs')
# plugins persistence
self.plugins_store_path = config.get('node', 'plugins_store_path',
'/var/lib/cc-node/plugins')
# Libvirt URI to export to cc-server (vir_uri tag)
default_libvirt_uri = 'qemu+tcp://%s/system' % socket.gethostbyname(socket.gethostname())
self.libvirt_uri = config.get('node', 'libvirt_uri', default_libvirt_uri)
# Path to define script
default_define_script = 'hkvm-define'
self.define_script = config.get('node', 'define_script', default_define_script)
default_rescue_script = 'hkvm-rescue'
self.rescue_script = config.get('node', 'rescue_script', default_rescue_script)
default_install_script = 'hkvm-install'
self.install_script = config.get('node', 'install_script', default_install_script)
default_mode_script = 'hkvm-mode'
self.mode_script = config.get('node', 'mode_script', default_mode_script)
default_vlan_script = 'hkvm-vlan'
self.vlan_script = config.get('node', 'vlan_script', default_vlan_script)
default_attach_script = 'hkvm-attach'
self.attach_script = config.get('node', 'attach_script', default_attach_script)
default_detach_script = 'hkvm-detach'
self.detach_script = config.get('node', 'detach_script', default_detach_script)
default_boot_order_script = 'hkvm-boot-order'
self.boot_order_script = config.get('node', 'boot_order_script', default_boot_order_script)
# RPC handler ACLs
acl_section_name = 'node_handler'
if config.has_section(acl_section_name):
self.forbidden_handlers = set(ifilterfalse(
lambda o: config.getboolean(acl_section_name, o, True),
config.options(acl_section_name),
))
else:
self.forbidden_handlers = set()
# backward compatibility
command_execution_handlers = set(('shutdown', 'execute'))
if config.getboolean('node', 'command_execution', None) is False:
self.forbidden_handlers |= command_execution_handlers
else:
self.forbidden_handlers -= command_execution_handlers
# deprecated options
for o in ('detect_hypervisor', 'force_xen'):
if config.get('node', o, None) is not None:
logger.warning('%s config option is not supported anymore', o)
def configure_logging(level, output):
level = {
0: 'ERROR', 1: 'WARNING',
2: 'INFO', 3: 'DEBUG',
}[level]
output = dict(
console="""
[handler_output]
class=StreamHandler
formatter=simpleFormatter
args=(sys.stderr,)
[formatter_simpleFormatter]
format=cc-node - %(asctime)s - %(name)s - %(levelname)s - %(message)s
""",
syslog="""
[handler_output]
class=handlers.SysLogHandler
formatter=simpleFormatter
args=('/dev/log', handlers.SysLogHandler.LOG_DAEMON)
[formatter_simpleFormatter]
class=cloudcontrol.common.helpers.formatter.EncodingFormatter
format=cc-node - %(name)s - %(levelname)s - %(message)s
""",
)[output]
# create config parser for logging configuration
logging.config.fileConfig(StringIO("""
[loggers]
keys=root,ccnode,cccommon,sjrpc
[handlers]
keys=output
[formatters]
keys=simpleFormatter
[logger_root]
level=ERROR
handlers=output
[logger_ccnode]
level=%(level)s
handlers=
qualname=cloudcontrol.node
[logger_cccommon]
level=%(level)s
handlers=
qualname=cloudcontrol.common
[logger_sjrpc]
level=ERROR
handlers=
qualname=sjrpc
%(output)s
""" % dict(level=level, output=output)))
cloudcontrol/node/exc.py:
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
"""Exceptions classes for ccnode."""
class CCNodeError(Exception):
"""Base exception class for cc-node."""
pass
class PluginError(CCNodeError):
"""Exception related to plugin execution."""
pass
class UndefinedDomain(CCNodeError):
"""Operation on a domain that does not exist was tried."""
pass
class PoolStorageError(CCNodeError):
"""Pool or volume was not found."""
pass
class TunnelError(CCNodeError):
"""Error occured during TunnelJob execution."""
pass
class DRBDAllocationError(CCNodeError):
"""Cannot create DRBD volume."""
pass
class DRBDError(CCNodeError):
"""Error occured during DRBDJob execution."""
pass
class ConsoleError(CCNodeError):
"""Error relative to VM virtio console handling."""
pass
class ConsoleAlreadyOpened(ConsoleError):
"""VM virtio console is already opened."""
pass
class VMMigrationError(CCNodeError):
"""Error during live migration job."""
pass
class JobError(CCNodeError):
"""General exception for a job."""
pass
class RemoteExecutionError(CCNodeError):
"""Thrown when a remote command execution error occurs."""
pass
class ForbiddenHandler(CCNodeError):
"""Raised when a handler is called but disabled by configuration."""
pass
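Every exception above derives from `CCNodeError`, so callers can trap any node-specific failure with a single handler. A minimal sketch (the `load_plugin` function is hypothetical, for illustration only):

```python
class CCNodeError(Exception):
    """Base exception class for cc-node (as in exc.py)."""

class PluginError(CCNodeError):
    """Exception related to plugin execution."""

def load_plugin():
    # hypothetical operation that fails with a plugin-specific error
    raise PluginError('plugin failed to load')

try:
    load_plugin()
except CCNodeError as exc:  # catches PluginError via the common base class
    caught = type(exc).__name__
print(caught)  # -> PluginError
```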
cloudcontrol/node/host/__init__.py:
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import os
import pty
import fcntl
import subprocess
import logging
import os.path
import stat
import shutil
import struct
import socket
import termios
import tempfile
import threading
from fcntl import ioctl
import cPickle as pickle
from itertools import imap, chain
from sjrpc.utils import pass_connection, threadless
from sjrpc.core.exceptions import RpcError
from cloudcontrol.common.client.tags import Tag, tag_inspector, TagDB
from cloudcontrol.common.client.plugins import Base as BasePlugin, rpc_handler
from cloudcontrol.common.jobs import JobsManager, JobsStore
from cloudcontrol.common.helpers.logger import patch_logging; patch_logging()
from cloudcontrol.node.utils import execute
from cloudcontrol.node.exc import RemoteExecutionError
from cloudcontrol.node.host import tags
from cloudcontrol.node.host.jobs import NodeJobsManagerInterface, ScriptJob
from cloudcontrol.node.host.plugins import PluginMethodJob, Plugin
logger = logging.getLogger(__name__)
def disk_tag_value(disk_name):
def size():
s = open(os.path.join('/sys/block', disk_name, 'size')).read().strip()
try:
s = int(s)
if s > 0:
return s * 512
else:
return None
except ValueError:
return None
return size
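`disk_tag_value` reads `/sys/block/<disk>/size`, which reports the device size as a count of 512-byte sectors. The conversion can be sketched standalone (hypothetical `size_in_bytes` helper, no sysfs access needed):

```python
def size_in_bytes(raw):
    """Convert a /sys/block/<disk>/size reading (512-byte sectors) to bytes."""
    try:
        sectors = int(raw.strip())
    except ValueError:
        return None  # unreadable sysfs content
    # a zero or negative sector count is treated as unknown
    return sectors * 512 if sectors > 0 else None

print(size_in_bytes('1024\n'))   # -> 524288
print(size_in_bytes('0'))        # -> None
print(size_in_bytes('garbage'))  # -> None
```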
class FakePtySocket(object):
"""A fake socket object wrapping a :class:`subprocess.Popen` object
standard input/output.
"""
def __init__(self, fd):
self._fd = fd
def recv(self, size):
return os.read(self._fd, size)
def send(self, data):
return os.write(self._fd, data)
def fileno(self):
return self._fd
def setblocking(self, blocking):
if not blocking:
# Disable blocking mode on stdin:
current_flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
fcntl.fcntl(self._fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
else:
raise NotImplementedError('Not implemented')
def close(self):
pass
class RemoteShell(object):
"""Handles basic operations on remote shell."""
def __init__(self, conn, exec_='/bin/bash'):
"""
:param conn: sjRPC connection
:param exec_: path of binary to execute
"""
# handles close when waiting process to end from an other thread
self.master, self.slave = pty.openpty()
self.endpoint = FakePtySocket(self.master)
self.proto = conn.create_tunnel(endpoint=self.endpoint,
close_endpoint_on_shutdown=False,
on_shutdown=self.on_tunnel_shutdown)
self.conn = conn
self.child_watcher = None
try:
self.process = subprocess.Popen(
[exec_], stdout=self.slave,
bufsize=0, stdin=self.slave,
stderr=self.slave,
preexec_fn=os.setsid,
close_fds=True,
cwd=os.environ.get('HOME', '/'),
)
except OSError:
logger.exception('Error while executing remote shell')
# close opened fds
self.close()
raise
self.child_watcher = conn.loop.child(self.process.pid, False,
self.child_cb)
self.child_watcher.start()
@property
def label(self):
return self.proto.label
def resize(self, row, col, xpixel, ypixel):
if self.master is not None:
ioctl(self.master, termios.TIOCSWINSZ,
struct.pack('HHHH', row, col, xpixel, ypixel))
def on_tunnel_shutdown(self, *args):
# *args is for callback arguments (ignored)
self.proto, proto = None, self.proto # prevents multiple calls to
# close method
self.close()
logger.debug('Tunnel closed by sjRPC')
def child_cb(self, watcher, revents):
logger.debug('Tunnel closed by process termination')
self.process.returncode = watcher.rstatus
self.close()
def close(self):
if self.child_watcher is not None:
self.child_watcher.stop()
self.child_watcher = None
if self.process.returncode is None:
# process is still alive
self.process.kill()
if self.master is not None:
try:
os.close(self.master)
except OSError:
logger.error('Error when closing file descriptor for pty')
self.master = None
if self.slave is not None:
try:
os.close(self.slave)
except OSError:
logger.error('Error when closing slave file descriptor for pty')
self.slave = None
if self.proto is not None:
try:
self.proto.shutdown()
except Exception:
logger.error('Error while trying to close RPC tunnel')
self.proto = None
class Handler(BasePlugin):
"""Handler for host role."""
def __init__(self, *args, **kwargs):
BasePlugin.__init__(self, *args, **kwargs)
# add plugin tags
self.tag_db.add_tags(tag_inspector(tags, self))
# disk related tags
self.tag_db.add_tags(imap(
lambda d: Tag('disk%s_size' % d, disk_tag_value(d), 60),
self.tag_db['__main__']['disk']._calculate_value().split(),
))
# running shells
self.shells = dict()
# running remote commands
self.commands = set()
#: jobs manager (different from MainLoop.jobs_manager)
try:
os.makedirs(self.main.config.jobs_store_path, mode=0755)
except OSError as e:
if e.errno == 17:
pass # Ignore existing directory
else:
logger.critical('Cannot create jobs directory: %s', e)
self.main.stop()
raise
except Exception as e:
logger.critical('Cannot create jobs directory: %s', e)
self.main.stop()
raise
try:
self.jobs_manager = JobsManager(logger, NodeJobsManagerInterface(self),
JobsStore(self.main.config.jobs_store_path))
except EnvironmentError as e:
logger.critical('Cannot access jobs directory: %s', e.strerror)
self.main.stop()
raise
#: loaded plugins
self.plugins = {} # plugin name -> plugin object
#: cache directory for scripts
try:
self.scripts_dir = tempfile.mkdtemp(prefix='cc-node-script-cache')
except EnvironmentError as e:
logger.critical('Cannot create temporary directory for scripts: %s',
e.strerror)
self.main.stop()
raise
def start(self):
BasePlugin.start(self)
# load plugins if persistent file is here
if os.path.isfile(self.main.config.plugins_store_path):
logger.debug('Loading previous plugins')
try:
with open(self.main.config.plugins_store_path) as f:
to_load = pickle.load(f)
except EnvironmentError as exc:
logger.error('Cannot load previous plugins: %s (%s)',
exc.errno, exc.strerror)
except (
EOFError,
AttributeError,
ImportError,
IndexError,
pickle.UnpicklingError,
) as exc:
logger.error('Cannot read file, bad format, %s', exc)
else:
def plugins_install():
for p in to_load:
try:
self.plugin_install(self.main.rpc_con.rpc, None, p)
except Exception:
logger.exception('Error while loading plugin %s', p)
else:
logger.debug('Successfully loaded plugin %s', p)
th = threading.Thread(target=plugins_install)
th.daemon = True
th.start()
else:
logger.debug('No previously loaded plugins')
def stop(self):
# remove script cache directory
shutil.rmtree(self.scripts_dir, ignore_errors=True)
# kill all currently running shells and commands
logger.debug('Kill currently running shells and commands')
for stuff in chain(self.shells.values(),
list(self.commands)):
stuff.close()
self.shells.clear()
self.commands.clear()
BasePlugin.stop(self)
def update_plugin_index(self):
self.tag_db['__main__']['plugins'].update_value()
# write plugins to file
try:
with open(self.main.config.plugins_store_path, 'w') as f:
pickle.dump([p for p in self.plugins], f)
except EnvironmentError as exc:
logger.error('Cannot save loaded plugins in \'%s\': %s (%s)',
self.main.config.plugins_store_path,
exc.errno, exc.strerror)
else:
logger.debug('Plugins state saved')
@rpc_handler
def execute_command(self, command, stdin=None):
command = ['/bin/sh', '-c', command]
rcode, output = execute(self.main, command, stdin)
if rcode != 0:
# 127 means command not found, 126 means not executable
if rcode == 127:
raise RemoteExecutionError('Command not found: %s' % output)
elif rcode == 126:
raise RemoteExecutionError('Command is not executable')
else:
raise RemoteExecutionError('Child exited with non zero status %s' % rcode)
return output
@rpc_handler
def node_shutdown(self, reboot=True, gracefull=True):
"""Halt/Reboot the node.
:param bool reboot: halt/reboot the system
:param bool gracefull: perform a graceful operation (gracefull == not force)
"""
args = ['/sbin/shutdown']
if reboot:
args.append('-r')
if gracefull:
logger.info('Going to reboot the host...')
args.append('-f')
else:
logger.info('Going to force the reboot of the host...')
args.append('-n')
else:
# halt
args.append('-h -P')
if not gracefull:
logger.info('Going to force the halt of the host...')
args.append('-n')
else:
logger.info('Going to halt the host...')
args.append('0')
return self.execute_command(' '.join(args))
@threadless
@pass_connection
@rpc_handler
def shell(self, conn, shell='/bin/bash'):
"""Create a shell tunnel and return the label of the created tunnel.
"""
logger.debug('Opening shell')
remote_shell = RemoteShell(conn, shell)
self.shells[remote_shell.label] = remote_shell
return remote_shell.label
@threadless
@rpc_handler('resize')
def shell_resize(self, label, row, col, xpixel, ypixel):
"""Resize the shell's attached terminal."""
logger.debug('Shell resize')
self.shells[label].resize(row, col, xpixel, ypixel)
@pass_connection
@rpc_handler
def forward(self, conn, label, port, destination='127.0.0.1'):
"""TCP port forwarding."""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error:
logger.exception('Cannot create socket in forward handler')
raise
try:
sock.connect((destination, port))
except socket.error:
logger.exception('Error while connecting to destination (forward)')
raise
conn.create_tunnel(label=label, endpoint=sock)
@pass_connection
@rpc_handler
def script_run(self, conn, sha1, script, owner, batch=None, *args):
# retrieve the script if it is not already cached locally
filename = os.path.join(self.scripts_dir, sha1)
if not os.access(filename, os.X_OK):
try:
sha1, content = conn.call('script_get', script)
except RpcError:
logger.error('Error while retrieving script: %s', script)
raise
with open(filename, "w") as f:
try:
f.write(content)
os.chmod(filename, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
except OSError:
logger.error(
'Could not set execution permission to script: %s',
script)
raise
except IOError:
logger.error(
'Could not write script to repository: %s', script)
raise
return self.jobs_manager.spawn(ScriptJob, owner, batch=batch,
settings=dict(
script=script,
filename=filename,
args=args,
)).id
@rpc_handler
def job_cancel(self, job_id):
self.jobs_manager.get(job_id).cancel()
@rpc_handler
def job_purge(self, job_id):
self.jobs_manager.purge(job_id)
@rpc_handler
def job_attachment(self, job_id, name):
"""
:param name: attachment name
"""
try:
return self.jobs_manager.get(job_id).read_attachment(name)
except Exception:
logger.exception('Error while getting job attachment')
raise
@pass_connection
@rpc_handler
def plugin_install(self, conn, sha1, name):
"""
:param conn: RPC connection
:param sha1: sha1 string or None (get the latest)
:param name: plugin name
"""
# check if plugin is not already loaded and upgrade it if the sha1 hash
# has changed:
if name in self.plugins:
if self.plugins[name].sha1 != sha1:
self.plugins[name].uninstall()
del self.plugins[name]
else:
return
# get plugin from server:
sha1_get, content = conn.call('plugin_get', name)
if sha1 is not None and sha1 != sha1_get:
raise RuntimeError('Requested sha1 is not available on the server')
# load the plugin:
plugin_logger = logger.getChild('plugin.%s' % name)
self.plugins[name] = Plugin(plugin_logger, TagDB(self.tag_db),
name, sha1, content)
self.update_plugin_index()
@rpc_handler
def plugin_uninstall(self, name):
plugin = self.plugins.pop(name, None)
if plugin is None:
raise KeyError('Plugin %r is not running' % name)
plugin.uninstall()
self.update_plugin_index()
@rpc_handler
def plugin_run(self, name, method, owner, batch=None, **kwargs):
if name not in self.plugins:
raise KeyError('Plugin %r is not running' % name)
try:
func = self.plugins[name].methods[method]
except KeyError:
raise KeyError('Unknown method %r' % method)
return self.jobs_manager.spawn(PluginMethodJob, owner, batch=batch,
settings=dict(
plugin_name=name,
method_name=method,
method=func,
method_kwargs=kwargs,
)).id
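    # Illustrative sketch (not part of the original code): typical sequence of
    # calls a server could make against the handlers above (plugin, method and
    # owner names are hypothetical):
    #   conn.call('plugin_install', sha1, 'myplugin')
    #   job_id = conn.call('plugin_run', 'myplugin', 'ping', 'admin')
    #   output = conn.call('job_attachment', job_id, 'output')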
# cloudcontrol/node/host/jobs.py
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
"""Interface implementation for JobsManager from cc-common."""
from datetime import datetime
import subprocess
from cloudcontrol.common.jobs import JobsManagerInterface, Job, JobCancelError
from cloudcontrol.common.tql.db.helpers import taggify
from cloudcontrol.common.client.tags import TagDB, Tag
class NodeJobsManagerInterface(JobsManagerInterface):
TAG_ATTRIBUTES = ('title', 'status', 'state', 'owner', 'created', 'ended',
'attachments', 'batch')
def __init__(self, host_handler):
self.handler = host_handler
self.tag_db = TagDB(self.handler.tag_db)
def on_job_created(self, job):
def get_tag_value(tag):
return lambda job: taggify(getattr(job, tag))
self.tag_db.add_sub_object(
job.id,
(Tag(tag, get_tag_value(tag), parent=job)
for tag in self.TAG_ATTRIBUTES),
'job',
)
def tag_duration(job):
if job.ended is None:
ended = datetime.fromtimestamp(self.handler.main.evloop.now())
else:
ended = job.ended
return taggify(ended - job.created)
self.tag_db.add_sub_tag(job.id, Tag('duration', tag_duration, 2, 1, job))
def on_job_updated(self, job):
for tag in self.TAG_ATTRIBUTES:
self.tag_db[job.id][tag].update_value()
def on_job_purged(self, job_id):
self.tag_db.remove_sub_object(job_id)
class ScriptJob(Job):
"""Job that runs a script."""
def job(self, script, filename, args):
self.title = 'Script: %s %s' % (script, ' '.join(args))
self.process = None
to_exec = (filename,) + args
output = self.attachment('output')
self.logger.debug('To exec: %s', to_exec)
try:
self.process = subprocess.Popen(to_exec, stdout=output,
stderr=output, close_fds=True)
self.logger.info('Script running with pid: %d', self.process.pid)
ex = self.process.wait()
if ex != 0:
raise JobCancelError('Script returned exit code %d' % ex)
except subprocess.CalledProcessError:
self.logger.error('Error while executing script: %s', script)
raise
def cancel(self):
super(ScriptJob, self).cancel()
if self.process is not None:
self.logger.info('Terminating job %s', self.title)
self.process.terminate()
# cloudcontrol/node/host/plugins.py
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import imp
import logging
from collections import defaultdict
from cloudcontrol.common.client.exc import TagConflict
from cloudcontrol.common.client.tags import Tag
from cloudcontrol.common.jobs import Job
from cloudcontrol.common.tql.db.helpers import taggify
from cloudcontrol.node.exc import PluginError
logger = logging.getLogger(__name__)
def _tag_direct_use_error(*args, **kwargs):
    raise RuntimeError('Plugin tags can\'t be called directly')
class Plugin(object):
""" Represent a loaded cloudcontrol plugin.
"""
def __init__(self, logger, tag_db, name, sha1, plugin_body):
self.logger = logger
self.name = name
self.sha1 = sha1
self.tags = []
self.tag_db = tag_db
self.methods = {}
        self.events = defaultdict(list)
self._load_module(plugin_body)
self.trigger('install', self)
def _load_module(self, plugin_body):
# load the code into a module:
module = imp.new_module(self.name)
module.logger = self.logger
module.tag = self._decorator_tag
module.method = self._decorator_method
module.on = self._decorator_event
try:
exec plugin_body in module.__dict__
conflicts = self.tag_db.check_tags_conflict(*[tag.name for tag in
self.tags])
if conflicts:
raise TagConflict(
'Tags with names %s already exist' % ','.join(conflicts))
else:
self.tag_db.add_tags(self.tags)
except Exception as exc:
err_msg = 'Error during plugin installation (%s): %s' % (
self.name, exc)
logger.exception(err_msg)
# make sure all tags are stopped
self.tag_db.set_parent(None)
raise RuntimeError(err_msg)
else:
            # prevent the module from being garbage collected
self.module = module
def _decorator_tag(self, ttl=-1, refresh=None, name=None, background=False):
def decorator(func):
tag = Tag(func.__name__ if name is None else name,
lambda: taggify(func()), ttl=ttl, refresh=refresh,
background=background)
self.tags.append(tag)
return _tag_direct_use_error
return decorator
def _decorator_method(self, name=None):
def decorator(func):
register_name = func.__name__ if name is None else name
if register_name in self.methods:
raise PluginError('Already defined method %s' % register_name)
self.methods[register_name] = func
return func
return decorator
def _decorator_event(self, event):
def decorator(func):
self.events[event].append(func)
return func
return decorator
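    # Illustrative sketch (not part of the original code): a plugin body
    # passed to _load_module() sees the injected names ``logger``, ``tag``,
    # ``method`` and ``on``, and could look like this (the function names
    # below are hypothetical):
    #   @tag(ttl=60, refresh=30)
    #   def load1():
    #       return open('/proc/loadavg').read().split()[0]
    #   @method()
    #   def ping(job):
    #       return 'pong'
    #   @on('install')
    #   def setup(plugin):
    #       logger.info('plugin %s installed', plugin.name)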
def trigger(self, event, *args, **kwargs):
"""Trigger an event."""
for func in self.events[event]:
func(*args, **kwargs)
def uninstall(self):
self.trigger('uninstall')
# shutdown the tags database:
self.tag_db.set_parent(None)
class PluginMethodJob(Job):
"""Job that run a plugin method."""
def job(self, plugin_name, method_name, method, method_kwargs):
kwargs_str = ', '.join(('%s=%s' % (k, v) for k, v in method_kwargs.iteritems()))
self.title = 'Plugin: %s.%s(%s)' % (plugin_name, method_name, kwargs_str)
try:
returned = method(self, **method_kwargs)
except Exception:
self.logger.exception('Error while executing method')
raise
if returned is not None:
self.attachment('output').write(returned)
# cloudcontrol/node/host/tags.py
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
"""This module acts as a little framework for defining tags in a simple way.
Just define a string or a function and it will be introspected and used as a
tag value.
"""
import re
import time
import datetime
import os as os_
import platform as platform_
from multiprocessing import cpu_count
from socket import getfqdn
import psutil
from cloudcontrol.common.client.tags import ttl, refresh, background
@ttl(3600 * 24) # one day
@refresh(30)
@background()
def h():
"""Hostname tag."""
return getfqdn()
# CPU related tags
def arch():
"""Hardware CPU architecture."""
return {
u'i386': u'x86',
u'i486': u'x86',
u'i586': u'x86',
u'i686': u'x86',
u'x86_64': u'x64',
}.get(platform_.machine(), u'unknown')
def cpu():
"""Number of CPU (core) on the host."""
try:
return unicode(cpu_count())
except NotImplementedError:
return None
@ttl(10)
@refresh(2)
def cpuuse():
"""CPU usage in percentage."""
return u'%.1f' % psutil.cpu_percent()
# memory related tags
def mem():
"""Total physical memory available on system."""
if hasattr(psutil, 'avail_phymem') and hasattr(psutil, 'used_phymem'):
return unicode(psutil.avail_phymem() + psutil.used_phymem())
else:
memory = psutil.virtual_memory()
return unicode(memory.total)
@ttl(5)
@refresh(10)
def memfree():
"""Available physical memory on system."""
if hasattr(psutil, 'avail_phymem'):
return unicode(psutil.avail_phymem())
else:
return unicode(psutil.virtual_memory().available)
@ttl(5)
@refresh(10)
def memused():
"""Used physical memory on system."""
if hasattr(psutil, 'used_phymem'):
return unicode(psutil.used_phymem())
else:
return unicode(psutil.virtual_memory().used)
@ttl(5)
@refresh(10)
def membuffers():
"""Buffers memory use."""
if hasattr(psutil, 'phymem_buffers'):
return unicode(psutil.phymem_buffers())
else:
return unicode(psutil.virtual_memory().buffers)
@ttl(5)
@refresh(10)
def memcache():
"""Caches memory use."""
if hasattr(psutil, 'cached_phymem'):
return unicode(psutil.cached_phymem())
else:
return unicode(psutil.virtual_memory().cached)
# disks related tags
def disk():
"""List of disk devices on the host."""
disk_pattern = re.compile(r'[sh]d[a-z]+')
return u' '.join(d for d in os_.listdir(
'/sys/block/') if disk_pattern.match(d))
# other hardware related tags
def chaserial():
"""Blade chassis serial number."""
return open('/sys/class/dmi/id/chassis_serial').read().strip() or None
def chaasset():
"""Blade chassis asset tag."""
return open('/sys/class/dmi/id/chassis_asset_tag').read().strip() or None
def chaslot():
"""Position in blade chassis."""
chassis_version = open('/sys/class/dmi/id/chassis_version').read().strip()
if chassis_version == 'PowerEdge M1000e':
version = open('/sys/class/dmi/id/board_serial').read().strip()
return version.split('.')[3]
def uuid():
"""System UUID."""
return open('/sys/class/dmi/id/product_uuid').read().strip().lower() or None
def hmodel():
"""Host hardware model."""
return open('/sys/class/dmi/id/product_name').read().strip() or None
def hserial():
"""Host hardware serial number."""
return open('/sys/class/dmi/id/product_serial').read().strip() or None
def hvendor():
"""Host hardware vendor."""
return open('/sys/class/dmi/id/sys_vendor').read().strip() or None
def hbios():
"""Host BIOS version."""
return u'%s (%s)' % (
open('/sys/class/dmi/id/bios_version').read().strip() or None,
open('/sys/class/dmi/id/bios_date').read().strip() or None,
)
# Operating system related tags
def os():
"""Operating system (linux/windows)."""
return unicode(platform_.system().lower())
def platform():
"""Python platform.platform() info."""
return unicode(platform_.platform())
def uname():
"""As uname command (see python os.uname)."""
return u' '.join(os_.uname()) or None
@ttl(5)
@refresh(5)
def uptime():
"""Uptime of the system in seconds."""
return open('/proc/uptime').read().split()[0].split(u'.')[0] or None
@ttl(5)
@refresh(5)
def load():
"""Average of the number of processes in the run queue over the last 1, 5
and 15 minutes."""
load_ = None
try:
load_ = u' '.join(unicode(l) for l in os_.getloadavg())
except OSError:
pass
return load_
def plugins(handler):
    """List of plugins loaded on the node."""
    return ' '.join(handler.plugins) or None
# HKVM related tags
def hkvmver():
"""HKVM version and patch level."""
return open('/etc/hkvm_version').read().strip()
def hkvmsetup():
"""HKVM setup date."""
fields = dict(x.split(': ', 1) for x in open('/etc/isimage').read().split('\n') if x.strip())
setup_date = datetime.datetime.strptime(fields['setup date'], '%a, %d %b %Y %H:%M:%S +0000')
return int(time.mktime(setup_date.timetuple()))
# cloudcontrol/node/hypervisor/__init__.py
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import time
import logging
import socket
import json
from StringIO import StringIO
from xml.etree import cElementTree as et
import libvirt
from sjrpc.utils import threadless, pass_connection
from cloudcontrol.common.client.tags import Tag, tag_inspector, ParentWrapper
from cloudcontrol.common.client.plugins import (
rpc_handler,
rpc_handler_decorator_factory,
get_rpc_handlers,
)
from cloudcontrol.node.host import Handler as HostHandler
from cloudcontrol.node.hypervisor import tags
from cloudcontrol.node.hypervisor.kvm import KVM, LiveMigration
from cloudcontrol.node.exc import (
UndefinedDomain, DRBDError, PoolStorageError
)
from cloudcontrol.node.hypervisor.jobs import (
ImportVolume, ExportVolume, TCPTunnel, DRBD
)
from cloudcontrol.node.utils import execute
logger = logging.getLogger(__name__)
libvirt_handler_marker = '_libvirt_rpc_handler'
libvirt_handler = rpc_handler_decorator_factory(libvirt_handler_marker)
# FIXME find a way to refactor Handler and Hypervisor class
class Handler(HostHandler):
def __init__(self, *args, **kwargs):
"""
:param loop: MainLoop instance
:param hypervisor_name: hypervisor name
"""
self.hypervisor_name = kwargs.pop('hypervisor_name')
self._virt_connected = False
HostHandler.__init__(self, *args, **kwargs)
#: keep index of asynchronous calls
self.async_calls = dict()
self.timer = self.main.evloop.timer(.0, 5., self.virt_connect_cb)
self.hypervisor = None
# list of libvirt related RPC handlers
self.virt_handlers = get_rpc_handlers(self, libvirt_handler_marker)
# register tags
self.tag_db.add_tags(tag_inspector(tags, self))
@property
def virt_connected(self):
return self._virt_connected
@virt_connected.setter
def virt_connected(self, value):
self._virt_connected = value
# update tags
for tag in ('vir_status', 'sto', 'nvm', 'vmpaused', 'vmstarted',
'vmstopped', 'hvver', 'libvirtver', 'hv'):
self.tag_db['__main__'][tag].update_value()
def start(self):
self.timer.start()
HostHandler.start(self)
def stop(self):
self.timer.stop()
if self.hypervisor is not None:
self.hypervisor.stop()
HostHandler.stop(self)
def virt_connect_cb(self, *args):
# initialize hypervisor instance
try:
self.hypervisor = KVM(
name=self.hypervisor_name,
handler=self,
)
except libvirt.libvirtError:
logger.exception('Error while connecting to libvirt')
return
self.virt_connected = True
# register hypervisor storage tags
self.hypervisor.storage.update()
# register domains
for dom in self.hypervisor.domains.itervalues():
dom.tag_db.set_parent(ParentWrapper(dom.name, 'vm', self.tag_db))
        # we must refresh those tags only once the domain tags are registered
        # so that they get the computed values
tags_to_refresh = ('cpualloc', 'cpurunning', 'cpuremaining',
'cpuallocratio', 'memalloc', 'memrunning',
'memremaining', 'memallocratio')
for tag in tags_to_refresh:
self.tag_db['__main__'][tag].update_value()
# register libvirt handlers
self.rpc_handler.update(self.virt_handlers)
for k, v in self.virt_handlers.iteritems():
self.main.reset_handler(k, v)
# if everything went fine, unregister the timer
self.timer.stop()
def virt_connect_restart(self):
"""Restart libvirt connection.
This method might be called when libvirt connection is lost.
"""
if not self.virt_connected:
return
logger.error('Connection to libvirt lost, trying to restart')
# update connection state
self.virt_connected = False
# refresh those tags
tags_to_refresh = ('cpualloc', 'cpurunning', 'cpuremaining',
'cpuallocratio', 'memalloc', 'memrunning',
'memremaining', 'memallocratio')
for tag in tags_to_refresh:
self.tag_db['__main__'][tag].update_value()
# unregister tags that will be re registered later
for storage in self.hypervisor.storage.storages:
if storage.name.startswith('_'):
continue # Ignore internal storages
self.tag_db.remove_tags((
'sto%s_state' % storage,
'sto%s_size' % storage,
'sto%s_free' % storage,
'sto%s_used' % storage,
'sto%s_type' % storage,
'sto%s_vol' % storage,
'sto%s_ratio' % storage,
))
# unregister sub objects (for the same reason)
for sub_id in self.tag_db.keys():
if sub_id == '__main__':
continue
self.tag_db.remove_sub_object(sub_id)
# stop and delete hypervisor instance
self.hypervisor.stop()
self.hypervisor = None
# remove handlers related to libvirt
for handler in self.virt_handlers:
del self.rpc_handler[handler]
self.main.remove_handler(handler)
# launch connection timer
self.timer.start()
def iter_vms(self, vm_names):
"""Utility function to iterate over VM objects using their names."""
if vm_names is None:
return
get_domain = self.hypervisor.domains.get
for name in vm_names:
dom = get_domain(name)
if dom is not None:
yield dom
@libvirt_handler
def vm_define(self, data, format='xml'):
logger.debug('VM define')
if format == 'xml':
return self.hypervisor.vm_define(data)
elif format == 'vmspec':
# Encode tags as description:
if 'tags' in data:
if 'description' not in data:
data['description'] = ''
for tag, value in data['tags'].iteritems():
data['description'] += '\n@%s=%s' % (tag, value)
# Delete the tags key which is not recognized by hkvm-define
try:
del data['tags']
except KeyError:
pass
rcode, output = execute(self.main, [self.main.config.define_script], stdin=json.dumps(data))
if rcode == 0:
return output.strip()
else:
raise RuntimeError(output.strip().split('\n')[-1].strip())
else:
raise NotImplementedError('Format not supported')
@libvirt_handler
def vm_undefine(self, name):
logger.debug('VM undefine %s', name)
vm = self.hypervisor.domains.get(name)
if vm is not None:
vm.undefine()
@libvirt_handler
def vm_export(self, name, format='xml'):
logger.debug('VM export %s', name)
if format != 'xml':
raise NotImplementedError('Format not supported')
vm = self.hypervisor.domains.get(name)
if vm is None:
return
return vm.lv_dom.XMLDesc(0)
@libvirt_handler
def vm_rescue(self, name):
logger.debug('VM rescue %s', name)
if name in self.hypervisor.domains:
rcode, output = execute(self.main, [self.main.config.rescue_script, '-r', name])
if rcode != 0:
raise RuntimeError(output.strip().split('\n')[-1].strip())
else:
msg = 'Cannot rescue VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_unrescue(self, name):
logger.debug('VM unrescue %s', name)
if name in self.hypervisor.domains:
rcode, output = execute(self.main, [self.main.config.rescue_script, '-u', name])
if rcode != 0:
raise RuntimeError(output.strip().split('\n')[-1].strip())
else:
msg = 'Cannot unrescue VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_install(self, name):
logger.debug('VM install %s', name)
if name in self.hypervisor.domains:
rcode, output = execute(self.main, [self.main.config.install_script, '-i', name])
if rcode != 0:
raise RuntimeError(output.strip().split('\n')[-1].strip())
else:
msg = 'Cannot install VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_uninstall(self, name):
logger.debug('VM uninstall %s', name)
if name in self.hypervisor.domains:
rcode, output = execute(self.main, [self.main.config.install_script, '-u', name])
if rcode != 0:
raise RuntimeError(output.strip().split('\n')[-1].strip())
else:
msg = 'Cannot uninstall VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_set_vlans(self, name, vlan_update_format, mac_address=None):
logger.debug('VM set vlan %s', name)
if name in self.hypervisor.domains:
if mac_address is None:
rcode, output = execute(self.main, [self.main.config.vlan_script,
name, '--', vlan_update_format])
else:
rcode, output = execute(self.main, [self.main.config.vlan_script,
'--iface-macaddr', mac_address,
name, '--', vlan_update_format])
if rcode != 0:
raise RuntimeError(output.strip().split('\n')[-1].strip())
else:
msg = 'Cannot set vlans on VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_attach_disk(self, name, pool, volume, driver='virtio', bps=0, iops=0):
logger.debug('VM attach disk %s/%s -> %s ', pool, volume, name)
self.hypervisor.storage.update()
if name in self.hypervisor.domains:
rcode, output = execute(self.main, [self.main.config.attach_script,
'--driver', driver,
'--bps', str(bps),
'--iops', str(iops),
name, pool, volume])
if rcode != 0:
raise RuntimeError(output.strip().split('\n')[-1].strip())
else:
msg = 'Cannot attach disk on VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_detach_disk(self, name, pool, volume):
logger.debug('VM detach disk %s/%s <- %s ', pool, volume, name)
if name in self.hypervisor.domains:
rcode, output = execute(self.main, [self.main.config.detach_script,
name, pool, volume])
if rcode != 0:
raise RuntimeError(output.strip().split('\n')[-1].strip())
else:
msg = 'Cannot detach disk from VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_boot_order(self, name, order):
logger.debug('VM boot order %s -> %s', name, order)
if name in self.hypervisor.domains:
args_order = ['%s:%s' % tuple(x) for x in order]
rcode, output = execute(self.main, [self.main.config.boot_order_script,
name] + args_order)
if rcode != 0:
raise RuntimeError(output.strip().split('\n')[-1].strip())
else:
msg = 'Cannot change boot order of VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_stop(self, name):
logger.debug('VM stop %s', name)
try:
self.hypervisor.domains[name].stop()
except libvirt.libvirtError:
logger.exception('Error while stopping VM %s', name)
raise
except KeyError:
msg = 'Cannot stop VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_destroy(self, name):
logger.debug('VM destroy %s', name)
try:
self.hypervisor.domains[name].destroy()
except libvirt.libvirtError as exc:
# Libvirt raises exception 'domain is not running' even if domain
# is running, might be a bug in libvirt
if 'domain is not running' not in str(exc) or (self.hypervisor.domains[name].state != 'running'):
logger.exception('Error while destroying VM %s', name)
raise
except KeyError:
msg = 'Cannot destroy VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_start(self, name, pause=False):
"""
:param str name: VM name to start
        :param bool pause: start the VM in a paused state
"""
logger.debug('VM start %s', name)
try:
self.hypervisor.domains[name].start(pause)
except libvirt.libvirtError:
logger.exception('Error while starting VM %s', name)
raise
except KeyError:
msg = 'Cannot start VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_suspend(self, name):
logger.debug('VM suspend %s', name)
try:
self.hypervisor.domains[name].suspend()
except libvirt.libvirtError:
logger.exception('Error while suspending VM %s', name)
raise
except KeyError:
msg = 'Cannot suspend VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_resume(self, name):
logger.debug('VM resume %s', name)
try:
self.hypervisor.domains[name].resume()
except libvirt.libvirtError:
logger.exception('Error while resuming VM %s', name)
raise
except KeyError:
msg = 'Cannot resume VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_reset(self, name):
logger.debug('VM reset %s', name)
try:
self.hypervisor.domains[name].reset()
except libvirt.libvirtError:
logger.exception('Error while resetting VM %s', name)
raise
except KeyError:
msg = 'Cannot reset VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_cycle(self, name):
logger.debug('VM cycle %s', name)
try:
self.hypervisor.domains[name].destroy()
time.sleep(1)
self.hypervisor.domains[name].start()
except libvirt.libvirtError:
            logger.exception('Error while cycling VM %s', name)
raise
except KeyError:
msg = 'Cannot cycle VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_change_title(self, name, new_title):
logger.debug('VM edit title %s', name)
try:
self.hypervisor.domains[name].title = new_title
except libvirt.libvirtError:
logger.exception('Error while changing VM title %s', name)
raise
except KeyError:
            msg = 'Cannot change title of VM %s because it is not defined' % name
logger.error(msg)
raise UndefinedDomain(msg)
@libvirt_handler
def vm_migrate(self, name, dest_uri, live=False):
try:
dom = self.hypervisor.domains[name]
except KeyError:
            raise UndefinedDomain('Cannot migrate VM %s because it is not defined' % name)
dom.migrate(dest_uri, live=live)
@libvirt_handler
def vm_migrate_tunneled(self, name, tun_res, migtun_res, unsafe=False,
timeout=60.):
"""Live migrate VM through TCP tunnel.
:param name: VM name to migrate
:param tun_res: result of tunnel_setup handler
:param migtun_res: result of tunnel setup handler
:param bool unsafe: unsafe migration
:param float timeout: timeout for libvirt migration (prevents libvirt
from trying to acquire domain lock forever)
:param float timeout: migration timeout in seconds
"""
logger.debug('VM live migrate %s', name)
try:
# this is the port used by our libvirt in the cc-node (client
# libvirt) to connect to the remote libvirtd
remote_virt_port = tun_res['port']
except KeyError:
            logger.error('Badly formatted argument tun_res for live migration')
raise
try:
# this is the port used by local libvirtd to connect to the remote
# libvirtd (see http://libvirt.org/migration.html)
remote_virt_port2 = migtun_res['port']
except KeyError:
            logger.error('Badly formatted argument migtun_res for live migration')
raise
try:
vm = self.hypervisor.domains[name]
except KeyError:
logger.exception('Cannot find domain %s on hypervisor for live'
' migration', name)
raise
migration = LiveMigration(self.main, vm, remote_virt_port,
remote_virt_port2, timeout, unsafe)
try:
migration.wait()
except Exception:
logger.exception('Error during live migration for vm %s', name)
logger.debug('Exit status %d', migration.return_status)
raise
        logger.info('Successfully migrated VM %s', name)
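    # Illustrative sketch (not part of the original code): shape of the tunnel
    # arguments read by the handler above (the port numbers are hypothetical):
    #   tun_res    = {'port': 16509}  # cc-node's libvirt client -> remote libvirtd
    #   migtun_res = {'port': 49152}  # local libvirtd -> remote libvirtd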
@threadless
@pass_connection
@libvirt_handler
def vm_open_console(self, conn, name):
"""
:param conn: sjRPC connection instance
:param name: VM name
"""
vm = self.hypervisor.domains[name]
# create connection to the VM console
try:
endpoint = vm.open_console()
except socket.error:
# cannot create socketpair
logger.error('Cannot create connection to VM console')
raise
except Exception:
logger.exception('Error while trying to open console for domain %s',
name)
raise
def on_shutdown(tun):
"""Method of Tunnel protocol close callback."""
vm.close_console()
# connect as tunnel endpoint
proto = conn.create_tunnel(endpoint=endpoint, on_shutdown=on_shutdown)
return proto.label
@libvirt_handler
def vm_disable_virtio_cache(self, name):
"""Set virtio cache to none on VM disks.
:param name: VM name
"""
vm = self.hypervisor.domains[name]
# get VM XML
try:
xml = vm.lv_dom.XMLDesc(0)
except libvirt.libvirtError:
logger.exception('Error while getting domain XML from libvirt, %s',
vm.name)
raise
xml_tree = et.ElementTree()
xml_tree.parse(StringIO(xml))
for disk in xml_tree.findall('devices/disk'):
# check that disk is virtio
target = disk.find('target')
if target is None or target.get('bus') != 'virtio':
continue
# modify cache attr
driver = disk.find('driver')
assert driver is not None
driver.set('cache', 'none')
logger.debug('Set cache attribute for disk %s of VM %s',
target.get('dev'), name)
# write back the XML tree
out = StringIO()
        xml_tree.write(out)  # TODO: check that the encoding is fine
try:
self.hypervisor.vir_con.defineXML(out.getvalue())
except libvirt.libvirtError:
logger.exception('Cannot update XML file for domain %s', name)
raise
@libvirt_handler
def vm_set_autostart(self, name, autostart=True):
"""Set autostart on VM.
:param name: VM name
:param bool autostart: autostart value to set
"""
vm = self.hypervisor.domains[name]
vm.lv_dom.setAutostart(int(bool(autostart)))
        # update the autostart value now instead of waiting the 10 second
        # refresh lag
vm.tag_db['__main__']['autostart'].update_value()
@libvirt_handler
def tag_add(self, name, tag, value):
"""Add a static tag on specified VM.
:param name: VM name
:param tag: tag name
:param value: tag value
"""
vm = self.hypervisor.domains[name]
vm.set_tag(tag, value)
@libvirt_handler
def tag_delete(self, name, tag):
"""Delete a static tag on specified VM.
:param name: VM name
:param tag: tag name
"""
vm = self.hypervisor.domains[name]
vm.delete_tag(tag)
@libvirt_handler
def tag_show(self, name):
"""Show static tags of the specified VM.
:param name: VM name
"""
vm = self.hypervisor.domains[name]
return vm.tags
@libvirt_handler
def vol_create(self, pool, name, size):
logger.debug('Volume create %s, pool %s, size %s', name, pool, size)
try:
self.hypervisor.storage.create_volume(pool, name, size)
except Exception:
logger.exception('Error while creating volume')
raise
@libvirt_handler
def vol_delete(self, pool, name):
logger.debug('Volume delete %s, pool %s', name, pool)
try:
self.hypervisor.storage.delete_volume(pool, name)
except Exception:
logger.exception('Error while deleting volume')
raise
@libvirt_handler
def vol_import(self, pool, name):
"""
:param pool: pool name where the volume is
:param name: name of the volume
"""
logger.debug('Volume import pool = %s, volume = %s', pool, name)
try:
self.hypervisor.storage.update()
pool = self.hypervisor.storage.get_storage(pool)
if pool is None:
raise PoolStorageError('Pool storage does not exist')
volume = pool.volumes.get(name)
if volume is None:
raise PoolStorageError('Volume does not exist')
# create the job
job = self.main.job_manager.create(ImportVolume, volume)
job.start()
except Exception:
logger.exception('Error while starting import job')
raise
return dict(id=job.id, port=job.port)
@libvirt_handler
def vol_import_wait(self, job_id):
"""Block until completion of the given job id."""
job = self.main.job_manager.get(job_id)
logger.debug('Waiting for import job to terminate')
job.wait()
logger.debug('Import job terminated')
return dict(id=job.id, log='', checksum=job.checksum)
@libvirt_handler
def vol_import_cancel(self, job_id):
"""Cancel import job."""
logger.debug('Cancel import job')
job = self.main.job_manager.get(job_id)
self.main.job_manager.cancel(job_id)
# wait for job to end
job.join() # we don't call wait as it is already called in
# vol_import_wait handler
@libvirt_handler
def vol_export(self, pool, name, raddr, rport):
"""
:param pool: pool name where the volume is
:param name: name of the volume
:param raddr: IP address of the destination to send the volume to
:param rport: TCP port of the destination
"""
pool = self.hypervisor.storage.get_storage(pool)
if pool is None:
raise PoolStorageError('Pool storage does not exist')
volume = pool.volumes.get(name)
if volume is None:
raise PoolStorageError('Volume does not exist')
try:
job = self.main.job_manager.create(ExportVolume, volume, raddr, rport)
job.start()
job.wait()
except Exception:
logger.exception('Error while exporting volume')
raise
logger.debug('Export volume successful')
return dict(id=job.id, log='', checksum=job.checksum)
@threadless
@rpc_handler
def tun_setup(self, local=True):
"""Set up local tunnel and listen on a random port.
:param local: indicate if we should listen on localhost or all
interfaces
"""
logger.debug('Tunnel setup: local = %s', local)
# create job
job = self.main.job_manager.create(TCPTunnel)
job.setup_listen('127.0.0.1' if local else '0.0.0.0')
return dict(
jid=job.id,
key='FIXME',
port=job.port,
)
@threadless
@rpc_handler
def tun_connect(self, res, remote_res, remote_ip):
"""Connect tunnel to the other end.
:param res: previous result of `tun_setup` handler
:param remote_res: other end result of `tun_setup` handler
:param remote_ip: where to connect
"""
logger.debug('Tunnel connect %s %s', res['jid'], remote_ip)
job = self.main.job_manager.get(res['jid'])
job.setup_connect((remote_ip, remote_res['port']))
job.start()
@threadless
@rpc_handler
def tun_connect_hv(self, res, migration=False):
"""Connect tunnel to local libvirt Unix socket.
:param res: previous result of `tun_setup` handler
"""
logger.debug('Tunnel connect hypervisor %s', res['jid'])
job = self.main.job_manager.get(res['jid'])
job.setup_connect('/var/run/libvirt/libvirt-sock')
job.start()
@threadless
@rpc_handler
def tun_destroy(self, res):
"""Close given tunnel.
:param res: previous result as given by `tun_setup` handler
"""
logger.debug('Tunnel destroy %s', res['jid'])
job = self.main.job_manager.get(res['jid'])
self.main.job_manager.cancel(job.id)
job.wait()
@libvirt_handler
def drbd_setup(self, pool, name):
"""Create DRBD volumes.
:param pool: storage pool
:param name: storage volume name
"""
pool = self.hypervisor.storage.get_storage(pool)
if pool is None:
raise DRBDError('Cannot setup DRBD: pool storage does not exist')
elif pool.type != 'logical':
raise DRBDError('Cannot setup DRBD: pool storage is not LVM')
volume = pool.volumes.get(name)
if volume is None:
raise DRBDError('Cannot setup DRBD: volume does not exist')
try:
job = self.main.job_manager.create(DRBD, self.hypervisor.storage,
pool, volume)
except Exception:
logger.exception('Error while creating DRBD job')
raise
job.setup()
logger.debug('DRBD setup successful')
return dict(
jid=job.id,
port=job.drbd_port,
)
@libvirt_handler
def drbd_connect(self, res, remote_res, remote_ip):
"""Set up DRBD in connect mode (wait for connection and try to connect
to the remote peer).
:param res: previous result of `drbd_setup` handler
:param remote_res: result of remote `drbd_setup` handler
:param remote_ip: IP of remote peer
"""
job = self.main.job_manager.get(res['jid'])
job.connect(remote_ip, remote_res['port'])
job.wait_connection()
@libvirt_handler
def drbd_role(self, res, primary):
"""Set up DRBD role.
:param res: previous result of `drbd_setup` handler
:param bool primary: if True, set up in primary mode else secondary
"""
job = self.main.job_manager.get(res['jid'])
if primary:
job.switch_primary()
else:
job.switch_secondary()
@libvirt_handler
def drbd_takeover(self, res, state):
"""Set up DRBD device as the VM disk. FIXME
:param res: previous result of `drbd_setup` handler
:param state: FIXME
"""
job = self.main.job_manager.get(res['jid'])
job.takeover()
@libvirt_handler
def drbd_sync_status(self, res):
"""Return synchronization status of a current DRBD job.
:param res: previous result of `drbd_setup` handler
"""
status = self.main.job_manager.get(res['jid']).status()
result = dict(
done=status['disk'] == 'UpToDate',
completion=status['percent'],
)
logger.debug('DRBD status %s', result)
return result
@libvirt_handler
def drbd_shutdown(self, res):
"""Destroy DRBD related block devices.
:param res: previous result of `drbd_setup` handler
"""
logger.debug('DRBD shutdown')
job = self.main.job_manager.get(res['jid'])
job.cleanup()
# remove job from job_manager list
self.main.job_manager.notify(job)
cc-node-f5dd2058c95048237a7190404aa5813d84cefa8d/cloudcontrol/node/hypervisor/domains/ 0000775 0000000 0000000 00000000000 13444400225 0027164 5 ustar 00root root 0000000 0000000 cc-node-f5dd2058c95048237a7190404aa5813d84cefa8d/cloudcontrol/node/hypervisor/domains/__init__.py 0000664 0000000 0000000 00000055020 13444400225 0031277 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import re
import errno
import logging
import socket
import weakref
from StringIO import StringIO
from xml.etree import cElementTree as et
from itertools import izip, count
from functools import partial
import pyev
import libvirt
from cloudcontrol.common.client.tags import Tag, tag_inspector, TagDB
from cloudcontrol.node.hypervisor.lib import DOMAIN_STATES as STATE, BoundVolumeProxy
from cloudcontrol.node.hypervisor.domains import vm_tags
from cloudcontrol.node.utils import SocketBuffer
from cloudcontrol.node.exc import ConsoleAlreadyOpened, ConsoleError
logger = logging.getLogger(__name__)
REGEX_TAG_NAME = '[a-zA-Z0-9_-]+'
REGEX_TAG_VALUE = '.+'
REGEX_TAG_IN_DESCRIPTION = '^@(' + REGEX_TAG_NAME + ')[ ]*?=[ ]*?(' + REGEX_TAG_VALUE + ')$'
REGEX_TAG_REPLACE = '^@%s[ ]*?=[ ]*?(' + REGEX_TAG_VALUE + ')$'
class NetworkInterface(object):
def __init__(self, source=None, mac=None, model=None, untagged_vlan=None, tagged_vlans=None):
self.source = source
self.mac = mac
self.model = model
self.untagged_vlan = untagged_vlan
self.tagged_vlans = set() if tagged_vlans is None else set(tagged_vlans)
@property
def vlans(self):
if self.untagged_vlan is not None:
yield self.untagged_vlan
for vlan in self.tagged_vlans:
yield vlan
class VirtualMachine(object):
"""Represent a VM instance."""
#: buffer size for console connection handling
BUFFER_LEN = 1024
def __init__(self, dom, hypervisor):
"""
:param dom: libvirt domain instance
:param hypervisor: hypervisor where the VM is
"""
self.hypervisor = weakref.proxy(hypervisor)
#: UUID string of domain
self.uuid = dom.UUIDString()
self.name = dom.name()
#: state of VM: started, stopped, paused
self._state = STATE[dom.info()[0]]
#: tags for this VM
self.tag_db = TagDB(tags=tag_inspector(vm_tags, self))
#: Driver cache behavior for each VM storage, see
#: http://libvirt.org/formatdomain.html#elementsDisks
self.cache_behaviour = dict()
# define dynamic tags
for i, v in izip(count(), self.iter_disks()):
for t in (
Tag('disk%s_size' % i, v.capacity, 10),
Tag('disk%s_path' % i, v.path, 10),
Tag('disk%s_pool' % i, v.storage.name, 10), # FIXME: change
Tag('disk%s_vol' % i, v.name, 10),
Tag('disk%s_cache' % i, partial(lambda x: self.cache_behaviour.get(x.path), v), 10),
Tag('disk%s_shared' % i, partial(lambda x: {True: 'yes', False: 'no'}[x.storage.is_shared], v)),
Tag('disk%s_dev' % i, v.device, 10),
Tag('disk%s_bus' % i, v.bus, 10)
):
self.tag_db.add_tag(t)
for i, nic in izip(count(), self.iter_nics()):
for t in (
Tag('nic%s_mac' % i, nic.mac),
Tag('nic%s_source' % i, nic.source),
Tag('nic%s_model' % i, nic.model),
Tag('nic%s_untagged_vlan' % i, nic.untagged_vlan),
Tag('nic%s_tagged_vlans' % i, ' '.join(str(x) for x in nic.tagged_vlans)),
Tag('nic%s_vlans' % i, ' '.join(str(x) for x in nic.vlans)),
):
self.tag_db.add_tag(t)
#: keep record of CPU stats (libev timestamp, cpu time)
self.cpu_stats = (hypervisor.handler.main.evloop.now(), dom.info()[4])
# attributes related to console handling
self.stream = None
self.sock = None # socketpair endpoint
self.read_watcher = None # ev watcher for sock
self.write_watcher = None # ev watcher for sock
self.from_tunnel = None # buffer
self.from_stream = None # buffer
self.stream_handling = 0 # libvirt stream event mask
self.redefine_on_stop = False # for XML update (see KVM class)
self._description_tags_db = TagDB(parent_db=self.tag_db)
self.sync_description_tags()
@property
def state(self):
return self._state
@state.setter
def state(self, value):
self._state = value
self.tag_db['__main__']['status'].update_value()
self.tag_db['__main__']['vncport'].update_value()
self.sync_description_tags()
@property
def title(self):
try:
return self.lv_dom.metadata(libvirt.VIR_DOMAIN_METADATA_TITLE, None)
except (libvirt.libvirtError, AttributeError):
# libvirtError handles the case where the title is not defined on the
# VM; AttributeError handles the case where libvirt is too old to
# support metadata handling
return None
@title.setter
def title(self, value):
try:
tags = libvirt.VIR_DOMAIN_AFFECT_CONFIG
if self._state != 'stopped':
tags = tags | libvirt.VIR_DOMAIN_AFFECT_LIVE
self.lv_dom.setMetadata(libvirt.VIR_DOMAIN_METADATA_TITLE, value,
None, None, tags)
except AttributeError:
raise NotImplementedError('This hv doesn\'t handle VM titles')
else:
self.tag_db['__main__']['t'].update_value()
@property
def description(self):
xml = self.lv_dom.XMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE)
descriptions = et.ElementTree().parse(StringIO(xml)).findall('description')
if descriptions:
return descriptions[0].text
else:
return ''
@description.setter
def description(self, value):
try:
xml = self.lv_dom.XMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE)
except libvirt.libvirtError:
logger.exception('Error while getting domain XML from libvirt, %s',
self.name)
raise
xml_tree = et.ElementTree()
root = xml_tree.parse(StringIO(xml))
desc = xml_tree.find('description')
if desc is None:
desc = et.SubElement(root, 'description')
desc.text = value
# write back the XML tree
out = StringIO()
xml_tree.write(out)
try:
self.hypervisor.vir_con.defineXML(out.getvalue())
except libvirt.libvirtError:
logger.exception('Cannot update XML file for domain %s', self.name)
raise
@property
def tags(self):
return dict(re.findall(REGEX_TAG_IN_DESCRIPTION, self.description, re.MULTILINE))
def set_tag(self, tag, value):
if not re.match(REGEX_TAG_NAME + '$', tag):
raise RuntimeError('Bad tag name')
elif not re.match(REGEX_TAG_VALUE + '$', value):
raise RuntimeError('Bad tag value')
tags = self.tags
if tag in tags:
self.description = re.sub(REGEX_TAG_REPLACE % re.escape(tag),
'@%s=%s' % (tag, value),
self.description,
flags=re.MULTILINE)
else:
self.description += '\n@%s=%s' % (tag, value)
def delete_tag(self, tag):
tags = self.tags
if tag in tags:
self.description = re.sub(REGEX_TAG_REPLACE % tag,
'',
self.description,
flags=re.MULTILINE)
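# The set_tag/delete_tag methods above rely on the @tag=value convention
# stored in the domain <description>. A standalone sketch of the parsing
# (regexes copied from this module; the sample description is hypothetical):
#
# ```python
# import re
#
# REGEX_TAG_NAME = '[a-zA-Z0-9_-]+'
# REGEX_TAG_VALUE = '.+'
# REGEX_TAG_IN_DESCRIPTION = (
#     '^@(' + REGEX_TAG_NAME + ')[ ]*?=[ ]*?(' + REGEX_TAG_VALUE + ')$')
#
# description = 'Main web frontend\n@env=prod\n@owner=ops'
# # each "@name=value" line becomes one (name, value) pair
# tags = dict(re.findall(REGEX_TAG_IN_DESCRIPTION, description, re.MULTILINE))
# ```
#
# Free-form text lines are ignored; only lines matching the @name=value
# pattern are collected.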
@property
def lv_dom(self):
"""Libvirt domain instance."""
return self.hypervisor.vir_con.lookupByUUIDString(self.uuid)
def start(self, pause=False):
flags = 0
if pause:
flags |= libvirt.VIR_DOMAIN_START_PAUSED
self.lv_dom.createWithFlags(flags)
def sync_description_tags(self):
tags = dict(re.findall(REGEX_TAG_IN_DESCRIPTION, self.description, re.MULTILINE))
# Add/update static tags:
for k, v in tags.iteritems():
tag = self._description_tags_db['__main__'].get(k)
if tag is None:
self._description_tags_db.add_tag(Tag(k, v, -1))
else:
tag.value = v
# Purge not more defined static tags:
for k in self._description_tags_db['__main__']:
if k not in tags:
self._description_tags_db.remove_tag(k)
def stop(self):
self.lv_dom.shutdown()
def suspend(self):
self.lv_dom.suspend()
def resume(self):
self.lv_dom.resume()
def destroy(self):
self.lv_dom.destroy()
def undefine(self):
self.lv_dom.undefine()
def reset(self):
self.lv_dom.reset(0)
@property
def disks(self):
return list(self.iter_disks())
def iter_disks(self):
for d in et.ElementTree().parse(
StringIO(self.lv_dom.XMLDesc(0))
).findall('devices/disk'):
if d.get('device') != 'disk':
continue
type_ = d.get('type')
if type_ in ('file', 'block'):
path = d.find('source').get(dict(file='file', block='dev')[type_])
volume = self.hypervisor.storage.get_volume(path)
if volume is None:
continue
elif type_ == 'volume':
pool = d.find('source').get('pool')
vol = d.find('source').get('volume')
volume = self.hypervisor.storage.get_volume_by_pool(pool, vol)
elif type_ == 'network':
vol = d.find('source').get('name')
volume = self.hypervisor.storage.get_volume_by_pool('_standalone', vol)
else:
continue
# Ignore unknown volumes:
if volume is None:
logger.warn('Unknown volume for vm %s', self.name)
continue
path = volume.path
# Get target device & bus
target = d.find('target')
device = target.get('dev')
bus = target.get('bus')
# update cache behaviour
driver = d.find('driver')
if driver is None:
driver = {}
self.cache_behaviour[path] = driver.get('cache', 'default')
yield BoundVolumeProxy(volume, device, bus)
@property
def nics(self):
return list(self.iter_nics())
def iter_nics(self):
for nic in et.ElementTree().parse(
StringIO(self.lv_dom.XMLDesc(libvirt.VIR_DOMAIN_XML_INACTIVE))
).findall('devices/interface'):
if nic.get('type') == 'bridge':
try:
mac = nic.find('mac').get('address')
except AttributeError:
mac = None
try:
model = nic.find('model').get('type')
except AttributeError:
model = None
try:
source = nic.find('source').get('bridge')
except AttributeError:
source = None
untagged_vlan = None
tagged_vlans = set()
vlan = nic.find('vlan')
if vlan is None:
untagged_vlan = source[2:]
else:
tags = vlan.findall('tag')
trunk = vlan.get('trunk') == 'yes' or len(tags) > 1
default_mode = 'tagged' if trunk else 'untagged'
for tag in tags:
if tag.get('nativeMode', default_mode) == 'untagged':
untagged_vlan = int(tag.get('id'))
else:
tagged_vlans.add(int(tag.get('id')))
yield NetworkInterface(
mac=mac,
source=source,
model=model,
untagged_vlan=untagged_vlan,
tagged_vlans=tagged_vlans,
)
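# The VLAN extraction in iter_nics can be sketched in isolation; a minimal
# example with a hypothetical <interface> snippet (bridge and VLAN ids are
# made up), following the same trunk/nativeMode rules as above:
#
# ```python
# from xml.etree import ElementTree as et
#
# IFACE_XML = """<interface type='bridge'>
#   <source bridge='br0'/>
#   <vlan trunk='yes'>
#     <tag id='10' nativeMode='untagged'/>
#     <tag id='20'/>
#     <tag id='30'/>
#   </vlan>
# </interface>"""
#
# def parse_vlans(xml):
#     """Return (untagged_vlan, tagged_vlans) for one bridge interface."""
#     nic = et.fromstring(xml)
#     untagged, tagged = None, set()
#     vlan = nic.find('vlan')
#     tags = vlan.findall('tag')
#     # several tags or an explicit trunk attribute mean a trunk port
#     trunk = vlan.get('trunk') == 'yes' or len(tags) > 1
#     default_mode = 'tagged' if trunk else 'untagged'
#     for tag in tags:
#         if tag.get('nativeMode', default_mode) == 'untagged':
#             untagged = int(tag.get('id'))
#         else:
#             tagged.add(int(tag.get('id')))
#     return untagged, tagged
# ```
#
# Here VLAN 10 is the explicit native (untagged) VLAN, while 20 and 30
# default to tagged because the port is a trunk.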
def open_console(self):
if self.stream is not None:
raise ConsoleAlreadyOpened('Console for this VM is already'
' opened')
if str(
self.hypervisor.handler.tag_db['__main__']['libvirtver'].value,
).startswith('8'):
raise ConsoleError(
'Cannot open console, not compatible with this version of libvirt')
logger.info('Opening console stream on VM %s', self.name)
try:
self.stream = self.hypervisor.vir_con.newStream(
libvirt.VIR_STREAM_NONBLOCK)
except libvirt.libvirtError:
logger.error('Cannot create new stream for console %s', self.name)
self.close_console()
raise
self.stream_handling = libvirt.VIR_STREAM_EVENT_READABLE | (
libvirt.VIR_STREAM_EVENT_ERROR | libvirt.VIR_STREAM_EVENT_HANGUP)
try:
self.lv_dom.openConsole(None, self.stream, 0)
self.stream.eventAddCallback(self.stream_handling,
self.virt_console_stream_cb,
None)
except libvirt.libvirtError:
logger.error('Cannot open console on domain %s', self.name)
self.close_console()
raise
try:
self.sock, tunnel_endpoint = socket.socketpair()
except socket.error:
logger.error('Cannot create socket pair for console on domain %s',
self.name)
self.close_console()
raise
try:
self.sock.setblocking(0)
except socket.error:
logger.error('Cannot set socket to non blocking for console on'
' domain %s', self.name)
self.close_console()
raise
self.read_watcher = self.hypervisor.handler.main.evloop.io(
self.sock, pyev.EV_READ, self.read_from_tun_cb)
self.read_watcher.start()
self.write_watcher = self.hypervisor.handler.main.evloop.io(
self.sock, pyev.EV_WRITE, self.write_to_tun_cb)
# self.write_watcher.start()
self.from_tunnel = SocketBuffer(4096)
self.from_stream = SocketBuffer(4096)
return tunnel_endpoint
def virt_console_stream_cb(self, stream, events, opaque):
"""Handles read/write from/to libvirt stream."""
if events & libvirt.VIR_EVENT_HANDLE_ERROR or (
events & libvirt.VIR_EVENT_HANDLE_HANGUP):
# error/hangup
# logger.debug('Received error on stream')
self.close_console()
return
if events & libvirt.VIR_EVENT_HANDLE_WRITABLE:
# logger.debug('Write to stream')
# logger.debug('Event %s', self.stream_handling)
while True:
try:
to_send = self.from_tunnel.popleft()
except IndexError:
# update libvirt event mask
self.stream_handling ^= libvirt.VIR_STREAM_EVENT_WRITABLE
self.stream.eventUpdateCallback(self.stream_handling)
break
send_buffer = to_send
total_sent = 0
while True:
try:
written = self.stream.send(send_buffer)
except Exception:
# libvirt send error
logger.exception('Error while writing to stream')
self.close_console()
return
if written == -2: # equivalent to EAGAIN
self.from_tunnel.appendleft(to_send[total_sent:])
break
elif written == len(send_buffer):
break
total_sent += written
send_buffer = buffer(to_send, total_sent)
if not self.from_tunnel.is_full():
self.read_watcher.start()
if events & libvirt.VIR_EVENT_HANDLE_READABLE:
# logger.debug('Read from stream')
# logger.debug('Event %s', self.stream_handling)
while True:
try:
incoming = self.stream.recv(self.BUFFER_LEN)
except Exception:
# a failed recv leaves `incoming` undefined, so treat it as a
# fatal stream error instead of silently continuing
logger.exception('Error while reading from stream')
self.close_console()
return
if incoming == -2:
# equivalent to EAGAIN
break
elif not incoming:
# EOF
self.close_console()
return
self.from_stream.append(incoming)
if self.from_stream.is_full():
# update libvirt event mask
self.stream_handling ^= libvirt.VIR_STREAM_EVENT_READABLE
self.stream.eventUpdateCallback(self.stream_handling)
break
if not self.from_stream.is_empty():
self.write_watcher.start()
def read_from_tun_cb(self, watcher, revents):
"""Read data from tunnel and save into buffer."""
# logger.debug('Read from tunnel')
# logger.debug('Event %s', self.stream_handling)
while True:
try:
incoming = self.sock.recv(self.BUFFER_LEN)
except socket.error as exc:
if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
break
logger.exception('Error reading on socket for vm console')
self.close_console()
return
if not incoming:
# EOF (we could wait before closing console stream)
self.close_console()
return
self.from_tunnel.append(incoming)
if self.from_tunnel.is_full():
self.read_watcher.stop()
break
if not self.from_tunnel.is_empty():
# update libvirt event callback
self.stream_handling |= libvirt.VIR_STREAM_EVENT_WRITABLE
self.stream.eventUpdateCallback(self.stream_handling)
def write_to_tun_cb(self, watcher, revents):
"""Write data from buffer to tunnel."""
# logger.debug('Write to tunnel')
# logger.debug('Event %s', self.stream_handling)
while True:
try:
to_send = self.from_stream.popleft()
except IndexError:
self.write_watcher.stop()
break
send_buffer = to_send
total_sent = 0
while True:
try:
written = self.sock.send(send_buffer)
except socket.error as exc:
if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
self.from_stream.appendleft(to_send[total_sent:])
break
logger.exception('Error writing on socket for vm console')
self.close_console()
return
if written == len(send_buffer):
break
total_sent += written
send_buffer = buffer(to_send, total_sent)
if not self.from_stream.is_full():
# update libvirt event callback
self.stream_handling |= libvirt.VIR_STREAM_EVENT_READABLE
self.stream.eventUpdateCallback(self.stream_handling)
def close_console(self):
logger.info('Closing console stream on VM %s', self.name)
if self.stream is not None:
try:
self.stream.eventRemoveCallback()
except Exception:
logger.error('Error while removing callback on stream')
try:
self.stream.finish()
except Exception:
logger.error('Cannot finish console stream')
self.stream = None
if self.sock is not None:
try:
self.sock.close()
except socket.error:
logger.error('Cannot close socket')
self.sock = None
if self.read_watcher is not None:
self.read_watcher.stop()
self.read_watcher = None
if self.write_watcher is not None:
self.write_watcher.stop()
self.write_watcher = None
self.from_tunnel = None
self.from_stream = None
def migrate(self, dest_uri, live=False):
volumes = list(self.iter_disks()) # Store volumes for future cleanup
migration_params = {}
# Get the list of shared, non shared disks and shared pools:
shared = set()
shared_pools = set()
nonshared = set()
for volume in volumes:
if volume.storage.is_shared:
shared.add(volume.device)
shared_pools.add(volume.storage.name)
else:
nonshared.add(volume.device)
flags = (libvirt.VIR_MIGRATE_PEER2PEER
| libvirt.VIR_MIGRATE_PERSIST_DEST
| libvirt.VIR_MIGRATE_UNDEFINE_SOURCE)
if live:
flags |= (libvirt.VIR_MIGRATE_LIVE | libvirt.VIR_MIGRATE_NON_SHARED_DISK)
migration_params[libvirt.VIR_MIGRATE_PARAM_MIGRATE_DISKS] = list(nonshared)
else:
flags |= libvirt.VIR_MIGRATE_OFFLINE
dconn = libvirt.open(dest_uri)
# Refresh shared pools on the destination to avoid unknown volumes:
for pool_name in shared_pools:
if pool_name.startswith('_'):
continue # Ignore internal storages
pool = dconn.storagePoolLookupByName(pool_name)
pool.refresh()
# Note that we don't reuse the existing connection to target
# because the migrate3 API doesn't support P2P migrations.
error = self.lv_dom.migrateToURI3(dest_uri, migration_params, flags)
if error:
raise RuntimeError('Unable to migrate VM')
else:
if live: # In live mode, we are responsible for source volume cleaning
for volume in volumes:
if not volume.storage.is_shared:
# Delete VM storage after migration success:
self.hypervisor.storage.delete_volume(volume.storage.name, volume.name)
cc-node-f5dd2058c95048237a7190404aa5813d84cefa8d/cloudcontrol/node/hypervisor/domains/vm_tags.py 0000664 0000000 0000000 00000012431 13444400225 0031177 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import logging
import itertools
from functools import wraps
from xml.etree import cElementTree as et
from StringIO import StringIO
import libvirt
from cloudcontrol.node.utils import execute
from cloudcontrol.common.client.tags import ttl, refresh, background
logger = logging.getLogger(__name__)
def _vir_tag(func):
"""Catches libvirt related exception.
Decorator used for tag declarations that interacts with libvirt.
"""
@wraps(func)
def decorated(dom):
if not dom.hypervisor.handler.virt_connected:
return
try:
return func(dom)
except libvirt.libvirtError as exc:
if 'Domain not found' in str(exc):
# sometimes, libvirt tells us too late when a domain is removed
# we just ignore the error and remove the domain
dom.hypervisor.vm_unregister(dom.name)
return
logger.exception('Unexpected libvirt error')
dom.hypervisor.handler.virt_connect_restart()
return decorated
def uuid(dom):
"""Unique identifier of the domain."""
return dom.uuid.lower()
@background(max_concurrent=3)
def mode(dom):
"""Retrieve VM mode"""
mode_path = dom.hypervisor.handler.main.config.mode_script
rcode, output = execute(dom.hypervisor.handler.main, [mode_path, '-m', dom.name])
if rcode == 0:
try:
res = output.rsplit(':', 1)[-1].strip()
except IndexError:
res = 'error'
else:
res = 'error'
return res
def status(dom):
return dom.state
def hv(dom):
# FIXME: what should be the value of this tag?
return dom.hypervisor.name
def htype(dom):
return dom.hypervisor.type
@_vir_tag
def arch(dom):
"""VM CPU architecture."""
try:
return dict(i686='x86', x86_64='x64')[et.ElementTree().parse(
StringIO(dom.lv_dom.XMLDesc(0))).find('os/type').get('arch')]
except Exception:
logger.exception('Error while getting architecture tag')
def h(dom):
"""Name of the VM."""
return dom.name
def t(dom):
"""Title of the VM (or name if title is not defined)."""
if dom.title is None:
return dom.name
else:
return dom.title
@_vir_tag
def cpu(dom):
"""Number of CPU of the VM."""
return dom.lv_dom.info()[3]
@ttl(10)
@refresh(5)
@_vir_tag
def cpuuse(dom):
"""Represent CPU use in percent average on 5 seconds."""
state, _, _, vcpu, cpu_time = dom.lv_dom.info()
if state != 1: # if VM is not running
return None
old_cpu_stats = dom.cpu_stats
dom.cpu_stats = (dom.hypervisor.handler.main.evloop.now(), cpu_time)
# calculate CPU percentage, code is from virt-manager in domain.py:1210
return '%.2f' % max(0., min(100., ((cpu_time - old_cpu_stats[1]) * 100.) / (
(dom.cpu_stats[0] - old_cpu_stats[0]) * 1000. * 1000. * 1000.) / vcpu))
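# The percentage formula above compares two (timestamp, cpu_time) samples:
# cpu_time is cumulative nanoseconds of CPU consumed, so the delta divided
# by elapsed wall-clock nanoseconds and the vCPU count gives average load.
# A standalone numeric sketch of the same computation:
#
# ```python
# def cpu_percent(prev, cur, vcpu):
#     """prev/cur are (timestamp_seconds, cpu_time_nanoseconds) samples."""
#     (t0, c0), (t1, c1) = prev, cur
#     pct = ((c1 - c0) * 100.0) / ((t1 - t0) * 1e9) / vcpu
#     return max(0.0, min(100.0, pct))  # clamp like the tag does
#
# # one vCPU consuming 2.5 s of CPU time over a 5 s window -> 50.0
# sample = cpu_percent((0.0, 0), (5.0, int(2.5e9)), 1)
# ```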
@_vir_tag
def mem(dom):
"""Memory currently allocated."""
if dom.state == 'stopped':
return dom.lv_dom.info()[1] * 1024
else:
return dom.lv_dom.info()[2] * 1024
@_vir_tag
def memmax(dom):
"""Maximum memory allocation."""
return dom.lv_dom.info()[1] * 1024
@_vir_tag
def vncport(dom):
"""VNC port for the VM console access."""
try:
port = et.ElementTree().parse(
StringIO(dom.lv_dom.XMLDesc(0))
).find('devices/graphics[@type="vnc"]').get('port')
except Exception:
logger.exception('VNCPort')
raise
try:
if 0 < int(port) < 65536:
return port
except (TypeError, ValueError):
pass
return
@_vir_tag
def spiceport(dom):
"""Spice port for the VM console access."""
try:
port = et.ElementTree().parse(
StringIO(dom.lv_dom.XMLDesc(0))
).find('devices/graphics[@type="spice"]').get('port')
except Exception:
logger.exception('SpicePort')
raise
try:
if 0 < int(port) < 65536:
return port
except (TypeError, ValueError):
pass
return
@ttl(10)
@refresh(10)
@_vir_tag
def disk(dom):
"""Get backend disks."""
return u' '.join(map(str, xrange(len(dom.disks)))) or None
@ttl(10)
@refresh(10)
@_vir_tag
def nic(dom):
"""VM network interfaces."""
return u' '.join(map(str, xrange(len(dom.nics)))) or None
@ttl(10)
@refresh(10)
@_vir_tag
def nic_vlans(dom):
"""VM network interfaces."""
vlans = set(itertools.chain(*[x.vlans for x in dom.nics]))
return u' '.join(str(x) for x in sorted(vlans)) or None
@refresh(10)
@_vir_tag
def autostart(dom):
"""Autostart status."""
return {True: 'yes', False: 'no'}[bool(dom.lv_dom.autostart())]
cc-node-f5dd2058c95048237a7190404aa5813d84cefa8d/cloudcontrol/node/hypervisor/jobs.py 0000664 0000000 0000000 00000104457 13444400225 0027054 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import io
import os
import errno
import socket
import logging
from functools import partial
from os.path import exists as path_exists
from time import sleep
from hashlib import md5
from StringIO import StringIO
from subprocess import CalledProcessError
import sys
from xml.etree import ElementTree as et
import pyev
from cloudcontrol.common.jobs import Job
from cloudcontrol.node.exc import TunnelError, DRBDAllocationError, DRBDError
from cloudcontrol.node.jobs import BaseIOJob, ForkedJob
from cloudcontrol.node.utils import SocketBuffer, subproc_call, Singleton
logger = logging.getLogger(__name__)
class ImportVolume(BaseIOJob):
"""Import volume job.
"""
BUFFER_LEN = 8192 * 16
HASH = md5
def __init__(self, job_manager, volume):
BaseIOJob.__init__(self, job_manager)
self.checksum = None
self.volume = volume
# where the other node will connect
self.port = None
# fds
self.sock = None
self.client_sock = None
self.disk = None
@property
def open_fds(self):
return [fo.fileno() for fo in (self.sock, self.client_sock, self.disk)
if fo is not None]
def clean_fds(self):
if self.sock is not None:
self.sock.close()
self.sock = None
if self.client_sock is not None:
self.client_sock.close()
self.client_sock = None
if self.disk is not None:
self.disk.close()
self.disk = None
def pre_job(self):
"""
:returns: port number the socket is listening on
"""
# create socket
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error:
logger.exception('Error while creating socket for volume export')
self.clean_fds()
raise
try:
self.sock.settimeout(10.)
except socket.error:
logger.exception('Cannot set timeout on socket for volume export')
self.clean_fds()
raise
try:
self.sock.bind(('0.0.0.0', 0))
except socket.error:
logger.exception('Error while binding socket for volume export')
self.clean_fds()
raise
try:
self.sock.listen(1)
except socket.error:
logger.exception('Error while listening on socket')
self.clean_fds()
raise
# open local disk
try:
self.disk = io.open(self.volume.path, 'wb', 0)
except IOError:
logger.exception('Error while trying to open local disk')
self.clean_fds()
raise
self.port = self.sock.getsockname()[1]
return self.port
def run_job(self):
try:
self.client_sock, _ = self.sock.accept()
except socket.timeout:
sys.stderr.write('Import job error: client did not connect\n')
self.clean_fds()
raise
except socket.error:
sys.stderr.write('Error while accepting socket\n')
self.clean_fds()
raise
# close the listening socket
self.sock.close()
self.sock = None
checksum = self.HASH()
# start downloading disk image
while self.running:
try:
received = [] # keep a list of received buffers in order to do
# only one concatenation in the end
total_received = 0
while True:
recv_buf = self.client_sock.recv(self.BUFFER_LEN - total_received)
# sys.stderr.write('Received %d\n' % len(recv_buf))
if not recv_buf: # EOF
# if `received` is not empty, we will come back here once more
# and recv returns EOF one more time
break
total_received += len(recv_buf)
received.append(recv_buf)
if total_received == self.BUFFER_LEN:
break
except socket.error:
sys.stderr.write('Error while receiving disk image\n')
self.clean_fds()
raise
buffer_ = b''.join(received)
if not buffer_:
sys.stderr.write('Received EOF in import job\n')
break
checksum.update(buffer_)
try:
written = 0
# FIXME never write small chunks
# in which case would disk.write not write the whole buffer?
to_send = buffer_
while True:
written += self.disk.write(to_send)
# sys.stderr.write('Written %s to disk\n' % written)
to_send = buffer(buffer_, written)
if not to_send:
break
except IOError:
sys.stderr.write('Error while writing image to disk\n')
self.clean_fds()
raise
# at this point we may not have received the full disk, but we don't
# treat that as an error on the import side
self.checksum = checksum.hexdigest()
# clean the fds
self.clean_fds()
sys.stderr.write('Volume import done\n')
class ExportVolume(BaseIOJob):
"""Export volume job.
"""
BUFFER_LEN = 8192 * 16
HASH = md5
def __init__(self, job_manager, volume, raddr, rport):
"""
:param volume: :class:`Volume` instance
:param raddr: remote IP address
:param rport: remote TCP port
"""
BaseIOJob.__init__(self, job_manager)
# where to connect to send the volume
self.raddr = raddr
self.rport = rport
self.volume = volume
self.checksum = None
# fds
self.sock = None
self.disk = None
@property
def open_fds(self):
return [fo.fileno() for fo in (self.sock, self.disk)
if fo is not None]
def clean_fds(self):
if self.sock is not None:
self.sock.close()
self.sock = None
if self.disk is not None:
self.disk.close()
self.disk = None
def pre_job(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# connect to the remote host
try:
self.sock.connect((self.raddr, self.rport))
except socket.error as exc:
logger.exception('Error while trying to connect to remote host %s',
exc.strerror)
self.clean_fds()
raise
# open local volume
try:
self.disk = io.open(self.volume.path, 'rb', 0)
except IOError:
logger.exception('Error while opening disk for export job')
self.clean_fds()
raise
def run_job(self):
checksum = self.HASH()
# sent_count = 0
# do copy
while self.running:
try:
read = self.disk.read(self.BUFFER_LEN)
except IOError:
sys.stderr.write('Error while reading from disk\n')
self.clean_fds()
break
# read length may be less than BUFFER_LEN but we don't care as it
# will go over TCP
if not read: # end of file
# sys.stderr.write('EOF, exported %d bytes\n' % sent_count)
break
# sent_count += len(read)
# sys.stderr.write('Read %d from disk\n' % len(read))
checksum.update(read)
try:
self.sock.sendall(read)
except socket.error:
sys.stderr.write('Error while sending through socket\n')
self.clean_fds()
break
self.checksum = checksum.hexdigest()
self.clean_fds()
class TCPTunnel(ForkedJob):
"""Handles a TCP tunnel."""
BUFFER_LEN = 8096
def __init__(self, job_manager, connect=None, listen='0.0.0.0'):
"""
:param job_manager: :class:`JobManager` instance
:param connect: where to connect one end of the tunnel (a tuple, as
given to socket.connect)
:param listen: which interface to listen to for the other end of the
tunnel
"""
ForkedJob.__init__(self, job_manager)
# create a new libev loop that will run inside our child
self.ev_loop = pyev.Loop()
self.connect = connect
self.listen = listen
#: port is assigned by the kernel
self.port = None
# keep state information for both ends
self.listen_state = 'CLOSED'
self.connect_state = 'CLOSED'
#: very basic error report
self.error = None
# these are the watchers
self.source_reader = None
self.source_writer = None
self.dest_reader = None
self.dest_writer = None
#: source_sock is the socket that listens for the remote or local peer to connect
self.source_sock = None
#: dest_sock connects to another tunnel that has already been set up
self.dest_sock = None
# input buffer is used for data that is coming from source_sock and goes
# to dest_sock
self.input_buffer = SocketBuffer()
# output_buffer is used for data that is coming from dest_sock and goes
# to source_sock
self.output_buffer = SocketBuffer()
@property
def open_fds(self):
return [fo.fileno() for fo in (self.source_sock, self.dest_sock)
if fo is not None]
def after_fork(self):
self.ev_loop.reset()
def close(self):
# as this could be called from child, don't use logger (this is for
# debug anyway)
sys.stderr.write('Closing job %d\n' % self.id)
# stop watchers
if self.source_reader is not None:
self.source_reader.stop()
self.source_reader = None
if self.source_writer is not None:
self.source_writer.stop()
self.source_writer = None
if self.dest_reader is not None:
self.dest_reader.stop()
self.dest_reader = None
if self.dest_writer is not None:
self.dest_writer.stop()
self.dest_writer = None
# close sockets
if self.source_sock is not None:
self.source_sock.close()
self.source_sock = None
if self.dest_sock is not None:
self.dest_sock.close()
self.dest_sock = None
# clear buffers (this memory won't be needed anyway)
self.input_buffer = None
self.output_buffer = None
# reset states
self.listen_state = 'CLOSED'
self.connect_state = 'CLOSED'
def stop(self):
self.close()
def setup_listen(self, interface=None):
"""Setup source socket.
:param interface: specify which interface to listen onto
"""
if interface is not None:
self.listen = interface
logger.debug('Setup listening %s %d', self.listen, self.id)
try:
self.source_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error:
logger.exception('Error while creating source_sock for tunnel job'
' %d', self.id)
self.close()
raise
try:
self.source_sock.setblocking(0)
except socket.error:
logger.exception('Cannot set source_sock in non-blocking mode for'
' tunnel job %d', self.id)
self.close()
raise
try:
self.source_sock.bind((self.listen, 0))
except socket.error:
logger.exception('Error while binding source_sock for tunnel job'
' %d', self.id)
self.close()
raise
self.port = self.source_sock.getsockname()[1]
logger.debug('Listening on port %s', self.port)
try:
self.source_sock.listen(1)
except socket.error:
logger.exception('Error while listening on source_sock for tunnel'
' job %d', self.id)
self.close()
raise
self.listen_state = 'LISTENING'
# ready to accept
self.source_reader = self.ev_loop.io(self.source_sock,
pyev.EV_READ, self.accept_cb)
self.source_reader.start()
def setup_connect(self, endpoint=None):
"""Start connection to remote end.
:param endpoint: specify where to connect (same as connect argument in
constructor), can be specified in both places
"""
if endpoint is not None:
self.connect = endpoint
if self.connect is None:
raise TunnelError('Remote endpoint to connect to was not specified')
logger.debug('Connect to endpoint %s %d', self.connect, self.id)
try:
if isinstance(self.connect, tuple):
addr_family = socket.AF_INET
else:
addr_family = socket.AF_UNIX
self.dest_sock = socket.socket(addr_family, socket.SOCK_STREAM)
except socket.error:
logger.exception('Error while creating dest_sock for tunnel job'
' %d', self.id)
self.close()
raise
try:
self.dest_sock.setblocking(0)
except socket.error:
logger.exception('Error while setting non-blocking mode on dest_sock'
' for tunnel job %d', self.id)
self.close()
raise
error = self.dest_sock.connect_ex(self.connect)
if error and error != errno.EINPROGRESS:
raise socket.error('Error during connect for tunnel job, %s' %
os.strerror(error))
self.dest_writer = self.ev_loop.io(self.dest_sock,
pyev.EV_WRITE, self.connect_cb)
self.dest_writer.start()
self.connect_state = 'CONNECTING'
def run_job(self):
sys.stderr.write('Will start ev loop in child\n')
self.ev_loop.start()
def accept_cb(self, watcher, revents):
try:
new_source, remote = self.source_sock.accept()
except socket.error as exc:
if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
# we will come back
return
# else
self.fatal_exc('Error while accepting new connection on'
' sock_source for tunnel job')
# everything went fine
self.source_sock.close() # we won't accept connections
self.source_sock = new_source
# set new socket non blocking
try:
self.source_sock.setblocking(0)
except socket.error as exc:
self.fatal_exc('Cannot set source socket in non-blocking mode for'
' tunnel job: %s', exc.strerror)
self.source_reader.stop()
self.source_reader = self.ev_loop.io(new_source, pyev.EV_READ,
self.read_cb)
self.source_writer = self.ev_loop.io(new_source, pyev.EV_WRITE,
self.write_cb)
sys.stderr.write('Successfully accepted remote client %s for tunnel'
' job %d\n' % (remote, self.id))
self.listen_state = 'CONNECTED'
if self.connect_state == 'CONNECTED':
# start the watchers only if both ends are ready to accept data
self.source_reader.start()
self.dest_reader.start()
def connect_cb(self, watcher, revents):
# check that connection was a success
error = self.dest_sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
if error:
self.fatal('Error during connect for tunnel job, %s\n' %
os.strerror(error))
# else we setup watcher with proper events
self.dest_reader = self.ev_loop.io(self.dest_sock, pyev.EV_READ,
self.read_cb)
self.dest_writer.stop()
self.dest_writer = self.ev_loop.io(self.dest_sock, pyev.EV_WRITE,
self.write_cb)
sys.stderr.write('Successfully connected to remote endpoint %s %d\n' %
(self.connect, self.id))
self.connect_state = 'CONNECTED'
if self.listen_state == 'CONNECTED':
# start the watchers only if both ends are ready to accept data
self.source_reader.start()
self.dest_reader.start()
def read_cb(self, watcher, revents):
if watcher == self.dest_reader:
# sys.stderr.write('Read event on dest %s\n' % self.id)
sock = self.dest_sock
buffer_ = self.output_buffer
other_watcher = self.source_writer
else:
# sys.stderr.write('Read event on source %s\n' % self.id)
sock = self.source_sock
buffer_ = self.input_buffer
other_watcher = self.dest_writer
# sys.stderr.write('Will loop into event\n')
while True:
try:
incoming = sock.recv(self.BUFFER_LEN)
except socket.error as exc:
if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
# sys.stderr.write('EAGAIN\n')
break
# else: unexpected error
self.fatal_exc('Unexpected error while reading on socket'
' for tunnel job, %s\n', exc.strerror)
if not incoming:
# EOF
# sys.stderr.write('EOF\n')
self.close()
return
# sys.stderr.write('Read %d bytes\n' % len(incoming))
buffer_.append(incoming)
if buffer_.is_full():
# sys.stderr.write('Buffer is full\n')
watcher.stop()
break
# we did read some bytes that we could write to the other end
if not buffer_.is_empty():
# sys.stderr.write('Starting other watcher\n')
other_watcher.start()
# sys.stderr.write('Read event done\n')
def write_cb(self, watcher, revents):
if watcher == self.dest_writer:
# sys.stderr.write('Write event on dest %s\n' % self.id)
sock = self.dest_sock
buffer_ = self.input_buffer
other_watcher = self.source_reader
else:
# sys.stderr.write('Write event on source %s\n' % self.id)
sock = self.source_sock
buffer_ = self.output_buffer
other_watcher = self.dest_reader
while True:
try:
to_send = buffer_.popleft()
except IndexError:
# buffer is empty, we should stop write event
# sys.stderr.write('Buffer is empty\n')
watcher.stop()
break
send_buffer = to_send
total_sent = 0
while True:
try:
written = sock.send(send_buffer)
except socket.error as exc:
if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
buffer_.appendleft(to_send[total_sent:])
# sys.stderr.write('EAGAIN\n')
break
# else: unexpected error
self.fatal_exc('Unexpected error while writing on socket'
' for tunnel job, %s', exc.strerror)
# sys.stderr.write('Written %d bytes\n' % written)
if written == len(send_buffer):
break
# else
total_sent += written
send_buffer = buffer(to_send, total_sent)
# if we can read on the other end
if not buffer_.is_full():
# sys.stderr.write('Starting other watcher\n')
other_watcher.start()
# sys.stderr.write('Processed write event\n')
class DRBDAllocator(object):
"""Keeps a list of allocated DRBD devices."""
__metaclass__ = Singleton
RMMOD = '/sbin/rmmod'
MODPROBE = '/sbin/modprobe'
#: maximum number of DRBD devices
MINOR_MAX = 100
def __init__(self, main_loop):
self.volumes = set()
self.subproc_call = partial(subproc_call, main_loop)
self.reload_kernel_module()
def new_volume(self):
for i in xrange(self.MINOR_MAX):
if i not in self.volumes:
self.volumes.add(i)
break
else:
raise DRBDAllocationError('Cannot allocate DRBD volume')
return i
def remove_volume(self, id_):
self.volumes.remove(id_)
def reload_kernel_module(self):
# FIXME find another way to pass parameters to the drbd module
# try to remove kernel module
try:
self.subproc_call([self.RMMOD, 'drbd'])
except CalledProcessError:
# this is not an error if drbd module wasn't loaded
if 'drbd' in open('/proc/modules').read():
logger.error('Cannot remove drbd kernel module')
raise
# load kernel module with proper parameters
try:
# we use a greater minor_count than the default, which seems too
# small. we set usermode_helper to /bin/true because by default the
# module calls some drbd helpers that return a non-zero value and
# make the synchronisation halt.
self.subproc_call([self.MODPROBE, 'drbd',
'minor_count=%d' % self.MINOR_MAX,
'usermode_helper=/bin/true'])
except CalledProcessError:
logger.error('Cannot load drbd kernel module')
class DRBD(object):
"""Manage DRBD job."""
DMSETUP = '/sbin/dmsetup'
DRBDSETUP = '/sbin/drbdsetup'
DRBDMETA = '/sbin/drbdmeta'
DRBD_TIMEOUT = '30'
DRBD_RATE = '50000'
def __init__(self, job_manager, storage_index, lvm_pool, lvm_volume):
"""
:param job_manager: :class:`JobManager` instance
:param storage_index: :class:`StorageIndex` instance
:param lvm_pool: :class:`Storage` instance
:param lvm_volume: :class:`Volume` instance
"""
#: job id
self.id = job_manager.job_id.next()
self.subproc_call = partial(subproc_call, job_manager.main)
self.allocator = DRBDAllocator(job_manager.main)
# define a set of states
self.state = 'INIT'
self.storage = storage_index
self.pool = lvm_pool
self.volume = lvm_volume
self.meta_volume = None
#: DRBD id as returned by DRBDAllocator
self.drbd_id = None
self.drbd_port = None
#: DRBD device full path
self.drbd_path = None
self.drbd_table = None
self.drbd_status = dict(conn=None)
#: name of DM copy of LV
self.dm_table = None
self.dm_copy = '%s-%s.copy' % (
'vg', self.volume.name.replace('-', '--'))
# each step is executed in the RPC call thread, thus exception are
# propagated directly to the cc-server
def stop(self):
pass
def cleanup(self):
# reset DM to initial state
try:
table = self.subproc_call([self.DMSETUP, 'table', self.volume.path])
except CalledProcessError:
logger.error('Error while getting table of VM LV')
else:
if table != self.dm_table:
try:
self.subproc_call([self.DMSETUP, 'load', self.volume.path],
self.dm_table)
self.subproc_call([self.DMSETUP, 'suspend', self.volume.path])
self.subproc_call([self.DMSETUP, 'resume', self.volume.path])
except CalledProcessError:
logger.error('Error while loading back VM LV table')
# FIXME this is kind of critical, we should tell the user to
# call a Gaetant
# stop drbd volume
# if path_exists(self.drbd_path):
if self.drbd_id is not None:
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'disconnect'])
except CalledProcessError:
logger.error('Error while disconnecting DRBD device %s',
self.drbd_path)
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'secondary'])
except CalledProcessError:
logger.error('Error while switching DRBD device to secondary'
' (%s)', self.drbd_path)
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'detach'])
except CalledProcessError:
logger.error('Error while detaching DRBD device %s',
self.drbd_path)
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'down'])
except CalledProcessError:
logger.error('Error while bringing down DRBD device %s',
self.drbd_path)
self.allocator.remove_volume(self.drbd_id)
self.drbd_id = None
self.drbd_port = None
self.drbd_path = None
self.drbd_table = None
self.drbd_status = dict(conn=None)
# remove drbd meta volume
if self.meta_volume is not None:
try:
self.storage.delete_volume(
self.pool.name,
self.volume.name + '.drbdmeta',
)
except: # FIXME
logger.exception('Error while removing DRBD metadata LV')
self.meta_volume = None
# remove copy DM
if path_exists('/dev/mapper/' + self.dm_copy):
try:
self.subproc_call([self.DMSETUP, 'remove', self.dm_copy])
except CalledProcessError:
logger.error('Error while removing DM copy')
self.dm_table = None
# set mapper
def setup(self):
logger.debug('Create DRBD meta device')
self.meta_volume = self.storage.create_volume(
self.pool.name,
self.volume.name + '.drbdmeta',
# see
# http://www.drbd.org/users-guide/ch-internals.html#s-meta-data-size
# for external metadata size calculation
max(self.volume.capacity / 32768 + 4 * 2 ** 20, 128 * 2 ** 20),
)
logger.debug('Create a copy DM of the LV')
# get LV table
try:
self.dm_table = self.subproc_call([self.DMSETUP, 'table', '--showkeys', self.volume.path])
except CalledProcessError:
logger.error('Cannot get DM table of VM LV')
raise DRBDError('Cannot get DM table of VM LV')
# create new DM
logger.debug('Got table of LV "%s"', self.dm_table)
try:
self.subproc_call([self.DMSETUP, 'create', self.dm_copy], self.dm_table)
except CalledProcessError:
logger.error('Cannot create copy DM of LV with table "%s"',
self.dm_table)
raise
logger.debug('Setup DRBD device')
# get drbd path
self.drbd_id = self.allocator.new_volume()
self.drbd_port = 7788 + self.drbd_id # FIXME magic number
self.drbd_path = '/dev/drbd%d' % self.drbd_id
# wipe drbd metadata (just in case)
try:
self.subproc_call([self.DRBDMETA, '--force', self.drbd_path,
'v08', self.meta_volume.path, '0', 'wipe-md'])
except CalledProcessError:
pass
try:
self.subproc_call([self.DRBDMETA, '--force', self.drbd_path,
'v08', self.meta_volume.path, '0', 'create-md'])
except CalledProcessError:
logger.error('Cannot create DRBD external metadata on device')
raise DRBDError('Cannot create DRBD metadata')
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'disk',
'/dev/mapper/%s' % self.dm_copy,
self.meta_volume.path,
'0', '--create-device'])
except CalledProcessError:
logger.error('Error while creating DRBD device')
raise DRBDError('Cannot create DRBD device')
self.drbd_table = '0 %d linear %s 0' % (
self.volume.capacity / 512, # DM tables are expressed in 512-byte sectors
self.drbd_path,
)
logger.debug('Setup DRBD done')
self.state = 'SETUP'
def connect(self, remote_addr, remote_port):
logger.debug('Setup networking for DRBD')
# connect to remote node
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'net',
'0.0.0.0:%d' % self.drbd_port,
'%s:%d' % (remote_addr, remote_port),
'C', '-m', '-S', '10000000'])
except CalledProcessError:
logger.error('Error while setting up network facility for DRBD')
raise DRBDError('Cannot set up network for DRBD')
sleep(.5) # FIXME
logger.debug('Set up bandwidth limit')
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'syncer', '-r', self.DRBD_RATE])
except CalledProcessError:
logger.error('Cannot set bandwidth rate limit on DRBD')
raise DRBDError('Error while setting bandwidth limit')
self.state = 'CONNECTED'
def wait_connection(self):
self.state = 'WAIT PEER CONNECT'
sleep(.5) # FIXME
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'wait-connect',
'-t', self.DRBD_TIMEOUT,
'-d', self.DRBD_TIMEOUT,
'-o', self.DRBD_TIMEOUT])
except CalledProcessError:
logger.error('Error while waiting for remote DRBD to connect,'
' timeout = %s', self.DRBD_TIMEOUT)
raise DRBDError('Error while waiting DRBD connect')
sleep(.5) # FIXME
self.state = 'CONNECTED'
def switch_primary(self):
logger.debug('Switch DRBD %s in primary mode', self.drbd_path)
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'primary', '-o'])
except CalledProcessError:
logger.error('Error while switching to primary role (%s)',
self.drbd_path)
raise DRBDError('Cannot switch to primary role')
self.state = 'CONNECTED PRIMARY'
def switch_secondary(self):
logger.debug('Switch DRBD %s in secondary mode', self.drbd_path)
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'secondary'])
except CalledProcessError:
logger.error('Error while switching to secondary role (%s)',
self.drbd_path)
raise DRBDError('Cannot switch to secondary role')
self.state = 'CONNECTED SECONDARY'
def wait_sync(self):
self.state = 'WAIT SYNC'
sleep(.5) # FIXME
logger.debug('Wait sync %s', self.drbd_path)
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'wait-sync'])
except CalledProcessError:
logger.error('Error while waiting for synchronisation of DRBD'
' device (%s)', self.drbd_path)
raise DRBDError('Wait sync error')
self.state = 'SYNC DONE'
def disconnect(self):
try:
self.subproc_call([self.DRBDSETUP, self.drbd_path, 'disconnect'])
except CalledProcessError:
logger.error('Error while disconnecting DRBD device %s',
self.drbd_path)
raise DRBDError('Cannot disconnect device')
self.state = 'DISCONNECTED'
def status(self):
"""DRBD status."""
try:
out = self.subproc_call([self.DRBDSETUP, self.drbd_path, 'status'])
except CalledProcessError:
logger.error('Error while getting DRBD status (%s)', self.drbd_path)
raise DRBDError('Status: error while executing DRBD status')
try:
status = et.ElementTree().parse(StringIO(out))
except:
logger.error('Error while parsing status command output for DRBD'
' device %s', self.drbd_path)
raise DRBDError('Status: cannot parse output')
self.drbd_status = dict(
conn=status.get('cs'),
disk=status.get('ds1'),
rdisk=status.get('ds2'),
role=status.get('ro1'),
rrole=status.get('ro2'),
percent=status.get('resynced_percent', None),
)
return self.drbd_status
def takeover(self):
"""Set up DRBD device as VM backing device."""
logger.debug('DRBD takeover %s', self.drbd_path)
assert self.drbd_table is not None
try:
self.subproc_call([self.DMSETUP, 'load', self.volume.path],
self.drbd_table)
except CalledProcessError:
logger.error('Error while loading new table for VM LV')
raise DRBDError('Takeover: cannot load DM table')
try:
self.subproc_call([self.DMSETUP, 'suspend', self.volume.path])
except CalledProcessError:
logger.error('Error while suspending VM LV')
raise DRBDError('Takeover: cannot suspend DM')
try:
self.subproc_call([self.DMSETUP, 'resume', self.volume.path])
except CalledProcessError:
logger.error('Error while resuming VM LV')
raise DRBDError('Takeover: cannot resume DM')
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
"""KVM hypervisor support."""
import re
import os
import sys
import signal
import logging
import weakref
import threading
import traceback
import libvirt
from cloudcontrol.common.client.utils import main_thread
from cloudcontrol.common.client.tags import ParentWrapper
from cloudcontrol.node.hypervisor.lib import (
DOMAIN_STATES, EVENTS,
EventLoop as VirEventLoop,
StorageIndex,
)
from cloudcontrol.node.hypervisor.domains import VirtualMachine
from cloudcontrol.node.utils import close_fds, set_signal_map, num_to_sig
from cloudcontrol.node.exc import VMMigrationError
logger = logging.getLogger(__name__)
BAD_VM_NAME = re.compile(r'^job-(0|([1-9]\d*))$')
# FIXME create abstract base class for any hypervisor
class KVM(object):
"""Container for all hypervisor related state."""
def __init__(self, name, handler):
"""
:param str name: name of hypervisor instance
:param Handler handler: hypervisor handler
"""
self.handler = weakref.proxy(handler)
#: hv attributes
self.name = name
self.type = u'kvm'
# register libvirt error handler
libvirt.registerErrorHandler(self.vir_error_cb, None)
# libvirt event loop abstraction
self.vir_event_loop = VirEventLoop(self.handler.main.evloop)
self.vir_con = libvirt.open('qemu:///system') # currently only support KVM
# find out storage pools
self.storage = StorageIndex(handler, self.vir_con)
logger.debug('Storages: %s', self.storage.paths)
#: domains: vms, containers...
self.domains = dict()
# find defined domains
for dom_name in self.vir_con.listDefinedDomains():
dom = self.vir_con.lookupByName(dom_name)
self.domains[dom.name()] = VirtualMachine(dom, self)
# find started domains
for dom_id in self.vir_con.listDomainsID():
dom = self.vir_con.lookupByID(dom_id)
self.domains[dom.name()] = VirtualMachine(dom, self)
logger.debug('Domains: %s', self.domains)
self.vir_con.domainEventRegister(self.vir_cb, None)
self.vir_cb_id_dev_add = self.vir_con.domainEventRegisterAny(None,
libvirt.VIR_DOMAIN_EVENT_ID_DEVICE_ADDED,
self.vir_cb_devices, None)
self.vir_cb_id_dev_del = self.vir_con.domainEventRegisterAny(None,
libvirt.VIR_DOMAIN_EVENT_ID_DEVICE_REMOVED,
self.vir_cb_devices, None)
def stop(self):
self.vir_event_loop.stop()
# unregister callback
try:
self.vir_con.domainEventDeregister(self.vir_cb)
self.vir_con.domainEventDeregisterAny(self.vir_cb_id_dev_add)
self.vir_con.domainEventDeregisterAny(self.vir_cb_id_dev_del)
except libvirt.libvirtError:
# in case the libvirt connection is broken, it will raise the error
pass
ret = self.vir_con.close()
logger.debug('Libvirt still handling %s ref connections', ret)
def vir_error_cb(self, ctxt, err):
"""Libvirt error callback.
See http://libvirt.org/errors.html for more information.
:param ctxt: arbitrary context data (not needed here because the
context is given by self)
:param err: libvirt error code
"""
logger.error('Libvirt error %s', err)
def vir_cb(self, conn, dom, event, detail, opaque):
"""Callback for libvirt event loop."""
logger.debug('Received event %s on domain %s, detail %s', event,
dom.name(), detail)
event = EVENTS[event]
if event == 'Added':
# prevent name conflicts with other cc-node objects
if BAD_VM_NAME.match(dom.name()):
logger.error('Cannot register VM %s as its name would '
'conflict with other cc-node objects', dom.name())
return
# update Storage pools in case VM has volumes that were created
self.storage.update(retry=10)
# if vm is redefined while running we need to refresh its devices
# when stopped
redefine_on_stop = False
if dom.name() in self.domains:
# sometimes libvirt sends us the same event multiple times
# this can be the result of a change in the domain configuration
# we first remove the old domain
vm = self.domains.pop(dom.name())
vm.tag_db.set_parent(None)
if vm.state not in ('stopped', 'crashed'):
# if the vm was updated while it was "on", then the
# modifications will not be reflected since we construct the
# object/tags from running XML
redefine_on_stop = True
logger.debug('Domain %s recreated', dom.name())
self.vm_register(dom, redefine_on_stop)
elif event == 'Removed':
vm_name = dom.name()
self.vm_unregister(vm_name)
logger.info('Removed domain %s', vm_name)
elif event in ('Started', 'Suspended', 'Resumed', 'Stopped', 'Saved',
'Restored'):
vm = self.domains.get(dom.name())
# sometimes libvirt sends a start event before a create event, so be
# careful
if vm is not None:
try:
state = DOMAIN_STATES[dom.info()[0]]
except libvirt.libvirtError as exc:
# checks that domain was not previously removed
# seems to happen only in libvirt 0.8.8
if 'Domain not found' in str(exc):
self.vm_unregister(dom.name())
else:
raise
else:
logger.info('Domain change state from %s to %s', vm.state, state)
if event == 'Stopped' and vm.redefine_on_stop:
# if the vm was changed while it was running, then we
# need to recreate it now as stated above
self.vm_unregister(vm.name)
self.vm_register(dom)
else:
vm.state = state
self.update_domain_count()
def vir_cb_devices(self, conn, dom, device, opaque):
"""Callback for device add/removed from a domain."""
logger.debug('Received device event on domain %s, dev %s', dom.name(), device)
vm = self.domains.get(dom.name())
if vm is not None:
# Unregister/register VM to update devices list:
self.vm_unregister(vm.name)
self.vm_register(dom)
def vm_register(self, dom, redefine_on_stop=False):
"""Register a VM to the hypervisor object.
:param dom: libvirt domain instance
:param redefine_on_stop: if we need to reread the domain XML on stop
"""
vm = VirtualMachine(dom, self)
logger.info('Created domain %s', vm.name)
vm.redefine_on_stop = redefine_on_stop
self.domains[vm.name] = vm
vm.tag_db.set_parent(ParentWrapper(vm.name, 'vm', self.handler.tag_db))
self.update_domain_count()
def vm_unregister(self, name):
"""Unregister a VM from the cc-server and remove it from the index."""
try:
vm = self.domains.pop(name)
except KeyError:
# domain already removed, see hypervisor/domains/vm_tags.py
# sometimes libvirt sends us the remove event too late
# we still update storage and tag attributes
pass
else:
vm.tag_db.set_parent(None)
# update Storage pools in case VM had volumes that were deleted
self.storage.update()
self.update_domain_count()
def update_domain_count(self):
"""Update domain state count tags."""
# update domain state counts
for tag in ('nvm', 'vmpaused', 'vmstarted', 'vmstopped', 'cpualloc',
'cpurunning', 'cpuremaining', 'cpuallocratio', 'memalloc',
'memrunning', 'memremaining', 'memallocratio'):
self.handler.tag_db['__main__'][tag].update_value()
def vm_define(self, xml_desc):
"""Create a VM on the Hypervisor
:param str xml_desc: XML description in libvirt format
:return: VM name created
"""
try:
return self.vir_con.defineXML(xml_desc).UUIDString()
except libvirt.libvirtError:
logger.exception('Error while creating domain')
# reraise exception for the cc-server
raise
def _count_domain(self, filter=lambda d: True):
count = 0
for dom in self.domains.itervalues():
if filter(dom):
count += 1
return count
@property
def vm_started(self):
"""Number of VMs started."""
return self._count_domain(lambda d: d.state == 'running')
@property
def vm_stopped(self):
"""Number of VMs stopped."""
return self._count_domain(lambda d: d.state == 'stopped')
@property
def vm_paused(self):
"""Number of VMs paused."""
return self._count_domain(lambda d: d.state == 'paused')
@property
def vm_total(self):
"""Total number of VMs on the hypervisor."""
return self._count_domain()
class LiveMigration(object):
def __init__(self, main_loop, vm, node2virt_port, virt2virt_port, timeout,
unsafe=False):
"""Performs live migration in a forked process.
:param main_loop: instance of MainLoop
:param vm: instance of VM to migrate
:param node2virt_port: port for cc-node -> remote libvirt
:param virt2virt_port: port for local libvirt -> remote libvirt
:param float timeout: timeout for libvirt migration (prevents libvirt
from trying to acquire domain lock forever)
:param bool unsafe: for Libvirt >= 0.9.11, see
http://libvirt.org/html/libvirt-libvirt.html#virDomainMigrateFlags
"""
self.main = main_loop
self.vm = vm
self.node2virt_port = node2virt_port
self.virt2virt_port = virt2virt_port
self.timeout = timeout
self.unsafe = unsafe
#: child pid
self.pid = None
self.error_msg = None
self.return_status = None
# event for caller thread to wait migration termination
self.event = threading.Event()
self.do_fork()
def create_watchers(self):
self.timeout_watcher = self.main.evloop.timer(self.timeout, 0.,
self.timeout_cb)
self.child_watcher = self.main.evloop.child(self.pid, False,
self.child_cb)
self.timeout_watcher.start()
self.child_watcher.start()
def child_cb(self, watcher, revents):
self.pid = None
self.return_status = watcher.rstatus
watcher.stop()
if self.timeout_watcher.active:
self.timeout_watcher.stop()
self.child_watcher = None
logger.debug('Status: %s', self.return_status)
# test if killed, then set msg
if os.WIFSIGNALED(self.return_status):
signo = os.WTERMSIG(self.return_status)
if signo == signal.SIGKILL:
self.error_msg = 'Migration timeout for vm %s' % self.vm.name
else:
self.error_msg = 'Migration failed for vm %s, (%s)' % (
self.vm.name, num_to_sig(signo))
else:
# test status
status = os.WEXITSTATUS(self.return_status)
if status == 1:
self.error_msg = (
'Migration failed for vm %s, due to libvirt error'
% self.vm.name)
elif status == 4:
self.error_msg = 'Cannot open new connection to libvirt'
elif status == 5:
self.error_msg = 'Cannot open connection to remote libvirt'
elif status != 0:
self.error_msg = 'Migration failed for vm %s (%d)' % (
self.vm.name, status)
self.event.set()
def timeout_cb(self, watcher, revents):
        # migration took too long: kill the child doing the migration
logger.debug('Killing child migration process')
os.kill(self.pid, signal.SIGKILL)
@main_thread
def do_fork(self):
        # We fork and open a new connection to libvirt because the libvirt
        # Python binding sometimes does not release CPython's GIL while
        # performing an operation, which blocks every other node operation.
        # The only solution we have found so far is to fork and use a
        # dedicated libvirt connection for the migration: the migration
        # itself is handled by the child while the other threads can still
        # be scheduled.
try:
pid = os.fork()
except OSError:
logger.error('Cannot fork before running live migration')
raise
if pid == 0:
# child
try:
self.child_work()
except:
                # whatever happens we MUST NOT return to libev or sjRPC
                sys.stderr.write('Uncaught error in migration child\n')
                traceback.print_exc(file=sys.stderr)
finally:
os._exit(42)
self.pid = pid
self.create_watchers()
def child_work(self):
# migration is performed here
        sys.stderr.write('Hello from child!\n')
sys.stderr.write('Debug is %s\n' % self.main.config.debug)
try:
close_fds(debug=self.main.config.debug)
set_signal_map({
signal.SIGTERM: lambda *args: os._exit(1),
signal.SIGUSR1: signal.SIG_IGN,
signal.SIGINT: signal.SIG_IGN,
# FIXME need more signal ?
})
except:
sys.stderr.write('Error while performing post fork work\n')
traceback.print_exc(file=sys.stderr)
# create a new libvirt connection dedicated to migration
sys.stderr.write('Open new connection to libvirt\n')
try:
new_con = libvirt.open('qemu:///system')
domain = new_con.lookupByUUIDString(self.vm.uuid)
except libvirt.libvirtError:
sys.stderr.write('Cannot connect to libvirt\n')
os._exit(4)
except:
# error
            traceback.print_exc(file=sys.stderr)
os._exit(2)
sys.stderr.write('Open destination libvirt connection\n')
try:
dest_virt_con = libvirt.open(
'qemu+tcp://127.0.0.1:%d/system' % self.node2virt_port)
except libvirt.libvirtError:
            sys.stderr.write('Cannot connect to remote libvirt for live'
                             ' migrating vm %s\n' % self.vm.name)
os._exit(5)
except:
# error
traceback.print_exc(file=sys.stderr)
os._exit(2)
try:
if self.unsafe:
# VIR_MIGRATE_UNSAFE is not defined for libvirt < 0.9.11
append_flags = getattr(libvirt, 'VIR_MIGRATE_UNSAFE', 0)
else:
append_flags = 0
sys.stderr.write('Do migrate\n')
domain.migrate(
dest_virt_con,
libvirt.VIR_MIGRATE_LIVE | libvirt.VIR_MIGRATE_PEER2PEER |
libvirt.VIR_MIGRATE_TUNNELLED |
libvirt.VIR_MIGRATE_PERSIST_DEST |
libvirt.VIR_MIGRATE_UNDEFINE_SOURCE |
append_flags,
None,
'qemu+tcp://127.0.0.1:%d/system' % self.virt2virt_port,
0,
)
except libvirt.libvirtError:
sys.stderr.write('libvirt error during migration\n')
traceback.print_exc(file=sys.stderr)
os._exit(1)
except:
# whatever the matter is we MUST NOT return to libev or sjRPC
sys.stderr.write('error during migration\n')
traceback.print_exc(file=sys.stderr)
os._exit(2)
else:
os._exit(0)
finally:
new_con.close()
dest_virt_con.close()
def wait(self):
self.event.wait()
if self.return_status != 0:
raise VMMigrationError(self.error_msg)
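The wait-status decoding in `child_cb` above follows the classic `waitpid(2)` convention (`WIFSIGNALED`, `WTERMSIG`, `WEXITSTATUS`). A minimal standalone sketch of the same logic — the function name is hypothetical, not part of the codebase:

```python
import os
import signal


def describe_status(status):
    """Decode a raw waitpid()-style status the way child_cb does."""
    if os.WIFSIGNALED(status):
        signo = os.WTERMSIG(status)
        if signo == signal.SIGKILL:
            # the timeout watcher kills the child with SIGKILL
            return 'killed (timeout)'
        return 'killed by signal %d' % signo
    # normal exit: the exit code lives in the high byte of the status
    return 'exited with status %d' % os.WEXITSTATUS(status)
```

A child killed by the timeout handler maps to `killed (timeout)`, while the `os._exit(n)` calls in `child_work` surface as `exited with status n`.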
cc-node-f5dd2058c95048237a7190404aa5813d84cefa8d/cloudcontrol/node/hypervisor/lib.py
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
"""Helpers for libvirt."""
import logging
import time
from itertools import chain, imap, count
from StringIO import StringIO
from xml.etree import cElementTree as et
from functools import partial
import pyev
import libvirt
from cloudcontrol.common.client.tags import Tag
from cloudcontrol.node.utils import Singleton
from cloudcontrol.node.exc import PoolStorageError
logger = logging.getLogger(__name__)
#: corresponding name for enum state of libvirt domains
# see http://libvirt.org/html/libvirt-libvirt.html#virDomainState
DOMAIN_STATES = (
'stopped', # 0 no state
'running', # 1 running
'blocked', # 2 blocked
'paused', # 3 paused
'running', # 4 shutdown
    'stopped', # 5 shutoff
'crashed', # 6 crashed
'suspended', # 7 suspended
)
STORAGE_STATES = (
'inactive',
'building',
'running',
'degraded',
'inaccessible',
'???', # 5
)
#: libvirt events
EVENTS = (
'Added',
'Removed',
'Started',
'Suspended',
'Resumed',
'Stopped',
'Saved',
'Restored',
)
# following event loop implementation was inspired by libvirt python example
# but updated to work with libev
class LoopHandler(object):
"""This class contains the data we need to track for a single file handle.
"""
def __init__(self, loop, handle, fd, events, cb, opaque):
self.handle = handle
self.fd = fd
self._events = self.virt_to_ev(events)
self._cb = cb
self.opaque = opaque
self.watcher = loop.io(self.fd, self._events, self.ev_cb,
None, pyev.EV_MAXPRI)
def ev_to_virt(self, events):
"""Convert libev events into libvirt one."""
result = 0
if events & pyev.EV_READ:
result |= libvirt.VIR_EVENT_HANDLE_READABLE
if events & pyev.EV_WRITE:
result |= libvirt.VIR_EVENT_HANDLE_WRITABLE
return result
def virt_to_ev(self, events):
"""Convert libvirt event to libev one."""
result = 0
if events & (libvirt.VIR_EVENT_HANDLE_READABLE |
libvirt.VIR_EVENT_HANDLE_ERROR |
libvirt.VIR_EVENT_HANDLE_HANGUP):
result |= pyev.EV_READ
if events & libvirt.VIR_EVENT_HANDLE_WRITABLE:
result |= pyev.EV_WRITE
return result
def _set(self):
self.watcher.stop()
if self._events != 0:
self.watcher.set(self.fd, self._events)
self.watcher.start()
@property
def events(self):
return self._events
@events.setter
def events(self, events):
self._events = self.virt_to_ev(events)
self._set()
def start(self):
self.watcher.start()
def stop(self):
self.watcher.stop()
def ev_cb(self, watcher, revents):
# convert events
events = self.ev_to_virt(revents)
self._cb(self.handle, self.watcher.fd, events, self.opaque[0],
self.opaque[1])
class LoopTimer(object):
"""This class contains the data we need to track for a single periodic
timer.
"""
def __init__(self, loop, timer, interval, cb, opaque):
self.timer = timer
self._interval = float(interval)
self._cb = cb
self.opaque = opaque
self.watcher = None
self.loop = loop
self._set()
def _set(self):
self.stop()
if self._interval >= 0.: # libvirt sends us interval == -1
self.watcher = self.loop.timer(self._interval, self._interval,
self.ev_cb, None, pyev.EV_MAXPRI)
else:
self.watcher = None
self.start()
@property
def interval(self):
return self._interval
@interval.setter
def interval(self, value):
self._interval = float(value)
self._set()
def start(self):
if self.watcher is not None:
self.watcher.start()
def stop(self):
if self.watcher is not None:
self.watcher.stop()
def ev_cb(self, *args):
self._cb(self.timer, self.opaque[0], self.opaque[1])
class EventLoop(object):
"""This class is used as an interface between the libvirt event handling and
the main pyev loop.
It cannot be used from other threads.
"""
    # singleton usage is very important because libvirt.virEventRegisterImpl
    # would segfault on libvirt 0.8.X
__metaclass__ = Singleton
def __init__(self, loop):
"""
:param loop: pyev loop instance
"""
self.loop = loop
self.handle_id = count(1)
self.timer_id = count(1)
self.handles = dict()
self.timers = dict()
# This tells libvirt what event loop implementation it
# should use
libvirt.virEventRegisterImpl(
self.add_handle,
self.update_handle,
self.remove_handle,
self.add_timer,
self.update_timer,
self.remove_timer,
)
def add_handle(self, fd, events, cb, opaque):
"""Registers a new file handle 'fd', monitoring for 'events' (libvirt
event constants), firing the callback cb() when an event occurs.
        Returns a unique integer identifier for this handle, that should be
        used to later update/remove it.
        Note: unlike in the libvirt example, we don't use an interrupt trick
        because we run everything in the same thread; furthermore, calling a
        watcher's start method from a different thread could be dangerous.
"""
handle_id = self.handle_id.next()
h = LoopHandler(self.loop, handle_id, fd, events, cb, opaque)
h.start()
self.handles[handle_id] = h
# logger.debug('Add handle %d fd %d events %d', handle_id, fd, events)
return handle_id
def add_timer(self, interval, cb, opaque):
"""Registers a new timer with periodic expiry at 'interval' ms,
firing cb() each time the timer expires. If 'interval' is -1,
then the timer is registered, but not enabled.
        Returns a unique integer identifier for this handle, that should be
        used to later update/remove it.
Note: same note as for :py:meth:`add_handle` applies here
"""
timer_id = self.timer_id.next()
h = LoopTimer(self.loop, timer_id, interval, cb, opaque)
h.start()
self.timers[timer_id] = h
# logger.debug('Add timer %d interval %d', timer_id, interval)
return timer_id
def update_handle(self, handle_id, events):
"""Change the set of events to be monitored on the file handle.
"""
h = self.handles.get(handle_id)
if h:
h.events = events
# logger.debug('Update handle %d fd %d events %d', handle_id, h.fd, events)
def update_timer(self, timer_id, interval):
"""Change the periodic frequency of the timer.
"""
t = self.timers.get(timer_id)
if t:
t.interval = interval
def remove_handle(self, handle_id):
"""Stop monitoring for events on the file handle.
"""
h = self.handles.pop(handle_id, None)
if h:
h.stop()
# logger.debug('Remove handle %d', handle_id)
def remove_timer(self, timer_id):
"""Stop firing the periodic timer.
"""
t = self.timers.pop(timer_id, None)
if t:
t.stop()
# logger.debug('Remove timer %d', timer_id)
def stop(self):
for handl in chain(self.handles.itervalues(), self.timers.itervalues()):
handl.stop()
self.handles = dict()
self.timers = dict()
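`virt_to_ev`/`ev_to_virt` in `LoopHandler` above are plain bitmask translations between the two event APIs. The sketch below reproduces the mapping with stand-in integer constants, since the real values come from the libvirt and pyev bindings (the numbers here are illustrative assumptions, not the actual constants):

```python
# Stand-in constants: illustrative values, not the real libvirt/pyev ones.
VIR_EVENT_HANDLE_READABLE = 1
VIR_EVENT_HANDLE_WRITABLE = 2
VIR_EVENT_HANDLE_ERROR = 4
VIR_EVENT_HANDLE_HANGUP = 8
EV_READ = 1
EV_WRITE = 2


def virt_to_ev(events):
    """Convert libvirt handle events into libev I/O events."""
    result = 0
    # error/hangup conditions are reported through the read path in libev
    if events & (VIR_EVENT_HANDLE_READABLE |
                 VIR_EVENT_HANDLE_ERROR |
                 VIR_EVENT_HANDLE_HANGUP):
        result |= EV_READ
    if events & VIR_EVENT_HANDLE_WRITABLE:
        result |= EV_WRITE
    return result


def ev_to_virt(events):
    """Convert libev I/O events back into libvirt handle events."""
    result = 0
    if events & EV_READ:
        result |= VIR_EVENT_HANDLE_READABLE
    if events & EV_WRITE:
        result |= VIR_EVENT_HANDLE_WRITABLE
    return result
```

Note the mapping is lossy on purpose: error and hangup conditions collapse onto `EV_READ`, so a round trip only preserves the readable/writable bits.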
class StorageIndex(object):
"""Keep an index of all storage volume paths."""
SHARED_TYPES = ['rbd']
def __init__(self, handler, lv_con):
"""
:param handler: Hypervisor handler instance
:param lv_con: Libvirt connection
"""
self.handler = handler
self.lv_con = lv_con
self.storages = {}
self.paths = None
self.update()
def update(self, retry=1):
"""Update storage pools and volumes."""
# go through all storage pools and check if it is already in the index
for lv_storage in imap(
self.lv_con.storagePoolLookupByName,
chain(
self.lv_con.listDefinedStoragePools(),
self.lv_con.listStoragePools(),
),
):
if lv_storage.name() in self.storages:
# update
self.storages[lv_storage.name()].update(retry)
else:
# add storage pool
storage_type = et.ElementTree().parse(StringIO(lv_storage.XMLDesc(0))).get('type')
if storage_type in self.SHARED_TYPES:
s = SharedStorage(lv_storage)
else:
s = Storage(lv_storage)
self.storages[s.name] = s
# add tags
self.handler.tag_db.add_tags((
Tag('sto%s_state' % s.name, partial(lambda x: x.state, s), 5, 5),
Tag('sto%s_size' % s.name, partial(lambda x: x.capacity, s), 5, 5),
Tag('sto%s_free' % s.name, partial(lambda x: x.available, s), 5, 5),
Tag('sto%s_used' % s.name,
partial(lambda x: x.capacity - x.available
if x.available is not None and x.capacity is not None else None, s), 5, 5),
Tag('sto%s_type' % s.name, partial(lambda x: x.type, s), 5, 5),
Tag('sto%s_vol' % s.name,
partial(lambda x: ' '.join(x.volumes) if x.volumes and not x.is_shared else None, s),
5, 5),
Tag('sto%s_ratio' % s.name,
partial(lambda x: '%0.2f' % (1 - float(x.available) / x.capacity)
if x.available is not None and x.capacity is not None else None, s), 5, 5),
                    Tag('sto%s_shared' % s.name, partial(lambda x: {True: 'yes', False: 'no'}[x.is_shared], s)),
))
# Add a special storage for standalone drives:
if '_standalone' not in self.storages:
self.storages['_standalone'] = DummyStorage('_standalone')
self.update_path_index()
def update_path_index(self):
self.paths = dict(
(v.path, v) for v in chain.from_iterable(imap(
lambda s: s.volumes.itervalues(),
self.storages.itervalues(),
)),
)
def get_volume(self, path):
return self.paths.get(path)
def get_volume_by_pool(self, pool, vol):
storage = self.get_storage(pool)
return storage.volumes.get(vol)
def get_storage(self, name):
return self.storages.get(name)
def create_volume(self, pool_name, volume_name, capacity):
"""Create a new volume in the storage pool.
        :param str pool_name: name of the storage pool
        :param str volume_name: name for the volume
        :param int capacity: size for the volume
"""
# get volume
logger.debug('asked pool %s', pool_name)
logger.debug('Pool state %s', self.storages)
try:
pool = self.storages[pool_name]
except KeyError:
raise PoolStorageError('Invalid pool name')
if pool is None:
raise Exception('Storage pool not found')
try:
            new_volume = pool.lv_storage.createXML("""
                <volume>
                    <name>%s</name>
                    <capacity>%d</capacity>
                </volume>
            """ % (volume_name, capacity), 0)
except libvirt.libvirtError:
logger.exception('Error while creating volume')
raise
self.update()
return new_volume
def delete_volume(self, pool_name, volume_name):
"""Delete a volume in the givent storage pool.
:param str pool_name: name for the storage pool
:param str volume_name: name for the volume
"""
# get volume
try:
pool = self.storages[pool_name]
except KeyError:
raise PoolStorageError('Invalid pool name')
try:
volume = pool.volumes[volume_name]
except KeyError:
raise PoolStorageError('Invalid volume name')
# delete from index
del self.paths[volume.path]
del self.storages[pool_name].volumes[volume_name]
# delete volume
try:
volume.lv_volume.delete(0)
except libvirt.libvirtError:
logger.exception('Error while deleting volume')
raise
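`create_volume` above interpolates the volume name and capacity into a small `<volume>` XML document before handing it to libvirt. The same document can be built with the standard library's ElementTree instead of string interpolation; `build_volume_xml` is a hypothetical helper, not part of the codebase:

```python
import xml.etree.ElementTree as ET


def build_volume_xml(name, capacity):
    """Build the minimal <volume> description that createXML expects."""
    volume = ET.Element('volume')
    ET.SubElement(volume, 'name').text = name
    # capacity defaults to bytes when no unit attribute is given
    ET.SubElement(volume, 'capacity').text = str(capacity)
    return ET.tostring(volume)
```

Building the tree programmatically also escapes the volume name for free, which the `%s` interpolation does not.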
class Storage(object):
"""Storage abstraction."""
REFRESH_RETRY_INTERVAL = 1
def __init__(self, lv_storage):
"""
:param lv_storage: Libvirt pool storage instance
"""
self.uuid = lv_storage.UUID()
self.name = lv_storage.name()
self.lv_storage = lv_storage
self.state, self.capacity = None, None
self.allocation, self.available = None, None
self.type = et.ElementTree().parse(
StringIO(lv_storage.XMLDesc(0))).get('type')
self.volumes = {}
self.update()
@property
def is_shared(self):
return False
def update(self, retry=1):
for _ in xrange(retry):
try:
self.lv_storage.refresh()
except libvirt.libvirtError as err:
logger.warning('Unable to refresh storage %s: %s', self.name, err)
time.sleep(self.REFRESH_RETRY_INTERVAL)
else:
break
try:
self.update_attr()
# update volumes
for vol_name in self.lv_storage.listVolumes():
if vol_name in self.volumes:
# update volume
self.volumes[vol_name].update()
else:
# add volume
v = Volume(self, self.lv_storage.storageVolLookupByName(vol_name))
self.volumes[v.name] = v
except libvirt.libvirtError as err:
logger.warning('Unable to update storage %s: %s', self.name, err)
def update_attr(self):
self.state, self.capacity, self.allocation, self.available = self.lv_storage.info()
self.state = STORAGE_STATES[self.state]
self.type = et.ElementTree().parse(
StringIO(self.lv_storage.XMLDesc(0))).get('type')
class DummyStorageVolumeDispenser(object):
def __init__(self, storage):
self.storage = storage
def get(self, name):
return DummyVolume(self.storage, name)
def itervalues(self):
return iter([])
class SharedStorage(Storage):
"""Shared storage abstraction."""
def __init__(self, lv_storage):
"""
:param lv_storage: Libvirt pool storage instance
"""
super(SharedStorage, self).__init__(lv_storage)
self.volumes = DummyStorageVolumeDispenser(self)
@property
def is_shared(self):
return True
def update(self, retry=1):
pass # Do nothing.
class DummyStorage(Storage):
"""Dummy storage abstraction."""
def __init__(self, name, shared=True):
"""
        :param str name: name of the dummy storage
        :param bool shared: whether the storage is considered shared
"""
self.uuid = None
self.name = name
self.shared = shared
self.state, self.capacity = None, None
self.allocation, self.available = None, None
self.type = 'dummy'
self.volumes = DummyStorageVolumeDispenser(self)
@property
def is_shared(self):
return self.shared
def update(self, retry=1):
pass # Do nothing
class Volume(object):
"""Volume abstraction."""
def __init__(self, storage, lv_volume):
"""
:param lv_volume: Libvirt volume instance
"""
self.storage = storage
self.path = lv_volume.path()
self.name = lv_volume.name()
self.capacity, self.allocation = None, None
self.lv_volume = lv_volume
self.update()
def update(self):
self.capacity, self.allocation = self.lv_volume.info()[1:]
class DummyVolume(object):
"""Dummy volume abstraction."""
def __init__(self, storage, name):
self.storage = storage
self.path, self.capacity, self.allocation = None, None, None
self.name = name
self.lv_volume = None
def update(self):
pass # Do nothing.
class BoundVolumeProxy(object):
"""Proxy object to an existing Volume when its bound to a VM."""
def __init__(self, volume, device, bus):
self.volume = volume
self.device = device
self.bus = bus
def __getattr__(self, name):
return self.volume.__getattribute__(name)
cc-node-f5dd2058c95048237a7190404aa5813d84cefa8d/cloudcontrol/node/hypervisor/tags.py
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import logging
from functools import wraps
import libvirt
from cloudcontrol.node.utils import and_
logger = logging.getLogger(__name__)
def _virt_tag(func):
"""Catches libvirt related exception.
Decorator used for tag declarations that interacts with libvirt.
"""
@wraps(func)
def decorated(handl):
if not handl.virt_connected:
return
try:
return func(handl)
except libvirt.libvirtError:
logger.exception('Unexpected libvirt error')
handl.vir_con_restart()
return decorated
def _check_virt_connected(func):
"""Check is libvirt is connected before caculating tag."""
@wraps(func)
def decorated(handl):
if not handl.virt_connected:
return
return func(handl)
return decorated
def vir_status(handl):
"""Local libvirt connection status."""
return {True: 'connected', False: 'disconnected'}[handl.virt_connected]
def vir_uri(handl):
return handl.main.config.libvirt_uri
# hypervisor related tags
def htype():
"""Hypervisor type."""
# FIXME for other support
return u'kvm'
@_check_virt_connected
def hv(handl):
"""Hypervisor name."""
    # What is the point of this tag? Is this information not already in the
    # 'a' and 'id' tags?
return handl.hypervisor.name
def hvm():
"""Hardware virtualization enable."""
# see
# http://www.linux-kvm.org/page/FAQ#How_can_I_tell_if_I_have_Intel_VT_or_AMD-V.3F
# or
# http://www.cyberciti.biz/faq/linux-xen-vmware-kvm-intel-vt-amd-v-support/
# if we are in a xen hypervisor we won't see vt in /proc/cpuinfo
result = {True: u'yes', False: u'no'}
if htype() == u'kvm':
        # find out in /proc/cpuinfo whether all CPUs have virtualisation
        # enabled
        return result[and_(
            bool(set((
                'vmx',  # Intel VT
                'svm',  # AMD-V
            )) & set(l.split(': ')[-1].split()))
            for l in open('/proc/cpuinfo') if l.startswith('flags')
        )]
return None
@_virt_tag
def hvver(handl):
"""Hypervisor version."""
return handl.hypervisor.vir_con.getVersion()
@_virt_tag
def libvirtver(handl):
"""Version of running libvirt."""
return handl.hypervisor.vir_con.getLibVersion()
# jobs
def rjobs():
"""Number of currently running jobs."""
# storage pools
@_check_virt_connected
def sto(handl):
"""Storage pool names."""
return u' '.join(handl.hypervisor.storage.storages.iterkeys())
# Vm related tags
@_check_virt_connected
def nvm(handl):
"""Number of VMS in the current hypervisor."""
return handl.hypervisor.vm_total
@_check_virt_connected
def vmpaused(handl):
"""Count of VMs paused."""
return handl.hypervisor.vm_paused
@_check_virt_connected
def vmstarted(handl):
"""Count of VMs started."""
return handl.hypervisor.vm_started
@_check_virt_connected
def vmstopped(handl):
"""Count of VMs Stopped."""
return handl.hypervisor.vm_stopped
@_check_virt_connected
def cpurunning(handl):
"""CPU total used by running VMs on the hypervisor."""
return sum(int(vm.tag_db['__main__']['cpu'].value) for vm in
handl.hypervisor.domains.itervalues() if vm.tag_db['__main__']['cpu'].value and
vm.state == 'running')
@_check_virt_connected
def cpualloc(handl):
"""CPU total used by all VMs on the hypervisor."""
return sum(int(vm.tag_db['__main__']['cpu'].value) for vm in
handl.hypervisor.domains.itervalues() if vm.tag_db['__main__']['cpu'].value)
@_check_virt_connected
def cpuremaining(handl):
"""Allocatable CPU remaining on the hypervisor."""
cpu = int(handl.tag_db['__main__']['cpu'].value)
cpualloc = int(handl.tag_db['__main__']['cpualloc'].value)
return cpu - cpualloc
@_check_virt_connected
def cpuallocratio(handl):
"""Allocated CPU ratio on the hypervisor."""
cpu = float(handl.tag_db['__main__']['cpu'].value)
cpualloc = float(handl.tag_db['__main__']['cpualloc'].value)
return '%0.2f' % (cpualloc / cpu)
@_check_virt_connected
def memrunning(handl):
"""Memory used by running VMs on the hypervisor."""
return sum(int(vm.tag_db['__main__']['mem'].value) for vm in
handl.hypervisor.domains.itervalues() if vm.tag_db['__main__']['mem'].value and
vm.state == 'running')
@_check_virt_connected
def memalloc(handl):
"""Memory used by all VMs on the hypervisor."""
return sum(int(vm.tag_db['__main__']['mem'].value) for vm in
handl.hypervisor.domains.itervalues() if vm.tag_db['__main__']['mem'].value)
@_check_virt_connected
def memremaining(handl):
"""Allocatable memory remaining on the hypervisor."""
mem = int(handl.tag_db['__main__']['mem'].value)
memalloc = int(handl.tag_db['__main__']['memalloc'].value)
return mem - memalloc
@_check_virt_connected
def memallocratio(handl):
"""Allocated memory ratio on the hypervisor."""
mem = float(handl.tag_db['__main__']['mem'].value)
memalloc = float(handl.tag_db['__main__']['memalloc'].value)
return '%0.2f' % (memalloc / mem)
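`hvm()` above decides whether hardware virtualisation is available by checking that every `flags` line in `/proc/cpuinfo` contains `vmx` (Intel VT) or `svm` (AMD-V). The core check can be sketched as a pure function over the file's text — the function name is hypothetical:

```python
def all_cpus_have_hw_virt(cpuinfo_text):
    """Return True if every CPU listed exposes vmx (Intel VT) or svm (AMD-V)."""
    flag_lines = [l for l in cpuinfo_text.splitlines()
                  if l.startswith('flags')]
    if not flag_lines:
        # no flags lines at all (e.g. running under Xen): assume no HVM
        return False
    return all(
        # intersection of the CPU's flag set with the two virt flags
        {'vmx', 'svm'} & set(l.split(': ')[-1].split())
        for l in flag_lines
    )
```

Keeping the parsing separate from the file access makes the check trivially testable against sample `/proc/cpuinfo` content.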
cc-node-f5dd2058c95048237a7190404aa5813d84cefa8d/cloudcontrol/node/jobs.py
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import errno
import logging
import os
import signal
import subprocess
import sys
import traceback
from threading import Thread, Event
from StringIO import StringIO
from itertools import count
from cloudcontrol.node.exc import JobError
from cloudcontrol.node.utils import num_to_sig, close_fds
logger = logging.getLogger(__name__)
class JobManager(object):
def __init__(self, main_loop):
"""
:param main_loop: :class:`MainLoop` instance
"""
self.job_id = count()
self.main = main_loop
#: keep an index of all jobs
self.jobs = {}
def job_start(self):
pass
def job_stop(self):
pass
def notify(self, job):
"""Called when a job is done."""
# by now only remove the job
self.remove(job.id)
def cancel(self, job_id):
"""Cancel a job."""
self.jobs[job_id].stop()
def remove(self, job_id):
try:
return self.jobs.pop(job_id)
except KeyError:
logger.error('Job %s does not exist', job_id)
def create(self, job_constructor, *args, **kwargs):
"""Create a new job and populate job id."""
job = job_constructor(self, *args, **kwargs)
self.jobs[job.id] = job
return job
def get(self, job_id):
return self.jobs[job_id]
def start(self):
pass
def stop(self):
logger.debug('Stopping all currently running jobs')
for job in self.jobs.itervalues():
try:
job.stop()
except Exception:
pass
class BaseThreadedJob(Thread):
"""Job running in a background thread.
Handles job notification to the job manager.
"""
def __init__(self, job_manager):
Thread.__init__(self)
#: report progress in %
self.progress = 0.
self.job_manager = job_manager
#: job id
self.id = job_manager.job_id.next()
self.running = False
def pre_job(self):
"""Job preparation that is called when doing start, it can raise
exceptions to report error to the caller. In the latter case, it will
also removes itself from the job list.
"""
pass
def run_job(self):
"""Overide this method to define what your job do."""
raise NotImplementedError
def run(self):
try:
self.run_job()
except Exception:
pass
finally:
self.running = False
def notify(self):
self.job_manager.notify(self)
def start(self):
"""Start job in a background thread."""
# first we run pre_job as it could raise an exception
try:
self.pre_job()
except Exception:
# in case of error we must remove the job from the manager
self.notify()
raise
# then we start the watcher when it's safe
self.running = True
Thread.start(self) # thread will signal when it's done using async
def wait(self):
"""For jobs running in a background, this method MUST be called in order
to remove the job from the list in the end.
"""
self.join()
self.notify()
def start_current(self):
"""Start job in current thread."""
try:
self.pre_job()
self.running = True
self.run_job()
# we could log exceptions here but rather do it inside the run_job
# method
finally:
self.notify()
def stop(self):
self.running = False
class ForkedJob(object):
"""Job that executes in a fork.
When inherit, you must define open_fds property that list file descriptors
that must be kept in the child and closed in the parent.
.. warning::
logging should not be used in the child as this would cause a deadlock
to occur, see http://bugs.python.org/issue6721
"""
def __init__(self, job_manager):
"""
:param job_manager: :class:`JobManager` instance
"""
self.job_manager = job_manager
#: job id
self.id = job_manager.job_id.next()
self.running = False
# event for other thread to wait for job termination
self.job_done = Event()
self.fork_pid = None
# internal event used to wait for forked process termination
self.fork_die = Event()
# libev child watcher
self.fork_watcher = None
# return status of forked process
self.return_status = None
def create_fork_watcher(self):
self.fork_watcher = self.job_manager.main.evloop.child(self.fork_pid, False, self.child_cb)
self.fork_watcher.start()
def child_cb(self, watcher, revents):
self.return_status = watcher.rstatus
self.fork_die.set()
watcher.stop()
self.fork_watcher = None
def pre_job(self):
"""This method represents any preparation job that must be done in the
parent before the fork.
"""
pass
def run_job(self):
"""This represent the work that will be done in the forked child.
This method MUST be redefined in subclasses.
"""
pass
def after_fork(self):
"""This method will be called just after fork in the child.
        It does nothing by default and it can be redefined in subclasses.
"""
pass
def fatal(self, fmt, *args, **kwargs):
"""Write error message in stderr and exit.
:param str fmt: format string
:param \*args: arguments for format string
:param \*\*kwargs: can contain ('status', :int:) -> exit status of process
"""
try:
status = int(kwargs.get('status', 1))
except (ValueError, TypeError):
            sys.stderr.write('Bad status argument %s' % kwargs.get('status'))
os._exit(42)
try:
fmt = fmt % args
except (ValueError, TypeError):
sys.stderr.write('Bad formatting for string: %s' % fmt)
os._exit(42)
try:
sys.stderr.write(fmt)
except IOError:
os._exit(42)
os._exit(status)
def fatal_exc(self, fmt, *args, **kwargs):
"""Write error message and traceback and exit.
:param str fmt: format string
:param \*args: arguments for format string
:param \*\*kwargs: can contain ('status', :int:) -> exit status of process
"""
tb = StringIO()
tb.write('\n')
traceback.print_exc(file=tb)
tb.write('\n')
fmt += '%s'
args = args + (tb.getvalue(),)
self.fatal(fmt, *args, status=kwargs.get('status', 1))
def reset_signal_mask(self):
signal.signal(signal.SIGTERM, lambda *args: os._exit(1))
signal.signal(signal.SIGUSR1, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)
def run(self):
"""This method performs all the hard work by doing the actual fork.
        It catches all possible exceptions in the child, as this prevents
        the latter from going back up the stack and doing nasty things with
        the libev loop or sjRPC.
Thus you do not need to capture all exceptions in your code,
        furthermore, if you need to exit from the child, you'd better use the
        `os._exit <http://docs.python.org/library/os.html#os._exit>`_ function.
"""
try:
self.fork_pid = os.fork()
except OSError as exc:
logger.error('Cannot fork (job %s): %s', self.id, exc.strerror)
raise
self.running = True
if self.fork_pid == 0:
# child
# just hope to not receive any signals in between since there is no
# way to block signals in python :(
try:
self.reset_signal_mask()
close_fds(exclude_fds=self.open_fds,
debug=self.job_manager.main.config.debug)
self.after_fork()
except:
traceback.print_exc()
os._exit(1)
try:
self.run_job()
except:
sys.stderr.write('Error during job %s\n' % self.id)
traceback.print_exc()
os._exit(1)
else:
sys.stderr.write('Job execution went well %s\n' % self.id)
os._exit(0)
else:
self.create_fork_watcher()
# close child fds
for fd in self.open_fds:
try:
os.close(fd)
except OSError as exc:
if exc.errno == errno.EBADF:
# FIXME this is weird but it seems to happen sometimes
logger.debug('Error while closing fd %s in parent,'
                                     ' EBADF (job %s)', fd, self.id)
continue
logger.error('Error while closing fds in parent: %s',
exc.strerror)
raise
def start(self):
"""This will start the job by executing :py:meth:`pre_job` method and
:py:meth:`run`.
"""
self.pre_job()
self.job_manager.main.call_in_main_thread(self.run)
def stop(self):
"""This would be called to stop the job.
"""
if self.fork_die.is_set():
return
try:
os.kill(self.fork_pid, signal.SIGKILL)
except OSError as exc:
if exc.errno == errno.ESRCH:
logger.debug('Child already killed')
return
logger.error('Cannot kill child for IO job: %s', exc.strerror)
raise
def notify(self):
self.job_manager.notify(self)
def wait(self):
"""This will wait for the fork to end and raise exception depending on
child return status.
.. warning::
This method MUST be called.
"""
if self.fork_pid is None:
return
try:
self.fork_die.wait()
if self.return_status >> 8 != 0:
if self.return_status & 0xff == signal.SIGKILL:
logger.error('Job was killed')
else:
raise JobError('Exception during job, returned %s, signal'
' %s' % (
self.return_status >> 8,
num_to_sig(self.return_status & 0xff)))
finally:
self.fork_pid = None
self.job_done.set()
self.notify()
def join(self):
"""This provides an API similar to `threading.Thread.join
`_
, you can wait
for the job termination from multiple points in your program but one of
these and only one MUST be `wait` method.
"""
self.job_done.wait()
class BaseIOJob(ForkedJob):
"""Fork job that set ionice on the child."""
#: level of io nice that will be set (see :manpage:`ionice(1)`)
IO_NICE = 7
def after_fork(self):
try:
subprocess.check_call(['ionice', '-n%d' % self.IO_NICE,
'-p%d' % os.getpid()], close_fds=True)
except subprocess.CalledProcessError as exc:
sys.stderr.write('Cannot set ionice, return code %s\n' % exc.returncode)
cc-node-f5dd2058c95048237a7190404aa5813d84cefa8d/cloudcontrol/node/node.py
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import os
import logging
import logging.config
from functools import wraps
from cloudcontrol.common.client.loop import RPCStartHandler, MainLoop
from cloudcontrol.common.client.tags import Tag
from cloudcontrol.node import __version__
from cloudcontrol.node.config import NodeConfigParser, configure_logging
from cloudcontrol.node.jobs import JobManager
from cloudcontrol.node.exc import ForbiddenHandler
logger = logging.getLogger(__name__)
def _rights_check(handler_name):
"""Method instance decorator for checking permissions before executing an
RPC handler.
"""
def decorator(func):
@wraps(func)
def decorated(*args, **kwargs):
if handler_name in func.im_self.main.config.forbidden_handlers:
logger.error('Remote tried to call forbidden handler "%s"',
handler_name)
raise ForbiddenHandler('Forbidden handler "%s"' % handler_name)
return func(*args, **kwargs)
return decorated
return decorator
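`_rights_check` follows the standard parameterized-decorator pattern; it reaches the node configuration at call time through `func.im_self` (a Python 2 bound-method attribute). A self-contained sketch of the pattern, with a hypothetical `FORBIDDEN` set standing in for `config.forbidden_handlers`:

```python
from functools import wraps

FORBIDDEN = frozenset(['destroy'])  # hypothetical stand-in for config.forbidden_handlers

def rights_check(handler_name):
    """Refuse to run a handler whose name is in FORBIDDEN."""
    def decorator(func):
        @wraps(func)
        def decorated(*args, **kwargs):
            if handler_name in FORBIDDEN:
                raise RuntimeError('Forbidden handler %r' % handler_name)
            return func(*args, **kwargs)
        return decorated
    return decorator

@rights_check('ping')
def ping():
    return 'pong'

print(ping())  # pong
```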
class NodeRPCStartHandler(RPCStartHandler):
def handle_authentication_response(self, response=None):
# set handler according to which role was returned by the cc-server
if response == self.loop.role and response is not None:
# we don't need to reload the plugins
# but we need to register the objects and tags
self.loop.tag_db.rpc_register()
elif response == u'host':
# close previous plugins if needed
if self.loop.role is not None:
self.loop.close_plugins()
logger.debug('Role host assigned')
from cloudcontrol.node.host import Handler as HostHandler
self.loop.main_plugin = HostHandler(loop=self.loop)
self.loop.role = 'host'
# (re)-register the tags of the main loop
self.loop.tag_db.rpc_register()
self.loop.register_plugin(self.loop.main_plugin)
elif response == u'hv':
# close previous plugins if needed
if self.loop.role is not None:
self.loop.close_plugins()
logger.debug('Role hypervisor assigned')
# set libvirt environment variables
os.environ['LIBVIRT_DEBUG'] = '4'
# os.environ['LIBVIRT_LOG_FILTERS'] = ''
os.environ['LIBVIRT_LOG_OUTPUT'] = '4:stderr'
# we don't import those modules at the top because some dependencies
# may not be installed
from cloudcontrol.node.hypervisor import Handler as HypervisorHandler
self.loop.main_plugin = HypervisorHandler(
hypervisor_name=self.loop.config.server_user,
loop=self.loop,
)
self.loop.role = 'hv'
# (re)-register the tags of the main loop
self.loop.tag_db.rpc_register()
self.loop.register_plugin(self.loop.main_plugin)
else:
logger.error('Failed authentication, role returned: %s', response)
self._goto(self.handle_error)
return
self._goto(self.handle_done)
class NodeLoop(MainLoop):
CONFIG_CLASS = NodeConfigParser
CONNECT_CLASS = NodeRPCStartHandler
DEFAULT_TAGS = (Tag(u'version', __version__),)
def __init__(self, config_path):
MainLoop.__init__(self, config_path)
self.job_manager = JobManager(self)
def reset_handler(self, name, handl):
# we decorate each handler for permissions to be checked before each
# invocation
MainLoop.reset_handler(self, name, _rights_check(name)(handl))
def configure_logging(self):
configure_logging(self.config.logging_level, self.config.logging_output)
def stop(self, watcher=None, revents=None):
MainLoop.stop(self, watcher, revents)
# stop running jobs
self.job_manager.stop()
cc-node-f5dd2058c95048237a7190404aa5813d84cefa8d/cloudcontrol/node/utils.py 0000664 0000000 0000000 00000017100 13444400225 0025031 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import errno
import signal
import logging
import resource
import threading
import subprocess
from collections import deque
logger = logging.getLogger(__name__)
def and_(iter_):
"""Do an and logic condition over the iterable element.
:param iterable iter: meat for condition
"""
for i in iter_:
if not i:
return False
return True
def subproc_call(main_loop, args, stdin=None):
"""
:param args: arguments for subprocess call
:param stdin: stdin data as string
"""
rcode, output = execute(main_loop, args, stdin)
if rcode != 0:
raise subprocess.CalledProcessError(rcode, args)
return output
def execute(main_loop, args, stdin=None):
"""Execute a command and return error code and command output.
Warning: this function will block until the command exits. DO NOT CALL IT
IN THE MAIN THREAD.
:param args: list of command arguments. First item is the command itself
:param stdin: string to pass as command standard input
"""
# Create pipes to interact with child's standard IOs:
r_stdin, w_stdin = os.pipe()
r_stdout, w_stdout = os.pipe()
child_is_terminated = threading.Event()
# Callback to be called by libev when the child terminates:
def _cb_child_is_terminated(watcher, revents):
child_is_terminated.set()
watcher.stop()
# Part executed in the main thread to allow child watcher to be properly
# installed: "It is permissible to install a Child watcher after the child
# has been forked (which implies it might have already exited), as long as
# the event loop isn't entered" - http://pythonhosted.org/pyev/Child.html
def _executed_in_main_thread():
pid = os.fork()
if pid == 0: # Child
os.dup2(r_stdin, sys.stdin.fileno())
os.dup2(w_stdout, sys.stdout.fileno())
os.dup2(w_stdout, sys.stderr.fileno())
close_fds(debug=True)
try:
os.execvp(args[0], args)
except OSError as err:
if err.errno == errno.ENOENT:
os._exit(127)
os._exit(1)
except:
os._exit(1)
else: # Parent
os.close(r_stdin)
os.close(w_stdout)
# Write stdin string to the child standard input:
buf = stdin
while buf:
written = os.write(w_stdin, buf)
buf = buf[written:]
os.close(w_stdin)
# Create a watcher to catch child termination and start it:
child_watcher = main_loop.evloop.child(pid, False, _cb_child_is_terminated)
child_watcher.start()
# Returns the created watcher for two reasons:
# 1. Access to the child exit code and pid
# 2. Keep a reference to the watcher to prevent it from being garbage collected
return child_watcher
child_watcher = main_loop.call_in_main_thread(_executed_in_main_thread)
logger.debug('Executed command with pid %d: args: %s, stdin: %s',
child_watcher.pid, args,
'%d bytes' % len(stdin) if stdin is not None else 'no')
# Read child's stdout:
stdout = ''
tmp = True
while tmp:
tmp = os.read(r_stdout, 2048)
stdout += tmp
os.close(r_stdout)
child_is_terminated.wait()
if os.WIFEXITED(child_watcher.rstatus):
rcode = os.WEXITSTATUS(child_watcher.rstatus)
else:
rcode = -os.WTERMSIG(child_watcher.rstatus)
logger.debug('Command with pid %d returned with code %d',
child_watcher.pid, rcode)
return rcode, stdout
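The stdin feeding in `_executed_in_main_thread` must loop because `os.write` on a pipe may perform a partial write. A standalone sketch of that loop:

```python
import os

r, w = os.pipe()
data = b'hello world'
# Loop until everything is written: os.write may write fewer bytes
# than requested when the pipe buffer is nearly full.
buf = data
while buf:
    written = os.write(w, buf)
    buf = buf[written:]
os.close(w)

out = os.read(r, 2048)
os.close(r)
print(out)  # b'hello world'
```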
class SocketBuffer(deque):
"""Holds bytes in a list.
This class don't handle maximum size but instead give help like handling
count automatically.
"""
def __init__(self, max_len=8 * 64 * 1024):
deque.__init__(self)
self.max_len = max_len
self.current_len = 0
def append(self, x):
deque.append(self, x)
self.current_len += len(x)
def appendleft(self, x):
deque.appendleft(self, x)
self.current_len += len(x)
def clear(self):
deque.clear(self)
self.current_len = 0
def extend(self, iterable):
raise NotImplementedError
def extendleft(self, iterable):
raise NotImplementedError
def pop(self):
elt = deque.pop(self)
self.current_len -= len(elt)
return elt
def popleft(self):
elt = deque.popleft(self)
self.current_len -= len(elt)
return elt
def remove(self, value):
raise NotImplementedError
def reverse(self):
raise NotImplementedError
def rotate(self, n):
raise NotImplementedError
def is_full(self):
return self.current_len >= self.max_len
def is_empty(self):
return self.current_len == 0
class Singleton(type):
"""Singleton metaclass."""
def __init__(cls, name, bases, dict):
super(Singleton, cls).__init__(name, bases, dict)
cls._instance = None
def __call__(cls, *args, **kwargs):
if cls._instance is None:
cls._instance = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instance
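The metaclass can be exercised as follows (note the `super().__init__` call must receive `name`, not `cls`). Calling the metaclass directly with three arguments sidesteps the differing Python 2/3 metaclass declaration syntax:

```python
class Singleton(type):
    """Singleton metaclass (as above, with the corrected super() call)."""
    def __init__(cls, name, bases, dct):
        super(Singleton, cls).__init__(name, bases, dct)
        cls._instance = None

    def __call__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instance

# Works on both Python 2 and Python 3:
Config = Singleton('Config', (object,), {})

a = Config()
b = Config()
print(a is b)  # True
```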
def close_fds(exclude_fds=None, debug=False):
"""Close all fds uneeded fds in child when using fork.
:param exclude_fds: list of file descriptors that should not be closed (0,
1, 2 must not be set here, see debug)
:param bool debug: indicates if std in/out/err should be left open
(usually for debugging purposes)
"""
# get max fd
limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if limit == resource.RLIM_INFINITY:
max_fd = 2048
else:
max_fd = limit
if exclude_fds is None:
exclude_fds = []
if debug:
exclude_fds += [0, 1, 2] # debug
for fd in xrange(max_fd, -1, -1):
if fd in exclude_fds:
continue
try:
os.close(fd)
except OSError as exc:
if exc.errno != errno.EBADF:
raise
# wasn't open
if not debug:
sys.stdin = open(os.devnull)
sys.stdout = open(os.devnull, 'w')
sys.stderr = open(os.devnull, 'w')
assert sys.stdin.fileno() == 0
assert sys.stdout.fileno() == 1
assert sys.stderr.fileno() == 2
def set_signal_map(map_):
"""Set signal map in fork children.
:param mapping map_: (signal code, handler)...
:returns: old handlers as dict
"""
previous_handlers = dict()
for sig, handler in map_.iteritems():
previous_handlers[sig] = signal.signal(sig, handler)
return previous_handlers
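A runnable sketch of installing and then restoring a handler map with `set_signal_map` (redefined here with `items()` so it also runs on Python 3):

```python
import os
import signal

def set_signal_map(map_):
    """Install the given signal handlers and return the previous ones."""
    previous_handlers = {}
    for sig, handler in map_.items():
        previous_handlers[sig] = signal.signal(sig, handler)
    return previous_handlers

hits = []
old = set_signal_map({signal.SIGUSR1: lambda sig, frame: hits.append(sig)})
os.kill(os.getpid(), signal.SIGUSR1)  # deliver the signal to ourselves
set_signal_map(old)                   # restore the previous handlers
print(hits == [signal.SIGUSR1])
```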
sig_names = dict((value, name) for name, value in signal.__dict__.iteritems()
if name.startswith('SIG') and not name.startswith('SIG_'))
def num_to_sig(num):
"""Returns signal name.
:param num: signal number
"""
return sig_names.get(num, 'Unknown signal')
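The `sig_names` table inverts the `signal` module namespace; a Python 3-compatible sketch (the original uses `iteritems`):

```python
import signal

# Map signal numbers back to their SIG* names; on aliases sharing a
# number (e.g. SIGCHLD/SIGCLD) an arbitrary name wins, as in the original.
sig_names = dict((int(value), name) for name, value in vars(signal).items()
                 if name.startswith('SIG') and not name.startswith('SIG_'))

def num_to_sig(num):
    """Return the signal name for a signal number."""
    return sig_names.get(num, 'Unknown signal')

print(num_to_sig(int(signal.SIGKILL)))  # SIGKILL
print(num_to_sig(0))                    # Unknown signal
```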
cc-node-f5dd2058c95048237a7190404aa5813d84cefa8d/docs/ 0000775 0000000 0000000 00000000000 13444400225 0020614 5 ustar 00root root 0000000 0000000 cc-node-f5dd2058c95048237a7190404aa5813d84cefa8d/docs/dev/ 0000775 0000000 0000000 00000000000 13444400225 0021372 5 ustar 00root root 0000000 0000000 cc-node-f5dd2058c95048237a7190404aa5813d84cefa8d/docs/dev/Makefile 0000664 0000000 0000000 00000011032 13444400225 0023027 0 ustar 00root root 0000000 0000000 # Makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = build
# Internal variables.
PAPEROPT_a4 = -D latex_paper_size=a4
PAPEROPT_letter = -D latex_paper_size=letter
ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) source
.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest
help:
@echo "Please use \`make ' where is one of"
@echo " html to make standalone HTML files"
@echo " dirhtml to make HTML files named index.html in directories"
@echo " singlehtml to make a single large HTML file"
@echo " pickle to make pickle files"
@echo " json to make JSON files"
@echo " htmlhelp to make HTML files and a HTML help project"
@echo " qthelp to make HTML files and a qthelp project"
@echo " devhelp to make HTML files and a Devhelp project"
@echo " epub to make an epub"
@echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
@echo " latexpdf to make LaTeX files and run them through pdflatex"
@echo " text to make text files"
@echo " man to make manual pages"
@echo " changes to make an overview of all changed/added/deprecated items"
@echo " linkcheck to check all external links for integrity"
@echo " doctest to run all doctests embedded in the documentation (if enabled)"
clean:
-rm -rf $(BUILDDIR)/*
html:
$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
dirhtml:
$(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml."
singlehtml:
$(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml
@echo
@echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml."
pickle:
$(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle
@echo
@echo "Build finished; now you can process the pickle files."
json:
$(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json
@echo
@echo "Build finished; now you can process the JSON files."
htmlhelp:
$(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp
@echo
@echo "Build finished; now you can run HTML Help Workshop with the" \
".hhp project file in $(BUILDDIR)/htmlhelp."
qthelp:
$(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp
@echo
@echo "Build finished; now you can run "qcollectiongenerator" with the" \
".qhcp project file in $(BUILDDIR)/qthelp, like this:"
@echo "# qcollectiongenerator $(BUILDDIR)/qthelp/CloudControlnode.qhcp"
@echo "To view the help file:"
@echo "# assistant -collectionFile $(BUILDDIR)/qthelp/CloudControlnode.qhc"
devhelp:
$(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp
@echo
@echo "Build finished."
@echo "To view the help file:"
@echo "# mkdir -p $$HOME/.local/share/devhelp/CloudControlnode"
@echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/CloudControlnode"
@echo "# devhelp"
epub:
$(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub
@echo
@echo "Build finished. The epub file is in $(BUILDDIR)/epub."
latex:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo
@echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex."
@echo "Run \`make' in that directory to run these through (pdf)latex" \
"(use \`make latexpdf' here to do that automatically)."
latexpdf:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo "Running LaTeX files through pdflatex..."
make -C $(BUILDDIR)/latex all-pdf
@echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
text:
$(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text
@echo
@echo "Build finished. The text files are in $(BUILDDIR)/text."
man:
$(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man
@echo
@echo "Build finished. The manual pages are in $(BUILDDIR)/man."
changes:
$(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes
@echo
@echo "The overview file is in $(BUILDDIR)/changes."
linkcheck:
$(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck
@echo
@echo "Link check complete; look for any errors in the above output " \
"or in $(BUILDDIR)/linkcheck/output.txt."
doctest:
$(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest
@echo "Testing of doctests in the sources finished, look at the " \
"results in $(BUILDDIR)/doctest/output.txt."
cc-node-f5dd2058c95048237a7190404aa5813d84cefa8d/docs/dev/source/ 0000775 0000000 0000000 00000000000 13444400225 0022672 5 ustar 00root root 0000000 0000000 cc-node-f5dd2058c95048237a7190404aa5813d84cefa8d/docs/dev/source/architecture.rst 0000664 0000000 0000000 00000001535 13444400225 0026112 0 ustar 00root root 0000000 0000000 Architecture
============
Start-up process
-----------------
See ``bin/cc-node``.
Summary of the steps:
Binary:
* First parse command line options and configuration file
* Switch to a daemon context
* Instantiate and launch a :class:`cloudcontrol.node.node.NodeLoop`
Organisation of modules/packages
--------------------------------
.. tree ../../ccnode | grep -v \.pyc | grep -v \\.\\.
.. code-block:: text
|-- config.py
|-- exc.py
|-- host
| |-- __init__.py
| |-- tags.py
|-- hypervisor
| |-- domains
| | |-- __init__.py
| | |-- vm_tags.py
| |-- __init__.py
| |-- jobs.py
| |-- lib.py
| |-- tags.py
|-- __init__.py
|-- jobs.py
|-- node.py
|-- plugins.py
|-- tags.py
|-- utils.py
TODO
Node
----
.. automodule:: cloudcontrol.node.node
:members:
cc-node-f5dd2058c95048237a7190404aa5813d84cefa8d/docs/dev/source/cold_migration.rst 0000664 0000000 0000000 00000000060 13444400225 0026412 0 ustar 00root root 0000000 0000000 Cold migration steps
====================
TODO
cc-node-f5dd2058c95048237a7190404aa5813d84cefa8d/docs/dev/source/conf.py 0000664 0000000 0000000 00000016125 13444400225 0024176 0 ustar 00root root 0000000 0000000 # -*- coding: utf-8 -*-
#
# Cloud Control node documentation build configuration file, created by
# sphinx-quickstart on Thu Sep 15 15:48:00 2011.
#
# This file is execfile()d with the current directory set to its containing dir.
#
# Note that not all possible configuration values are present in this
# autogenerated file.
#
# All configuration values have a default; values that are commented out
# serve to show the default.
import sys, os
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#sys.path.insert(0, os.path.abspath('.'))
# -- General configuration -----------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be extensions
# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest', 'sphinx.ext.coverage', 'sphinx.ext.viewcode']
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix of source filenames.
source_suffix = '.rst'
# The encoding of source files.
#source_encoding = 'utf-8-sig'
# The master toctree document.
master_doc = 'index'
# General information about the project.
project = u'Cloud Control node'
copyright = u'2011, Smartjog'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = '18'
# The full version, including alpha/beta/rc tags.
release = '18'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#language = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
#today = ''
# Else, today_fmt is used as the format for a strftime call.
#today_fmt = '%B %d, %Y'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
exclude_patterns = []
# The reST default role (used for this markup: `text`) to use for all documents.
#default_role = None
# If true, '()' will be appended to :func: etc. cross-reference text.
#add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
#add_module_names = True
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
#show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# A list of ignored prefixes for module index sorting.
#modindex_common_prefix = []
# -- Options for HTML output ---------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
html_theme = 'default'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#html_theme_options = {}
# Add any paths that contain custom themes here, relative to this directory.
#html_theme_path = []
# The name for this set of Sphinx documents. If None, it defaults to
# "<project> v<release> documentation".
#html_title = None
# A shorter title for the navigation bar. Default is the same as html_title.
#html_short_title = None
# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
#html_logo = None
# The name of an image file (within the static path) to use as favicon of the
# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
# pixels large.
#html_favicon = None
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
# using the given strftime format.
#html_last_updated_fmt = '%b %d, %Y'
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
#html_use_smartypants = True
# Custom sidebar templates, maps document names to template names.
#html_sidebars = {}
# Additional templates that should be rendered to pages, maps page names to
# template names.
#html_additional_pages = {}
# If false, no module index is generated.
#html_domain_indices = True
# If false, no index is generated.
#html_use_index = True
# If true, the index is split into individual pages for each letter.
#html_split_index = False
# If true, links to the reST sources are added to the pages.
#html_show_sourcelink = True
# If true, "Created using Sphinx" is shown in the HTML footer. Default is True.
#html_show_sphinx = True
# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True.
#html_show_copyright = True
# If true, an OpenSearch description file will be output, and all pages will
# contain a <link> tag referring to it. The value of this option must be the
# base URL from which the finished HTML is served.
#html_use_opensearch = ''
# This is the file name suffix for HTML files (e.g. ".xhtml").
#html_file_suffix = None
# Output file base name for HTML help builder.
htmlhelp_basename = 'CloudControlnodedoc'
# -- Options for LaTeX output --------------------------------------------------
# The paper size ('letter' or 'a4').
#latex_paper_size = 'letter'
# The font size ('10pt', '11pt' or '12pt').
#latex_font_size = '10pt'
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author, documentclass [howto/manual]).
latex_documents = [
('index', 'CloudControlnode.tex', u'Cloud Control node Documentation',
u'Smartjog', 'manual'),
]
# The name of an image file (relative to this directory) to place at the top of
# the title page.
#latex_logo = None
# For "manual" documents, if this is true, then toplevel headings are parts,
# not chapters.
#latex_use_parts = False
# If true, show page references after internal links.
#latex_show_pagerefs = False
# If true, show URL addresses after external links.
#latex_show_urls = False
# Additional stuff for the LaTeX preamble.
#latex_preamble = ''
# Documents to append as an appendix to all manuals.
#latex_appendices = []
# If false, no module index is generated.
#latex_domain_indices = True
# -- Options for manual page output --------------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
('index', 'cloudcontrolnode', u'Cloud Control node Documentation',
[u'Smartjog'], 1)
]
# -- Options for autodoc ------------------------------------------------------
autoclass_content = 'both'
autodoc_member_order = 'bysource'
cc-node-f5dd2058c95048237a7190404aa5813d84cefa8d/docs/dev/source/exc.rst 0000664 0000000 0000000 00000000113 13444400225 0024176 0 ustar 00root root 0000000 0000000 Exceptions
==========
.. automodule:: cloudcontrol.node.exc
:members:
cc-node-f5dd2058c95048237a7190404aa5813d84cefa8d/docs/dev/source/host.rst 0000664 0000000 0000000 00000000236 13444400225 0024402 0 ustar 00root root 0000000 0000000 Host
====
Handler
-------
.. autoclass:: cloudcontrol.node.host.Handler
:members:
Tags
----
.. automodule:: cloudcontrol.node.host.tags
:members:
cc-node-f5dd2058c95048237a7190404aa5813d84cefa8d/docs/dev/source/hot_migration.rst 0000664 0000000 0000000 00000045511 13444400225 0026275 0 ustar 00root root 0000000 0000000 Live migration steps
====================
See http://libvirt.org/migration.html for migration details using libvirt.
This page explains the live migration scenario between two hypervisors hv1 and
hv2.
We move a VM named "TestVM" from hv1 to hv2.
The VM has a disk backed by an LVM logical volume, "/dev/vg/TestVM".
A shell command is given for each step so that it can be reproduced.
it-test-16.lab.fr.lan is the source hypervisor (where the VM is)
it-test-17.lab.fr.lan is the destination hypervisor (where we migrate the VM)
[...] in the command results means some lines were removed.
TODO schematics
1 - Pause VM
------------
We must pause the VM while we manipulate the device mapper because I/O
errors would occur otherwise::
virsh suspend TestVM
2 - Set up DRBD
---------------
Reload kernel module with proper parameters::
it-test-16.lab.fr.lan ~ 2 # modprobe drbd minor_count=100 usermode_helper=/bin/true
it-test-16.lab.fr.lan ~ 0 #
it-test-17.lab.fr.lan ~ 2 # modprobe drbd minor_count=100 usermode_helper=/bin/true
it-test-17.lab.fr.lan ~ 0 #
Create DRBD device metadata (external).
See http://www.drbd.org/users-guide-emb/ch-internals.html#s-external-meta-data
for more information.
We find the metadata size we need::
it-test-16.lab.fr.lan ~ 130 # ipython
Python 2.7.3rc2 (default, Apr 22 2012, 22:30:17)
Type "copyright", "credits" or "license" for more information.
IPython 0.12 -- An enhanced Interactive Python.
? -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help -> Python's own help system.
object? -> Details about 'object', use 'object??' for extra details.
In [1]: import libvirt
In [2]: c = libvirt.open('qemu:///system')
In [4]: c.listDomainsID()
Out[4]: [6, 7]
In [7]: d = c.lookupByID(7)
In [8]: d.name()
Out[8]: 'TestVMDisk-small-16'
In [9]: d.XMLDesc(0)
Out[9]: "\n
[...]
\n \n \n \n \n \n \n \n \n \n \n \n \n \n \n
[...]
"
In [10]: s = c.storagePoolLookupByName('vg')
In [13]: s.listVolumes()
Out[13]: ['TestVMDisk-16', 'TestVMDisk-small-16', 'TestVMDisk-17']
In [15]: v = c.storageVolLookupByPath('/dev/vg/TestVMDisk-small-16')
In [16]: v.name()
Out[16]: 'TestVMDisk-small-16'
In [18]: _, capacity, _ = v.info()
In [19]: capacity
Out[19]: 209715200L
In [20]: drbd_meta_size = max(capacity / 32768 + 4 * 2 ** 20, 128 * 2 ** 20)
In [21]: drbd_meta_size
Out[21]: 134217728
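The computation above can be wrapped in a small helper (a sketch; the constants are exactly those of the IPython session, with integer division made explicit):

```python
def drbd_meta_size(capacity):
    """Size in bytes of the external DRBD metadata LV for a backing
    device of `capacity` bytes, as computed interactively above."""
    return max(capacity // 32768 + 4 * 2 ** 20, 128 * 2 ** 20)

print(drbd_meta_size(209715200))  # 134217728, as in the session above
```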
Create the LV for the VM on the destination hypervisor::
it-test-17.lab.fr.lan ~ 0 # lvcreate -n TestVMDisk-small-16 -L 200m vg
Logical volume "TestVMDisk-small-16" created
it-test-17.lab.fr.lan ~ 0 # lvs
LV VG Attr LSize Origin Snap% Move Log Copy% Convert
TestVMDisk-small-16 vg -wi-a- 200.00m
We create the metadata LV with the appropriate size on both hypervisors::
it-test-16.lab.fr.lan ~ 5 # lvcreate -v -n TestVMDisk-small-16.drbdmeta -L 134217728b vg
Setting logging type to disk
Finding volume group "vg"
Archiving volume group "vg" metadata (seqno 4).
Creating logical volume TestVMDisk-small-16.drbdmeta
Creating volume group backup "/etc/lvm/backup/vg" (seqno 5).
Found volume group "vg"
activation/volume_list configuration setting not defined, checking only host tags for vg/TestVMDisk-small-16.drbdmeta
Creating vg-TestVMDisk--small--16.drbdmeta
Loading vg-TestVMDisk--small--16.drbdmeta table (253:3)
Resuming vg-TestVMDisk--small--16.drbdmeta (253:3)
Clearing start of logical volume "TestVMDisk-small-16.drbdmeta"
Creating volume group backup "/etc/lvm/backup/vg" (seqno 5).
Logical volume "TestVMDisk-small-16.drbdmeta" created
it-test-16.lab.fr.lan ~ 0 # lvs
LV VG Attr LSize Origin Snap% Move Log Copy% Convert
[...]
TestVMDisk-small-16 vg -wi-ao 200.00m
TestVMDisk-small-16.drbdmeta vg -wi-a- 128.00m
it-test-17.lab.fr.lan ~ 3 # lvcreate -v -n TestVMDisk-small-16.drbdmeta -L 134217728b vg
Setting logging type to disk
Finding volume group "vg"
Archiving volume group "vg" metadata (seqno 2).
Creating logical volume TestVMDisk-small-16.drbdmeta
Creating volume group backup "/etc/lvm/backup/vg" (seqno 3).
Found volume group "vg"
activation/volume_list configuration setting not defined, checking only host tags for vg/TestVMDisk-small-16.drbdmeta
Creating vg-TestVMDisk--small--16.drbdmeta
Loading vg-TestVMDisk--small--16.drbdmeta table (253:1)
Resuming vg-TestVMDisk--small--16.drbdmeta (253:1)
Clearing start of logical volume "TestVMDisk-small-16.drbdmeta"
Creating volume group backup "/etc/lvm/backup/vg" (seqno 3).
Logical volume "TestVMDisk-small-16.drbdmeta" created
Create a device mapper with a different name (.copy) that points to the LV::
it-test-16.lab.fr.lan ~ 0 # dmsetup table --showkeys /dev/vg/TestVMDisk-small-16
0 409600 linear 9:126 8300544
it-test-16.lab.fr.lan ~ 0 # dmsetup table --showkeys /dev/vg/TestVMDisk-small-16 | dmsetup create vg-TestVMDisk--small--16.copy
it-test-17.lab.fr.lan ~ 0 # dmsetup table --showkeys /dev/vg/TestVMDisk-small-16
0 409600 linear 9:126 2048
it-test-17.lab.fr.lan ~ 0 # dmsetup table --showkeys /dev/vg/TestVMDisk-small-16 | dmsetup create vg-TestVMDisk--small--16.copy
it-test-17.lab.fr.lan ~ 0 # dmsetup table --showkeys /dev/vg/TestVMDisk-small-16
0 409600 linear 9:126 2048
it-test-17.lab.fr.lan ~ 0 # dmsetup table --showkeys
vg-TestVMDisk--small--16: 0 409600 linear 9:126 2048
vg-TestVMDisk--small--16.copy: 0 409600 linear 9:126 2048
vg-TestVMDisk--small--16.drbdmeta: 0 262144 linear 9:126 411648
Wipe and initialize drbd metadata::
it-test-16.lab.fr.lan ~ 20 # drbdmeta --force /dev/drbd0 v08 /dev/vg/TestVMDisk-small-16.drbdmeta 0 wipe-md
There appears to be no drbd meta data to wipe out?
it-test-16.lab.fr.lan ~ 0 # drbdmeta --force /dev/drbd0 v08 /dev/vg/TestVMDisk-small-16.drbdmeta 0 create-md
md_offset 0
al_offset 4096
bm_offset 36864
Found some data
==> This might destroy existing data! <==
Do you want to proceed?
*** confirmation forced via --force option ***
Writing meta data...
initializing activity log
NOT initialized bitmap
New drbd meta data block successfully created.
it-test-17.lab.fr.lan ~ 0 # drbdmeta --force /dev/drbd0 v08 /dev/vg/TestVMDisk-small-16.drbdmeta 0 wipe-md
There appears to be no drbd meta data to wipe out?
it-test-17.lab.fr.lan ~ 0 # drbdmeta --force /dev/drbd0 v08 /dev/vg/TestVMDisk-small-16.drbdmeta 0 create-md
md_offset 0
al_offset 4096
bm_offset 36864
Found some data
==> This might destroy existing data! <==
Do you want to proceed?
*** confirmation forced via --force option ***
Writing meta data...
initializing activity log
NOT initialized bitmap
New drbd meta data block successfully created.
Create DRBD device with LV (.copy) and metadata::
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 disk /dev/mapper/vg-TestVMDisk--small--16.copy /dev/vg/TestVMDisk-small-16.drbdmeta 0 --create-device
it-test-16.lab.fr.lan ~ 0 # ls /dev/drb*
/dev/drbd0
it-test-17.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 disk /dev/mapper/vg-TestVMDisk--small--16.copy /dev/vg/TestVMDisk-small-16.drbdmeta 0 --create-device
it-test-17.lab.fr.lan ~ 2 # ls /dev/drb*
/dev/drbd0
Connect DRBD together::
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 net 0.0.0.0:7788 192.168.32.209:7788 C -m -S 10000000 &
[1] 9439
[1]+ Done drbdsetup /dev/drbd0 net 0.0.0.0:7788 192.168.32.209:7788 C -m -S 10000000
it-test-16.lab.fr.lan ~ 0 # netstat -t
Active Internet connections (w/o servers)
Proto Recv-Q Send-Q Local Address Foreign Address State
[...]
tcp 0 0 it-test-16.lab.fr.:7788 it-test-17.lab.fr:53443 ESTABLISHED
tcp 0 0 it-test-16.lab.fr:54046 it-test-17.lab.fr.:7788 ESTABLISHED
it-test-17.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 net 0.0.0.0:7788 192.168.32.208:7788 C -m -S 10000000 &
[1] 8113
[...]
[1]+ Done drbdsetup /dev/drbd0 net 0.0.0.0:7788 192.168.32.208:7788 C -m -S 10000000
it-test-17.lab.fr.lan ~ 0 # netstat -t
Active Internet connections (w/o servers)
Proto Recv-Q Send-Q Local Address Foreign Address State
[...]
tcp 0 0 it-test-17.lab.fr.:7788 it-test-16.lab.fr:54046 ESTABLISHED
tcp 0 0 it-test-17.lab.fr:53443 it-test-16.lab.fr.:7788 ESTABLISHED
Set synchronization rate::
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 syncer -r 50000
it-test-17.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 syncer -r 50000
Make sure DRBD is connected to its peer::
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 wait-connect -t 60 -d 60 -o 60
it-test-16.lab.fr.lan ~ 0 #
it-test-17.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 wait-connect -t 60 -d 60 -o 60
it-test-17.lab.fr.lan ~ 0 #
Set roles for DRBD (source is primary and destination is secondary)::
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 primary -o
it-test-16.lab.fr.lan ~ 0 #
it-test-17.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 secondary
it-test-17.lab.fr.lan ~ 0 #
Wait for synchronisation to complete::
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 wait-sync
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 status
it-test-17.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 wait-sync
it-test-17.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 status
Set DRBD in primary-primary mode (see
http://www.drbd.org/users-guide/ch-admin.html#s-roles and
http://www.drbd.org/users-guide/s-enable-dual-primary.html
for more information)::
it-test-17.lab.fr.lan ~ 1 # drbdsetup /dev/drbd0 primary -o
it-test-17.lab.fr.lan ~ 0 #
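These role changes can also be scripted with ``subprocess``; a minimal sketch (the ``role_cmd``/``set_role`` helper names are hypothetical, the command line is the one used in the transcript above):

```python
import subprocess

def role_cmd(device, role, force=False):
    """Build the drbdsetup command used above to change a DRBD role.

    role is "primary" or "secondary"; force adds -o
    (--overwrite-data-of-peer), which is only meaningful for primary.
    """
    cmd = ["drbdsetup", device, role]
    if force and role == "primary":
        cmd.append("-o")
    return cmd

def set_role(device, role, force=False):
    # Actually runs the command; needs root and the drbd kernel module.
    subprocess.check_call(role_cmd(device, role, force))
```

``role_cmd('/dev/drbd0', 'primary', force=True)`` reproduces the ``drbdsetup /dev/drbd0 primary -o`` invocation above.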
Point the original device mapper at the DRBD device (i.e. takeover)::
it-test-16.lab.fr.lan ~ 0 # python -c 'print "0 %d linear /dev/drbd0 0" % (134217728 / 512)' | dmsetup load /dev/vg/TestVMDisk-small-16
it-test-16.lab.fr.lan ~ 0 # dmsetup table
vg-TestVMDisk--17: 0 8192000 linear 9:126 8710144
vg-TestVMDisk--16: 0 8298496 linear 9:126 2048
vg-TestVMDisk--small--16: 0 409600 linear 9:126 8300544
vg-TestVMDisk--small--16.copy: 0 409600 linear 9:126 8300544
vg-TestVMDisk--small--16.drbdmeta: 0 262144 linear 9:126 16902144
it-test-16.lab.fr.lan ~ 0 # dmsetup suspend /dev/vg/TestVMDisk-small-16
it-test-16.lab.fr.lan ~ 0 # dmsetup resume /dev/vg/TestVMDisk-small-16
it-test-16.lab.fr.lan ~ 0 # dmsetup table
vg-TestVMDisk--17: 0 8192000 linear 9:126 8710144
vg-TestVMDisk--16: 0 8298496 linear 9:126 2048
vg-TestVMDisk--small--16: 0 262144 linear 147:0 0
vg-TestVMDisk--small--16.copy: 0 409600 linear 9:126 8300544
vg-TestVMDisk--small--16.drbdmeta: 0 262144 linear 9:126 16902144
it-test-17.lab.fr.lan ~ 0 # python -c 'print "0 %d linear /dev/drbd0 0" % (134217728 / 512)' | dmsetup load /dev/vg/TestVMDisk-small-16
it-test-17.lab.fr.lan ~ 0 # dmsetup table
vg-TestVMDisk--small--16: 0 409600 linear 9:126 2048
vg-TestVMDisk--small--16.copy: 0 409600 linear 9:126 2048
vg-TestVMDisk--small--16.drbdmeta: 0 262144 linear 9:126 411648
it-test-17.lab.fr.lan ~ 0 # dmsetup suspend /dev/vg/TestVMDisk-small-16
it-test-17.lab.fr.lan ~ 0 # dmsetup resume /dev/vg/TestVMDisk-small-16
it-test-17.lab.fr.lan ~ 0 # dmsetup table
vg-TestVMDisk--small--16: 0 262144 linear 147:0 0
vg-TestVMDisk--small--16.copy: 0 409600 linear 9:126 2048
vg-TestVMDisk--small--16.drbdmeta: 0 262144 linear 9:126 411648
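The ``python -c`` one-liners above only convert the DRBD device size (134217728 bytes here) into 512-byte sectors to build the replacement linear table; as a reusable helper (the ``takeover_table`` name is hypothetical):

```python
SECTOR_SIZE = 512  # device-mapper tables are expressed in 512-byte sectors

def takeover_table(size_bytes, drbd_device="/dev/drbd0"):
    """Build the linear table that points a device-mapper volume at DRBD."""
    return "0 %d linear %s 0" % (size_bytes // SECTOR_SIZE, drbd_device)
```

For the device above, ``takeover_table(134217728)`` gives ``0 262144 linear /dev/drbd0 0``, matching the reloaded table shown by ``dmsetup table``.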
3 - Resume VM
-------------
FIXME ?
::
virsh resume TestVMDisk-small-16
4 - Wait for sync
-----------------
FIXME ?
Wait for DRBD to synchronise. Synchronisation status can be checked with the
following command::
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 status
When done, set up DRBD in primary-primary mode.
5 - Migrate
-----------
::
it-test-16.lab.fr.lan ~ 0 # ipython
Python 2.7.3rc2 (default, Apr 22 2012, 22:30:17)
Type "copyright", "credits" or "license" for more information.
IPython 0.12 -- An enhanced Interactive Python.
? -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help -> Python's own help system.
object? -> Details about 'object', use 'object??' for extra details.
In [1]: import libvirt
In [2]: c = libvirt.open('qemu:///system')
In [3]: dest_c = libvirt.open('qemu+tcp://it-test-17.lab.fr.lan:1234/system')
In [5]: c.listDomainsID()
Out[5]: [6, 7]
In [8]: flags = libvirt.VIR_MIGRATE_LIVE | libvirt.VIR_MIGRATE_PEER2PEER | libvirt.VIR_MIGRATE_TUNNELLED | libvirt.VIR_MIGRATE_PERSIST_DEST | libvirt.VIR_MIGRATE_UNDEFINE_SOURCE
In [9]: flags
Out[9]: 31
In [11]: d = c.lookupByID(7)
In [12]: d.name()
Out[12]: 'TestVMDisk-small-16'
In [20]: d.migrate(dest_c, flags, None, 'qemu+tcp://it-test-17.lab.fr.lan:1234/system', 0)
Out[20]:
If your virtio driver is not configured with cache set to 'none', you will
get the following error::
In [13]: d.migrate(dest_c, flags, None, 'qemu+tcp://it-test-17.lab.fr.lan:1234/system', 0)
libvir: QEMU error : Unsafe migration: Migration may lead to data corruption if disks use cache != none
---------------------------------------------------------------------------
libvirtError Traceback (most recent call last)
/root/ in ()
----> 1 d.migrate(dest_c, flags, None, 'qemu+tcp://it-test-17.lab.fr.lan:1234/system', 0)
/usr/lib/python2.7/dist-packages/libvirt.pyc in migrate(self, dconn, flags, dname, uri, bandwidth)
820 else: dconn__o = dconn._o
821 ret = libvirtmod.virDomainMigrate(self._o, dconn__o, flags, dname, uri, bandwidth)
--> 822 if ret is None:raise libvirtError('virDomainMigrate() failed', dom=self)
823 __tmp = virDomain(self,_obj=ret)
824 return __tmp
libvirtError: Unsafe migration: Migration may lead to data corruption if disks use cache != none
You may force the migration by setting the ``VIR_MIGRATE_UNSAFE`` migration
flag (libvirt >= 0.9.11)::
In [14]: flags |= libvirt.VIR_MIGRATE_UNSAFE
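For reference, the flag word is plain bit arithmetic; a sketch with the constant values mirrored locally (in real code use the ``libvirt`` module constants directly; the ``migration_flags`` helper is hypothetical):

```python
# Values mirrored from the libvirt API so the arithmetic is visible.
VIR_MIGRATE_LIVE = 1
VIR_MIGRATE_PEER2PEER = 2
VIR_MIGRATE_TUNNELLED = 4
VIR_MIGRATE_PERSIST_DEST = 8
VIR_MIGRATE_UNDEFINE_SOURCE = 16
VIR_MIGRATE_UNSAFE = 512  # available with libvirt >= 0.9.11

def migration_flags(unsafe=False):
    """Return the flag word passed to Domain.migrate()."""
    flags = (VIR_MIGRATE_LIVE | VIR_MIGRATE_PEER2PEER | VIR_MIGRATE_TUNNELLED
             | VIR_MIGRATE_PERSIST_DEST | VIR_MIGRATE_UNDEFINE_SOURCE)  # == 31, as in Out[9]
    if unsafe:
        flags |= VIR_MIGRATE_UNSAFE
    return flags
```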
When done, set up DRBD in secondary-primary mode.
6 - Pause VM
------------
We must pause the VM while we manipulate the device mapper; otherwise I/O
errors would occur.
7 - Remove DRBD
---------------
Remove DRBD::
it-test-17.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 disconnect
it-test-17.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 detach
/dev/drbd0: State change failed: (-2) Need access to UpToDate data
it-test-17.lab.fr.lan ~ 17 # drbdsetup /dev/drbd0 down
it-test-17.lab.fr.lan ~ 0 #
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 disconnect
it-test-16.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 detach
/dev/drbd0: State change failed: (-2) Need access to UpToDate data
it-test-16.lab.fr.lan ~ 17 # drbdsetup /dev/drbd0 down
it-test-16.lab.fr.lan ~ 0 #
Remove DM::
it-test-17.lab.fr.lan ~ 0 # echo '0 409600 linear 9:126 8300544' | dmsetup load /dev/vg/TestVMDisk-small-16
it-test-17.lab.fr.lan ~ 0 # dmsetup table
vg-TestVMDisk--small--16: 0 262144 linear 147:0 0
vg-TestVMDisk--small--16.copy: 0 409600 linear 9:126 2048
vg-TestVMDisk--small--16.drbdmeta: 0 262144 linear 9:126 411648
it-test-17.lab.fr.lan ~ 0 # dmsetup suspend /dev/vg/TestVMDisk-small-16
it-test-17.lab.fr.lan ~ 0 # dmsetup resume /dev/vg/TestVMDisk-small-16
it-test-17.lab.fr.lan ~ 0 # dmsetup table
vg-TestVMDisk--small--16: 0 409600 linear 9:126 8300544
vg-TestVMDisk--small--16.copy: 0 409600 linear 9:126 2048
vg-TestVMDisk--small--16.drbdmeta: 0 262144 linear 9:126 411648
it-test-16.lab.fr.lan ~ 0 # echo '0 409600 linear 9:126 8300544' | dmsetup load /dev/vg/TestVMDisk-small-16
it-test-16.lab.fr.lan ~ 0 # dmsetup table
vg-TestVMDisk--17: 0 8192000 linear 9:126 8710144
vg-TestVMDisk--16: 0 8298496 linear 9:126 2048
vg-TestVMDisk--small--16: 0 262144 linear 147:0 0
vg-TestVMDisk--small--16.copy: 0 409600 linear 9:126 8300544
vg-TestVMDisk--small--16.drbdmeta: 0 262144 linear 9:126 16902144
it-test-16.lab.fr.lan ~ 0 # dmsetup suspend /dev/vg/TestVMDisk-small-16
it-test-16.lab.fr.lan ~ 0 # dmsetup resume /dev/vg/TestVMDisk-small-16
it-test-16.lab.fr.lan ~ 0 # dmsetup table
vg-TestVMDisk--17: 0 8192000 linear 9:126 8710144
vg-TestVMDisk--16: 0 8298496 linear 9:126 2048
vg-TestVMDisk--small--16: 0 409600 linear 9:126 8300544
vg-TestVMDisk--small--16.copy: 0 409600 linear 9:126 8300544
vg-TestVMDisk--small--16.drbdmeta: 0 262144 linear 9:126 16902144
Remove copy DM::
it-test-16.lab.fr.lan ~ 0 # dmsetup remove /dev/mapper/vg-TestVMDisk--small--16.copy
it-test-16.lab.fr.lan ~ 0 #
it-test-17.lab.fr.lan ~ 0 # dmsetup remove /dev/mapper/vg-TestVMDisk--small--16.copy
it-test-17.lab.fr.lan ~ 0 #
Remove metadata LV::
it-test-17.lab.fr.lan ~ 5 # lvremove /dev/vg/TestVMDisk-small-16.drbdmeta
Do you really want to remove active logical volume TestVMDisk-small-16.drbdmeta? [y/n]: y
Logical volume "TestVMDisk-small-16.drbdmeta" successfully removed
it-test-17.lab.fr.lan ~ 0 #
it-test-16.lab.fr.lan ~ 5 # lvremove /dev/vg/TestVMDisk-small-16.drbdmeta
Do you really want to remove active logical volume TestVMDisk-small-16.drbdmeta? [y/n]: y
Logical volume "TestVMDisk-small-16.drbdmeta" successfully removed
it-test-16.lab.fr.lan ~ 0 #
Remove VM LV on source hypervisor::
it-test-16.lab.fr.lan ~ 0 # lvremove /dev/vg/TestVMDisk-small-16
Do you really want to remove active logical volume TestVMDisk-small-16? [y/n]: y
Logical volume "TestVMDisk-small-16" successfully removed
it-test-16.lab.fr.lan ~ 0 #
8 - Resume VM
-------------
docs/dev/source/hypervisor.rst

Hypervisor
==========
Hypervisor handler
------------------
.. autoclass:: cloudcontrol.node.hypervisor.Handler
:members:
Hypervisor object
-----------------
.. autoclass:: cloudcontrol.node.hypervisor.Hypervisor
:members:
Storage pools and volumes
-------------------------
.. autoclass:: cloudcontrol.node.hypervisor.StorageIndex
:members:
.. autoclass:: cloudcontrol.node.hypervisor.Storage
:members:
.. autoclass:: cloudcontrol.node.hypervisor.Volume
:members:
docs/dev/source/index.rst

.. Cloud Control node documentation master file, created by
sphinx-quickstart on Thu Sep 15 15:48:00 2011.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to Cloud Control node's documentation!
==============================================
Contents:
.. toctree::
:maxdepth: 2
architecture
tags
plugins
host
libvirt
hypervisor
vm
jobs
cold_migration
hot_migration
test
exc
utils
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
docs/dev/source/jobs.rst

Jobs
====
Jobs manager
------------
.. autoclass:: cloudcontrol.node.jobs.JobManager
:members:
Abstract jobs
-------------
.. autoclass:: cloudcontrol.node.jobs.BaseThreadedJob
:members:
.. autoclass:: cloudcontrol.node.jobs.ForkedJob
:members:
.. autoclass:: cloudcontrol.node.jobs.BaseIOJob
:members:
Hypervisor jobs
---------------
.. automodule:: cloudcontrol.node.hypervisor.jobs
:members:
docs/dev/source/libvirt.rst

Libvirt
=======
.. automodule:: cloudcontrol.node.hypervisor.lib
:members:
docs/dev/source/plugins.rst

Plugins
=======
.. automodule:: cloudcontrol.node.plugins
:members:
docs/dev/source/tags.rst

Tags
====
.. automodule:: cloudcontrol.node.tags
:members:
docs/dev/source/test.rst

Test scenario for cc-node
=========================
Host
----
* Check all tags
* Check handlers: shutdown, execute_command
Hypervisor
----------
* Check all tags
* Check handlers: vm_start, vm_stop (+destroy), vm_suspend, vm_resume, vm_undefine,
vm_export, vm_define
* Check that VM statuses change
* Check that objects are registered/unregistered with the cc-server when VMs
appear/disappear.
Error handling
..............
Behaviour when libvirt connection is lost:
* libvirtstatus tag is updated to disconnected
* domains are unregistered
* some tags (related to storage pools) are unregistered
* handlers related to VMs are removed
Behaviour when the libvirt connection is restored:
* libvirtstatus tag is updated to connected
* domains are registered
* tags (related to storage pools) are registered
* handlers related to VMs are added
CC-server
---------
Error handling
..............
Behaviour when connection is lost:
* Nothing changes
Behaviour when the connection is restored:
* Authenticate
* Check the role returned and load another main plugin if the role changed,
else keep the current running plugin
* Register tags and objects
Error handling
--------------
* Test multiple configurations of libvirt/cc-server failures
docs/dev/source/utils.rst

Utils
=====
.. automodule:: cloudcontrol.node.utils
:members:
docs/dev/source/vm.rst

Domains
=======
KVM virtual machines
--------------------
.. automodule:: cloudcontrol.node.hypervisor.domains
:members:
KVM tags
--------
.. automodule:: cloudcontrol.node.hypervisor.domains.vm_tags
:members:
docs/user/Makefile

# Makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = build
# Internal variables.
PAPEROPT_a4 = -D latex_paper_size=a4
PAPEROPT_letter = -D latex_paper_size=letter
ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) source
.PHONY: help clean html dirhtml singlehtml pickle json htmlhelp qthelp devhelp epub latex latexpdf text man changes linkcheck doctest
help:
@echo "Please use \`make <target>' where <target> is one of"
@echo " html to make standalone HTML files"
@echo " dirhtml to make HTML files named index.html in directories"
@echo " singlehtml to make a single large HTML file"
@echo " pickle to make pickle files"
@echo " json to make JSON files"
@echo " htmlhelp to make HTML files and a HTML help project"
@echo " qthelp to make HTML files and a qthelp project"
@echo " devhelp to make HTML files and a Devhelp project"
@echo " epub to make an epub"
@echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
@echo " latexpdf to make LaTeX files and run them through pdflatex"
@echo " text to make text files"
@echo " man to make manual pages"
@echo " changes to make an overview of all changed/added/deprecated items"
@echo " linkcheck to check all external links for integrity"
@echo " doctest to run all doctests embedded in the documentation (if enabled)"
clean:
-rm -rf $(BUILDDIR)/*
html:
$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
dirhtml:
$(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml."
singlehtml:
$(SPHINXBUILD) -b singlehtml $(ALLSPHINXOPTS) $(BUILDDIR)/singlehtml
@echo
@echo "Build finished. The HTML page is in $(BUILDDIR)/singlehtml."
pickle:
$(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle
@echo
@echo "Build finished; now you can process the pickle files."
json:
$(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json
@echo
@echo "Build finished; now you can process the JSON files."
htmlhelp:
$(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp
@echo
@echo "Build finished; now you can run HTML Help Workshop with the" \
".hhp project file in $(BUILDDIR)/htmlhelp."
qthelp:
$(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp
@echo
@echo "Build finished; now you can run "qcollectiongenerator" with the" \
".qhcp project file in $(BUILDDIR)/qthelp, like this:"
@echo "# qcollectiongenerator $(BUILDDIR)/qthelp/CloudControlnode.qhcp"
@echo "To view the help file:"
@echo "# assistant -collectionFile $(BUILDDIR)/qthelp/CloudControlnode.qhc"
devhelp:
$(SPHINXBUILD) -b devhelp $(ALLSPHINXOPTS) $(BUILDDIR)/devhelp
@echo
@echo "Build finished."
@echo "To view the help file:"
@echo "# mkdir -p $$HOME/.local/share/devhelp/CloudControlnode"
@echo "# ln -s $(BUILDDIR)/devhelp $$HOME/.local/share/devhelp/CloudControlnode"
@echo "# devhelp"
epub:
$(SPHINXBUILD) -b epub $(ALLSPHINXOPTS) $(BUILDDIR)/epub
@echo
@echo "Build finished. The epub file is in $(BUILDDIR)/epub."
latex:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo
@echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex."
@echo "Run \`make' in that directory to run these through (pdf)latex" \
"(use \`make latexpdf' here to do that automatically)."
latexpdf:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo "Running LaTeX files through pdflatex..."
make -C $(BUILDDIR)/latex all-pdf
@echo "pdflatex finished; the PDF files are in $(BUILDDIR)/latex."
text:
$(SPHINXBUILD) -b text $(ALLSPHINXOPTS) $(BUILDDIR)/text
@echo
@echo "Build finished. The text files are in $(BUILDDIR)/text."
man:
$(SPHINXBUILD) -b man $(ALLSPHINXOPTS) $(BUILDDIR)/man
@echo
@echo "Build finished. The manual pages are in $(BUILDDIR)/man."
changes:
$(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes
@echo
@echo "The overview file is in $(BUILDDIR)/changes."
linkcheck:
$(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck
@echo
@echo "Link check complete; look for any errors in the above output " \
"or in $(BUILDDIR)/linkcheck/output.txt."
doctest:
$(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest
@echo "Testing of doctests in the sources finished, look at the " \
"results in $(BUILDDIR)/doctest/output.txt."
docs/user/source/conf.py

# -*- coding: utf-8 -*-
#
# Cloud Control node documentation build configuration file, created by
# sphinx-quickstart on Thu Sep 15 15:46:56 2011.
#
# This file is execfile()d with the current directory set to its containing dir.
#
# Note that not all possible configuration values are present in this
# autogenerated file.
#
# All configuration values have a default; values that are commented out
# serve to show the default.
import sys, os
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#sys.path.insert(0, os.path.abspath('.'))
# -- General configuration -----------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be extensions
# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest', 'sphinx.ext.coverage']
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix of source filenames.
source_suffix = '.rst'
# The encoding of source files.
#source_encoding = 'utf-8-sig'
# The master toctree document.
master_doc = 'index'
# General information about the project.
project = u'Cloud Control node'
copyright = u'2011, Smartjog'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = '18'
# The full version, including alpha/beta/rc tags.
release = '18'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#language = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
#today = ''
# Else, today_fmt is used as the format for a strftime call.
#today_fmt = '%B %d, %Y'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
exclude_patterns = []
# The reST default role (used for this markup: `text`) to use for all documents.
#default_role = None
# If true, '()' will be appended to :func: etc. cross-reference text.
#add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
#add_module_names = True
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
#show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# A list of ignored prefixes for module index sorting.
#modindex_common_prefix = []
# -- Options for HTML output ---------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
html_theme = 'default'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#html_theme_options = {}
# Add any paths that contain custom themes here, relative to this directory.
#html_theme_path = []
# The name for this set of Sphinx documents. If None, it defaults to
# "<project> v<release> documentation".
#html_title = None
# A shorter title for the navigation bar. Default is the same as html_title.
#html_short_title = None
# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
#html_logo = None
# The name of an image file (within the static path) to use as favicon of the
# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
# pixels large.
#html_favicon = None
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
# using the given strftime format.
#html_last_updated_fmt = '%b %d, %Y'
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
#html_use_smartypants = True
# Custom sidebar templates, maps document names to template names.
#html_sidebars = {}
# Additional templates that should be rendered to pages, maps page names to
# template names.
#html_additional_pages = {}
# If false, no module index is generated.
#html_domain_indices = True
# If false, no index is generated.
#html_use_index = True
# If true, the index is split into individual pages for each letter.
#html_split_index = False
# If true, links to the reST sources are added to the pages.
#html_show_sourcelink = True
# If true, "Created using Sphinx" is shown in the HTML footer. Default is True.
#html_show_sphinx = True
# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True.
#html_show_copyright = True
# If true, an OpenSearch description file will be output, and all pages will
# contain a <link> tag referring to it. The value of this option must be the
# base URL from which the finished HTML is served.
#html_use_opensearch = ''
# This is the file name suffix for HTML files (e.g. ".xhtml").
#html_file_suffix = None
# Output file base name for HTML help builder.
htmlhelp_basename = 'CloudControlnodedoc'
# -- Options for LaTeX output --------------------------------------------------
# The paper size ('letter' or 'a4').
#latex_paper_size = 'letter'
# The font size ('10pt', '11pt' or '12pt').
#latex_font_size = '10pt'
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author, documentclass [howto/manual]).
latex_documents = [
('index', 'CloudControlnode.tex', u'Cloud Control node Documentation',
u'Smartjog', 'manual'),
]
# The name of an image file (relative to this directory) to place at the top of
# the title page.
#latex_logo = None
# For "manual" documents, if this is true, then toplevel headings are parts,
# not chapters.
#latex_use_parts = False
# If true, show page references after internal links.
#latex_show_pagerefs = False
# If true, show URL addresses after external links.
#latex_show_urls = False
# Additional stuff for the LaTeX preamble.
#latex_preamble = ''
# Documents to append as an appendix to all manuals.
#latex_appendices = []
# If false, no module index is generated.
#latex_domain_indices = True
# -- Options for manual page output --------------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
('index', 'cloudcontrolnode', u'Cloud Control node Documentation',
[u'Smartjog'], 1)
]
docs/user/source/index.rst

.. Cloud Control node documentation master file, created by
sphinx-quickstart on Thu Sep 15 15:46:56 2011.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to Cloud Control node's documentation!
==============================================
Contents:
.. toctree::
:maxdepth: 2
overview
install
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
docs/user/source/install.rst

Installation
============
Installation
------------
Repository
..........
Install from the Smartjog Debian repository:
.. code-block:: text
# smartjog
deb http://debian.fr.smartjog.net/debian-smartjog/ squeeze smartjog
**Cloud Control node** provides two packages named ``cc-node`` and
``cc-node-hypervisor``. The latter is just a meta-package that provides the
dependencies for running the **Cloud Control node** as a **hypervisor** (``KVM``).
The former contains all of the **Cloud Control node** code base and can be
installed on its own if you only need to run the node with the **host** role.
Install with your favorite package manager as root:
.. code-block:: bash
# apt-get install cc-node
or
.. code-block:: bash
# apt-get install cc-node-hypervisor
Configuration
-------------
Example configuration file:
``[ccserver]`` section:
.. literalinclude:: ../../../etc/cc-node.conf
:lines: 1-5
For logging section, see `Python documentation `_.
Full default file (using syslog):
.. literalinclude:: ../../../etc/cc-node.conf
:lines: 1-18, 20-30, 36, 38
docs/user/source/libvirt.rst

Libvirt hot migration compatibility
-----------------------------------
Should work:

* from 0.8.8 to 0.9.8
* from 0.9.8 to 0.9.12

Should not work:

* from 0.9.12 to 0.9.8 (because of a bad XML seclabel; even if set explicitly
  it doesn't work)
* from 0.9.8 to 0.8.8 (because of XML format compatibility)
Note for all versions:
When migrating, libvirt tries to acquire a mutex on the domain, and this can
block forever (0.8.8) or time out (0.9.8 and later). Take a look at the node
logs to be sure.
In the latter case, you must restart libvirt for the migration to work
properly.
What to do when live migration fails
------------------------------------
Check DRBD and remove if needed:
::
# identify which DRBD volume needs to go down
it-test-15.lab.fr.lan ~ 0 # cat /proc/drbd
version: 8.3.9 (api:88/proto:86-95)
srcversion: CF228D42875CF3A43F2945A
0: cs:Connected ro:Primary/Primary ds:UpToDate/UpToDate C r-----
ns:2097152 nr:0 dw:0 dr:2097352 al:0 bm:128 lo:0 pe:0 ua:0 ap:0 ep:1 wo:f oos:0
# identify all LV (VM, copy and DRBD meta)
it-test-15.lab.fr.lan ~ 0 # dmsetup ls
vg-Test--Clone--root (253, 5)
vg-Test--Clone--root.copy (253, 2)
vg-Test--Clone--root.drbdmeta (253, 1)
vg-Etherpad--Etherpad (253, 0)
# reload VM LV table if needed
it-test-15.lab.fr.lan ~ 0 # dmsetup table vg-Test--Clone--root.copy | dmsetup load vg-Test--Clone--root
it-test-15.lab.fr.lan ~ 0 # dmsetup suspend vg-Test--Clone--root
it-test-15.lab.fr.lan ~ 0 # dmsetup resume vg-Test--Clone--root
# halt DRBD volume
it-test-15.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 disconnect
it-test-15.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 secondary
it-test-15.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 detach
it-test-15.lab.fr.lan ~ 0 # drbdsetup /dev/drbd0 down
# remove copy DM
it-test-15.lab.fr.lan ~ 0 # dmsetup remove vg-Test--Clone--root.copy
# remove DRBD meta
it-test-15.lab.fr.lan ~ 0 # lvremove /dev/vg/Test-Clone-root.drbdmeta
# /!\ on hypervisor where the VM is not present, also remove the VM LV
it-test-15.lab.fr.lan ~ 0 # lvremove /dev/vg/Test-Clone-root
# check that libvirt ended the qemu process
# refresh libvirt pools
it-test-15.lab.fr.lan ~ 0 # virsh pool-refresh vg
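The ``cs:``/``ro:``/``ds:`` fields of the ``/proc/drbd`` output shown at the start of this procedure can be parsed programmatically to decide whether a volume is still connected or primary; a minimal sketch (the ``drbd_states`` helper is hypothetical):

```python
def drbd_states(proc_drbd_line):
    """Parse one resource line of /proc/drbd into its state fields.

    Extracts cs: (connection state), ro: (roles) and ds: (disk states)
    from a line such as:
    0: cs:Connected ro:Primary/Primary ds:UpToDate/UpToDate C r-----
    """
    states = {}
    for field in proc_drbd_line.split():
        key, sep, value = field.partition(":")
        if sep and key in ("cs", "ro", "ds"):
            states[key] = value
    return states
```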
docs/user/source/overview.rst

Cloud Control
=============
Cloud Control Node
------------------
Objects
-------
Tags
----
TQL
---
Role
----
Host
----
Hypervisor
----------
etc/cc-node.conf

[node]
# cc-server information
address=_CC_SERVER_HOST_
port=1984
login=_CC_SERVER_LOGIN_
password=_CC_SERVER_PASSWD_
# logging verbosity (can be specified as number or name)
# 0=error, 1=warning, 2=info, 3=debug
# verbosity = error
# when debug is on, std output/input is not closed and logging is done through
# stderr instead of syslog
# debug = off
# directory where the cc-node jobs are saved
# jobs_store_path=/var/lib/cc-node/jobs/
# file where the plugins are saved for automatic reinstallation at node startup
# plugins_store_path=/var/lib/cc-node/plugins # it's a file
# backward compatibility for execute and shutdown handler
# remote_execution = yes
[node_handler] # this section is optional
# section to allow or forbid usage of RPC handlers at local level
# format is handler_name = yes|no
# execute = yes # this will be overridden by the remote_execution option if set
# ...
examples/cc-node-debug.conf

[node]
address=__HOST__
port=1984
login=__USER__
password=__PASSWD__
verbosity=3
debug=on
setup.py

#!/usr/bin/env python
#coding=utf-8
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import os
from setuptools import setup, find_packages
from cloudcontrol.node import __version__
ldesc = open(os.path.join(os.path.dirname(__file__), 'README')).read()
setup(
name='cc-node',
version=__version__,
description='Cloud Control Node',
long_description=ldesc,
author='Anaël Beutot',
author_email='anael.beutot@smartjog.com',
license='LGPL3',
namespace_packages=['cloudcontrol'],
packages=find_packages(),
scripts=['bin/cc-node'],
data_files=(
('/etc/', ('etc/cc-node.conf',)),
),
classifiers=[
'Operating System :: Unix',
'Programming Language :: Python',
],
)