# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
import os
import pty
import stat
import fcntl
import struct
import socket
import shutil
import pickle
import logging
import termios
import tempfile
import threading
import subprocess
from itertools import imap, chain
from sjrpc.core import RpcError  # import path for RpcError is an assumption
from sjrpc.utils import pass_connection, threadless
from cloudcontrol.common.client.tags import Tag, tag_inspector, TagDB
from cloudcontrol.common.client.plugins import Base as BasePlugin, rpc_handler
from cloudcontrol.common.jobs import JobsManager, JobsStore
from cloudcontrol.common.helpers.logger import patch_logging; patch_logging()
from cloudcontrol.node import tags  # tag definitions module, import path assumed
from cloudcontrol.node.utils import execute
from cloudcontrol.node.exc import RemoteExecutionError
from cloudcontrol.node.host.jobs import NodeJobsManagerInterface, ScriptJob
from cloudcontrol.node.host.plugins import PluginMethodJob, Plugin
logger = logging.getLogger(__name__)
def disk_tag_value(disk_name):
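    """Return a callable suitable as a tag value: it reads the sector count
    of `disk_name` from sysfs and converts it to a size in bytes (the kernel
    reports block device sizes as counts of 512-byte sectors).
    """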
def size():
        try:
            with open(os.path.join('/sys/block', disk_name, 'size')) as f:
                s = int(f.read().strip())
        except (EnvironmentError, ValueError):
            # disk vanished or sysfs value was unparsable
            return None
        if s > 0:
            return s * 512
        return None
return size
class FakePtySocket(object):
"""A fake socket object wrapping a :class:`subprocess.Popen` object
standard input/output.
"""
def __init__(self, fd):
self._fd = fd
def recv(self, size):
return os.read(self._fd, size)
def send(self, data):
return os.write(self._fd, data)
def fileno(self):
return self._fd
    def setblocking(self, blocking):
        if not blocking:
            # switch the pty master fd to non-blocking mode
            current_flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
            fcntl.fcntl(self._fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
        else:
            raise NotImplementedError('Cannot switch back to blocking mode')
def close(self):
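        # nothing to close here: RemoteShell owns the pty file descriptors
        # and closes them itself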
pass
class RemoteShell(object):
"""Handles basic operations on remote shell."""
    def __init__(self, conn, exec_='/bin/bash'):
        """
        :param conn: RPC connection used to create the shell tunnel
        :param exec_: path of binary to execute
        """
        # note: close() may be called from another thread while waiting for
        # the process to end
self.master, self.slave = pty.openpty()
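        # the master side feeds the RPC tunnel, the slave side becomes the
        # child's stdin/stdout/stderr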
self.endpoint = FakePtySocket(self.master)
self.proto = conn.create_tunnel(endpoint=self.endpoint,
on_shutdown=self.on_tunnel_shutdown)
try:
self.process = subprocess.Popen(
[exec_], stdout=self.slave,
bufsize=0, stdin=self.slave,
stderr=self.slave,
preexec_fn=os.setsid,
close_fds=True,
cwd=os.environ.get('HOME', '/'),
)
        except OSError:
            logger.exception('Error while executing remote shell')
            # close opened fds before propagating the error
            os.close(self.master)
            os.close(self.slave)
            raise
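        # watch for termination of the shell process via the connection loop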
self.child_watcher = conn.loop.child(self.process.pid, False,
self.child_cb)
self.child_watcher.start()
@property
def label(self):
return self.proto.label
    def resize(self, row, col, xpixel, ypixel):
        """Propagate a terminal resize to the pty (TIOCSWINSZ)."""
        if self.master is not None:
            fcntl.ioctl(self.master, termios.TIOCSWINSZ,
                        struct.pack('HHHH', row, col, xpixel, ypixel))
    def on_tunnel_shutdown(self, *args):
        # *args absorbs the callback arguments (ignored)
        # reset self.proto before closing so that close() does not shut the
        # tunnel down a second time
        self.proto, proto = None, self.proto
self.close()
def child_cb(self, watcher, revents):
logger.debug('Tunnel closed by process termination')
self.process.returncode = watcher.rstatus
self.close()
def close(self):
if self.child_watcher is not None:
self.child_watcher.stop()
self.child_watcher = None
if self.process.returncode is None:
# process is still alive
self.process.kill()
if self.master is not None:
try:
os.close(self.master)
except OSError:
                logger.error('Error when closing master file descriptor for pty')
self.master = None
if self.slave is not None:
try:
os.close(self.slave)
except OSError:
logger.error('Error when closing slave file descriptor for pty')
self.slave = None
if self.proto is not None:
try:
self.proto.shutdown()
except Exception:
logger.error('Error while trying to close RPC tunnel')
self.proto = None
"""Handler for host role."""
def __init__(self, *args, **kwargs):
BasePlugin.__init__(self, *args, **kwargs)
        self.tag_db.add_tags(tag_inspector(tags, self))
        # register a size tag for every disk listed by the main 'disk' tag
        self.tag_db.add_tags(imap(
            lambda d: Tag('disk%s_size' % d, disk_tag_value(d), 60),
            self.tag_db['__main__']['disk']._calculate_value().split(),
        ))
# running shells
self.shells = dict()
# running remote commands
self.commands = set()
#: jobs manager (different from MainLoop.jobs_manager)
try:
self.jobs_manager = JobsManager(logger, NodeJobsManagerInterface(self),
JobsStore(self.main.config.jobs_store_path))
except EnvironmentError as e:
logger.critical('Cannot access jobs directory: %s', e.strerror)
self.main.stop()
raise
#: loaded plugins
self.plugins = {} # plugin name -> plugin object
#: cache directory for scripts
try:
self.scripts_dir = tempfile.mkdtemp(prefix='cc-node-script-cache')
except EnvironmentError as e:
logger.critical('Cannot create temporary directory for scripts: %s',
e.strerror)
self.main.stop()
raise
def start(self):
BasePlugin.start(self)
# load plugins if persistent file is here
if os.path.isfile(self.main.config.plugins_store_path):
logger.debug('Loading previous plugins')
try:
with open(self.main.config.plugins_store_path) as f:
to_load = pickle.load(f)
except EnvironmentError as exc:
logger.error('Cannot load previous plugins: %s (%s)',
exc.errno, exc.strerror)
except (
EOFError,
AttributeError,
ImportError,
IndexError,
pickle.UnpicklingError,
) as exc:
logger.error('Cannot read file, bad format, %s', exc)
else:
                def plugins_install():
                    for p in to_load:
                        try:
                            self.plugin_install(self.main.rpc_con.rpc, None, p)
                        except Exception:
                            logger.exception('Error while loading plugin %s', p)
                        else:
                            logger.debug('Successfully loaded plugin %s', p)
th = threading.Thread(target=plugins_install)
th.daemon = True
th.start()
else:
logger.debug('No previously loaded plugins')
    def stop(self):
        BasePlugin.stop(self)
        # remove script cache directory
        shutil.rmtree(self.scripts_dir, ignore_errors=True)
        # kill all currently running shells and commands
        logger.debug('Kill currently running shells and commands')
for stuff in chain(self.shells.values(),
list(self.commands)):
stuff.close()
def update_plugin_index(self):
self.tag_db['__main__']['plugins'].update_value()
# write plugins to file
try:
with open(self.main.config.plugins_store_path, 'w') as f:
pickle.dump([p for p in self.plugins], f)
except EnvironmentError as exc:
logger.error('Cannot save loaded plugins in \'%s\': %s (%s)',
self.main.config.plugins_store_path,
exc.errno, exc.strerror)
else:
logger.debug('Plugins state saved')
def execute_command(self, command, stdin=None):
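        """Run `command` through /bin/sh -c and return its output.

        :param command: shell command line to execute on the node
        :param stdin: optional data fed to the command standard input
        """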
command = ['/bin/sh', '-c', command]
rcode, output = execute(self.main, command, stdin)
        # 127 means command not found, 126 means not executable
        if rcode == 127:
            raise RemoteExecutionError('Command not found: %s' % output)
        elif rcode == 126:
            raise RemoteExecutionError('Command is not executable')
        elif rcode != 0:
            raise RemoteExecutionError('Child exited with non zero status %s'
                                       % rcode)
        return output
def node_shutdown(self, reboot=True, gracefull=True):
"""Halt/Reboot the node.
:param bool reboot: halt/reboot the system
:param bool gracefull: force the operation (gracefull == not force)
"""
args = ['/sbin/shutdown']
if reboot:
args.append('-r')
if gracefull:
args.append('-f')
else:
args.append('-n')
        else:
            # halt
            args.append('-h -P')
            if not gracefull:
                args.append('-n')
        # time argument: shut down immediately
        args.append('0')
return self.execute_command(' '.join(args))
def shell(self, conn, shell='/bin/bash'):
"""Create a shell tunnel and return the label of the created tunnel.
"""
remote_shell = RemoteShell(conn, shell)
self.shells[remote_shell.label] = remote_shell
return remote_shell.label
@threadless
def shell_resize(self, label, row, col, xpixel, ypixel):
self.shells[label].resize(row, col, xpixel, ypixel)
def forward(self, conn, label, port, destination='127.0.0.1'):
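        """Connect to `destination`:`port` and bind the resulting socket to
        the RPC tunnel identified by `label`.
        """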
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error:
logger.exception('Cannot create socket in forward handler')
raise
try:
sock.connect((destination, port))
        except socket.error:
            logger.exception('Error while connecting to destination (forward)')
            sock.close()
            raise
conn.create_tunnel(label=label, endpoint=sock)
    def script_run(self, conn, sha1, script, owner, batch=None, *args):
        """Run `script` as a job owned by `owner`, fetching it if needed."""
        # locally cached copy, keyed by sha1 (cache layout is an assumption)
        filename = os.path.join(self.scripts_dir, sha1)
        if not os.access(filename, os.X_OK):
try:
sha1, content = conn.call('script_get', script)
except RpcError:
logger.error('Error while retrieving script: %s', script)
raise
with open(filename, "w") as f:
try:
f.write(content)
os.chmod(filename, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
except OSError:
logger.error(
'Could not set execution permission to script: %s',
script)
raise
except IOError:
logger.error(
'Could not write script to repository: %s', script)
raise
        return self.jobs_manager.spawn(ScriptJob, owner, batch=batch,
                                       settings=dict(
                                           script=script,
                                           filename=filename,
                                           args=args,
                                       ))
def job_cancel(self, job_id):
self.jobs_manager.get(job_id).cancel()
def job_purge(self, job_id):
self.jobs_manager.purge(job_id)
    def job_attachment(self, job_id, name):
        """Return the content of attachment `name` of job `job_id`."""
try:
return self.jobs_manager.get(job_id).read_attachment(name)
except Exception:
logger.exception('Error while getting job attachment')
raise
def plugin_install(self, conn, sha1, name):
"""
:param conn: RPC connection
:param sha1: sha1 string or None (get the latest)
:param name: plugin name
"""
# check if plugin is not already loaded and upgrade it if the sha1 hash
# has changed:
if name in self.plugins:
if self.plugins[name].sha1 != sha1:
self.plugins[name].uninstall()
del self.plugins[name]
else:
return
# get plugin from server:
        sha1_get, content = conn.call('plugin_get', name)
        if sha1 is not None and sha1_get != sha1:
            raise RuntimeError('Requested sha1 is not available on the server')
# load the plugin:
plugin_logger = logger.getChild('plugin.%s' % name)
self.plugins[name] = Plugin(plugin_logger, TagDB(self.tag_db),
name, sha1, content)
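        # keep the persisted plugin list in sync (call site is an assumption)
        self.update_plugin_index()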
def plugin_uninstall(self, name):
plugin = self.plugins.pop(name, None)
if plugin is None:
raise KeyError('Plugin %r is not running' % name)
plugin.uninstall()
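        # keep the persisted plugin list in sync (call site is an assumption)
        self.update_plugin_index()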
def plugin_run(self, name, method, owner, batch=None, **kwargs):
if name not in self.plugins:
raise KeyError('Plugin %r is not running' % name)
try:
func = self.plugins[name].methods[method]
except KeyError:
raise KeyError('Unknown method %r' % method)
        return self.jobs_manager.spawn(PluginMethodJob, owner, batch=batch,
                                       settings=dict(
                                           plugin_name=name,
                                           method_name=method,
                                           method=func,
                                           method_kwargs=kwargs,
                                       ))