# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl.  If not, see <http://www.gnu.org/licenses/>.


import os
import pty
import errno
import fcntl
import subprocess
import logging
import os.path
import stat
import shutil
import struct
import socket
import termios
import tempfile
import threading
from fcntl import ioctl
import cPickle as pickle
from itertools import imap, chain
from subprocess import Popen, PIPE, STDOUT

import pyev
from sjrpc.utils import pass_connection, threadless
from sjrpc.core.exceptions import RpcError
from cloudcontrol.common.client.tags import Tag, tag_inspector, TagDB
from cloudcontrol.common.client.plugins import Base as BasePlugin, rpc_handler
from cloudcontrol.common.jobs import JobsManager, JobsStore
from cloudcontrol.common.helpers.logger import patch_logging; patch_logging()

from cloudcontrol.node.utils import execute
from cloudcontrol.node.exc import RemoteExecutionError
from cloudcontrol.node.host import tags
from cloudcontrol.node.host.jobs import NodeJobsManagerInterface, ScriptJob
from cloudcontrol.node.host.plugins import PluginMethodJob, Plugin


logger = logging.getLogger(__name__)


def disk_tag_value(disk_name):
    """Build the value getter used for the ``disk<name>_size`` tag.

    :param disk_name: name of the block device as found under ``/sys/block``
    :return: a zero-argument callable that reads
        ``/sys/block/<disk_name>/size`` and returns that number multiplied
        by 512, or ``None`` when the content is not a strictly positive
        integer
    """
    def size():
        # use a context manager so the sysfs file is closed right away
        # instead of waiting for garbage collection
        with open(os.path.join('/sys/block', disk_name, 'size')) as f:
            s = f.read().strip()
        try:
            s = int(s)
        except ValueError:
            return None
        # sysfs reports the size as a count of 512-byte sectors
        return s * 512 if s > 0 else None

    return size


class FakePtySocket(object):
    """A minimal socket-like object wrapping a raw file descriptor.

    Only the few socket methods defined below are provided; reads and writes
    go straight to :func:`os.read` / :func:`os.write` on the wrapped
    descriptor. :class:`RemoteShell` uses it to hand the master side of a
    pty to an sjRPC tunnel as its endpoint.
    """

    def __init__(self, fd):
        """
        :param fd: file descriptor to wrap (ownership is NOT taken, see
            :meth:`close`)
        """
        self._fd = fd

    def recv(self, size):
        """Read at most *size* bytes from the wrapped descriptor."""
        return os.read(self._fd, size)

    def send(self, data):
        """Write *data* to the wrapped descriptor, return the bytes written."""
        return os.write(self._fd, data)

    def fileno(self):
        """Return the wrapped file descriptor."""
        return self._fd

    def setblocking(self, blocking):
        """Switch the wrapped descriptor to non-blocking mode.

        :param blocking: must be false; switching back to blocking mode is
            not supported
        :raises NotImplementedError: if *blocking* is true
        """
        if not blocking:
            # Disable blocking mode on the wrapped descriptor:
            current_flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
            fcntl.fcntl(self._fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
        else:
            raise NotImplementedError('Not implemented')

    def close(self):
        """Do nothing: the descriptor is closed by its owner."""
        pass


class RemoteShell(object):
    """Handles basic operations on remote shell."""

    def __init__(self, conn, exec_='/bin/bash'):
        """Open a pty, bind it to a new sjRPC tunnel and spawn *exec_* on it.

        :param conn: sjRPC connection
        :param exec_: path of binary to execute
        :raises OSError: if the binary cannot be executed (every resource
            acquired so far is released before re-raising)
        """
        # Initialised first so that close() can always be called safely, even
        # when the process could not be spawned.
        self.child_watcher = None
        self.process = None
        self.master, self.slave = pty.openpty()
        self.endpoint = FakePtySocket(self.master)
        self.proto = conn.create_tunnel(endpoint=self.endpoint,
                                        close_endpoint_on_shutdown=False,
                                        on_shutdown=self.on_tunnel_shutdown)
        self.conn = conn
        try:
            self.process = subprocess.Popen(
                [exec_], stdout=self.slave,
                bufsize=0, stdin=self.slave,
                stderr=self.slave,
                preexec_fn=os.setsid,
                close_fds=True,
                cwd=os.environ.get('HOME', '/'),
            )
        except OSError:
            logger.exception('Error while executing remote shell')
            # close opened fds
            self.close()
            raise
        # get notified by the event loop when the child terminates
        self.child_watcher = conn.loop.child(self.process.pid, False,
                                             self.child_cb)
        self.child_watcher.start()

    @property
    def label(self):
        """Label of the underlying sjRPC tunnel."""
        return self.proto.label

    def resize(self, row, col, xpixel, ypixel):
        """Set the window size of the pty (no-op once the pty is closed).

        :param row: number of rows
        :param col: number of columns
        :param xpixel: width in pixels
        :param ypixel: height in pixels
        """
        if self.master is not None:
            ioctl(self.master, termios.TIOCSWINSZ,
                  struct.pack('HHHH', row, col, xpixel, ypixel))

    def on_tunnel_shutdown(self, *args):
        """Tunnel shutdown callback: release the shell resources.

        ``*args`` is for callback arguments (ignored).
        """
        # Forget the tunnel before calling close() so that close() does not
        # try to shut it down a second time. The local name keeps a reference
        # to the tunnel object for the duration of this callback.
        self.proto, proto = None, self.proto
        self.close()
        logger.debug('Tunnel closed by sjRPC')

    def child_cb(self, watcher, revents):
        """Child watcher callback: record the exit status and clean up."""
        logger.debug('Tunnel closed by process termination')
        # a non-None returncode tells close() the process is already dead
        self.process.returncode = watcher.rstatus
        self.close()

    def close(self):
        """Release everything owned by the shell.

        Stops the child watcher, kills the process if it is still alive,
        closes both pty descriptors and shuts the tunnel down. Every resource
        is reset to ``None`` once released, so calling this method several
        times is safe.
        """
        if self.child_watcher is not None:
            self.child_watcher.stop()
            self.child_watcher = None
        # self.process is None when Popen failed in __init__
        if self.process is not None and self.process.returncode is None:
            # process is still alive
            self.process.kill()
        if self.master is not None:
            try:
                os.close(self.master)
            except OSError:
                logger.error('Error when closing file descriptor for pty')
            self.master = None
        if self.slave is not None:
            try:
                os.close(self.slave)
            except OSError:
                logger.error('Error when closing slave file descriptor for pty')
            self.slave = None
        if self.proto is not None:
            try:
                self.proto.shutdown()
            except Exception:
                logger.error('Error while trying to close RPC tunnel')
            self.proto = None


class Handler(BasePlugin):
    """Handler for host role."""

    def __init__(self, *args, **kwargs):
        """Register host tags and set up the jobs manager and script cache.

        :raises EnvironmentError: if the jobs store or the temporary scripts
            directory cannot be set up (the main loop is asked to stop
            before re-raising)
        """
        BasePlugin.__init__(self, *args, **kwargs)

        # add plugin tags
        self.tag_db.add_tags(tag_inspector(tags, self))

        # disk related tags: one ``disk<name>_size`` tag per name listed in
        # the whitespace-separated value of the ``disk`` tag
        self.tag_db.add_tags(imap(
            lambda d: Tag('disk%s_size' % d, disk_tag_value(d), 60),
            self.tag_db['__main__']['disk']._calculate_value().split(),
        ))

        # running shells
        self.shells = dict()  # tunnel label -> RemoteShell
        # running remote commands
        self.commands = set()
        #: jobs manager (different from MainLoop.jobs_manager)
        try:
            self.jobs_manager = JobsManager(
                logger, NodeJobsManagerInterface(self),
                JobsStore(self.main.config.jobs_store_path))
        except EnvironmentError as e:
            logger.critical('Cannot access jobs directory: %s', e.strerror)
            self.main.stop()
            raise
        #: loaded plugins
        self.plugins = {}  # plugin name -> plugin object
        #: cache directory for scripts
        try:
            self.scripts_dir = tempfile.mkdtemp(prefix='cc-node-script-cache')
        except EnvironmentError as e:
            logger.critical(
                'Cannot create temporary directory for scripts: %s',
                e.strerror)
            self.main.stop()
            raise

    def start(self):
        """Start the handler and re-install previously loaded plugins.

        The plugin names are read from the pickle file at
        ``config.plugins_store_path`` (if present) and installed from a
        daemon thread.
        """
        BasePlugin.start(self)
        # load plugins if persistent file is here
        if os.path.isfile(self.main.config.plugins_store_path):
            logger.debug('Loading previous plugins')
            try:
                # NOTE(security): unpickling can run arbitrary code; this is
                # only safe as long as the store file cannot be written by
                # untrusted users.
                with open(self.main.config.plugins_store_path) as f:
                    to_load = pickle.load(f)
            except EnvironmentError as exc:
                logger.error('Cannot load previous plugins: %s (%s)',
                             exc.errno, exc.strerror)
            except (
                EOFError,
                AttributeError,
                ImportError,
                IndexError,
                pickle.UnpicklingError,
            ) as exc:
                logger.error('Cannot read file, bad format, %s', exc)
            else:
                def plugins_install():
                    # a failing plugin must not prevent the others from
                    # being loaded
                    for p in to_load:
                        try:
                            self.plugin_install(self.main.rpc_con.rpc, None, p)
                        except Exception:
                            logger.exception('Error while loading plugin %s', p)
                        else:
                            logger.debug('Successfuly loaded plugin %s', p)

                th = threading.Thread(target=plugins_install)
                th.daemon = True
                th.start()
        else:
            logger.debug('No previously loaded plugins')

    def stop(self):
        """Remove the script cache and close running shells and commands."""
        # remove script cache directory
        shutil.rmtree(self.scripts_dir, ignore_errors=True)
        # kill all currently running shells and commands
        logger.debug('Kill currently running shells and commands')
        # iterate over copies of both containers
        for stuff in chain(self.shells.values(), list(self.commands)):
            stuff.close()
        self.shells.clear()
        self.commands.clear()
        BasePlugin.stop(self)

    def update_plugin_index(self):
        """Refresh the ``plugins`` tag and persist the loaded plugin names.

        Failure to write the store file is logged, not raised.
        """
        self.tag_db['__main__']['plugins'].update_value()
        # write plugins to file
        try:
            with open(self.main.config.plugins_store_path, 'w') as f:
                pickle.dump(list(self.plugins), f)
        except EnvironmentError as exc:
            logger.error('Cannot save loaded plugins in \'%s\': %s (%s)',
                         self.main.config.plugins_store_path,
                         exc.errno, exc.strerror)
        else:
            logger.debug('Plugins state saved')

    @rpc_handler
    def execute_command(self, command, stdin=None):
        """Run *command* through ``/bin/sh -c`` and return its output.

        :param command: shell command line
        :param stdin: forwarded as-is to
            :func:`cloudcontrol.node.utils.execute`
        :raises RemoteExecutionError: if the exit status is not zero
        """
        command = ['/bin/sh', '-c', command]
        rcode, output = execute(self.main, command, stdin)
        if rcode != 0:
            # 127 means command not found, 126 means not executable
            if rcode == 127:
                raise RemoteExecutionError('Command not found: %s' % output)
            elif rcode == 126:
                raise RemoteExecutionError('Command is not executable')
            else:
                raise RemoteExecutionError(
                    'Child exited with non zero status %s' % rcode)
        return output

    @rpc_handler
    def node_shutdown(self, reboot=True, gracefull=True):
        """Halt/Reboot the node.

        :param bool reboot: reboot the system if true, halt it otherwise
        :param bool gracefull: force the operation (gracefull == not force)
        :return: output of the ``/sbin/shutdown`` command
        """
        args = ['/sbin/shutdown']
        if reboot:
            args.append('-r')
            if gracefull:
                logger.info('Going to reboot the host...')
                args.append('-f')
            else:
                logger.info('Going to force the reboot of the host...')
                args.append('-n')
        else:
            # halt
            args.append('-h -P')
            # NOTE: the two messages below used to be swapped (the forced
            # path logged the graceful message and vice versa)
            if not gracefull:
                logger.info('Going to force the halt of the host...')
                args.append('-n')
            else:
                logger.info('Going to halt the host...')

        # time argument: now
        args.append('0')

        return self.execute_command(' '.join(args))

    @threadless
    @pass_connection
    @rpc_handler
    def shell(self, conn, shell='/bin/bash'):
        """Create a shell tunnel and return the label of the created tunnel.

        :param conn: sjRPC connection
        :param shell: path of the binary to run in the pty
        """
        logger.debug('Opening shell')
        remote_shell = RemoteShell(conn, shell)

        self.shells[remote_shell.label] = remote_shell

        return remote_shell.label

    @threadless
    @rpc_handler('resize')
    def shell_resize(self, label, row, col, xpixel, ypixel):
        """Resize the shell's attached terminal.

        :param label: label returned by :meth:`shell`
        :raises KeyError: if no shell is registered under *label*
        """
        logger.debug('Shell resize')
        self.shells[label].resize(row, col, xpixel, ypixel)

    @pass_connection
    @rpc_handler
    def forward(self, conn, label, port, destination='127.0.0.1'):
        """TCP port forwarding.

        Connect to ``destination:port`` and attach the socket to a new tunnel
        named *label* on *conn*.

        :raises socket.error: if the socket cannot be created or connected
        """
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except socket.error:
            logger.exception('Cannot create socket in forward handler')
            raise
        try:
            sock.connect((destination, port))
        except socket.error:
            logger.exception('Error while connecting to destination (forward)')
            # do not leak the descriptor when the destination is unreachable
            sock.close()
            raise
        conn.create_tunnel(label=label, endpoint=sock)

    @pass_connection
    @rpc_handler
    def script_run(self, conn, sha1, script, owner, batch=None, *args):
        """Run a script as a job, fetching it from the peer when not cached.

        :param conn: RPC connection
        :param sha1: name under which the script is cached in the scripts
            directory
        :param script: script name, as understood by the ``script_get`` RPC
        :param owner: forwarded to the jobs manager
        :param batch: forwarded to the jobs manager
        :param args: arguments handed to the script job
        :return: id of the spawned job
        """
        # retrieve script if not here
        filename = os.path.join(self.scripts_dir, sha1)
        # the executable bit doubles as the "cache entry is complete" marker
        if not os.access(filename, os.X_OK):
            try:
                sha1_get, content = conn.call('script_get', script)
            except RpcError:
                logger.error('Error while retrieving script: %s', script)
                raise
            try:
                with open(filename, "w") as f:
                    f.write(content)
                # set the executable bit only once the file has been fully
                # written and closed
                os.chmod(filename, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
            except OSError:
                logger.error(
                    'Could not set execution permission to script: %s',
                    script)
                raise
            except IOError:
                logger.error(
                    'Could not write script to repository: %s', script)
                raise

        return self.jobs_manager.spawn(ScriptJob, owner, batch=batch,
                                       settings=dict(
                                           script=script,
                                           filename=filename,
                                           args=args,
                                       )).id

    @rpc_handler
    def job_cancel(self, job_id):
        """Cancel the job identified by *job_id*."""
        self.jobs_manager.get(job_id).cancel()

    @rpc_handler
    def job_purge(self, job_id):
        """Purge the job identified by *job_id* from the jobs manager."""
        self.jobs_manager.purge(job_id)

    @rpc_handler
    def job_attachment(self, job_id, name):
        """Return the content of an attachment of a job.

        :param job_id: job identifier
        :param name: attachment name
        """
        try:
            return self.jobs_manager.get(job_id).read_attachment(name)
        except Exception:
            logger.exception('Error while getting job attachment')
            raise

    @pass_connection
    @rpc_handler
    def plugin_install(self, conn, sha1, name):
        """Fetch a plugin through the ``plugin_get`` RPC and load it.

        :param conn: RPC connection
        :param sha1: sha1 string or None (get the latest)
        :param name: plugin name
        :raises RuntimeError: if *sha1* is given and differs from the one
            returned by the server
        """
        # check if plugin is not already loaded and upgrade it if the sha1 hash
        # has changed:
        if name in self.plugins:
            if self.plugins[name].sha1 != sha1:
                self.plugins[name].uninstall()
                del self.plugins[name]
            else:
                return

        # get plugin from server:
        sha1_get, content = conn.call('plugin_get', name)
        if sha1 is not None and sha1 != sha1_get:
            raise RuntimeError('Requested sha1 is not available on the server')

        # load the plugin:
        # NOTE(review): the plugin records the *requested* sha1 (possibly
        # None), not ``sha1_get`` -- confirm this is intended.
        plugin_logger = logger.getChild('plugin.%s' % name)
        self.plugins[name] = Plugin(plugin_logger, TagDB(self.tag_db),
                                    name, sha1, content)
        self.update_plugin_index()

    @rpc_handler
    def plugin_uninstall(self, name):
        """Uninstall a loaded plugin and update the persisted plugin list.

        :raises KeyError: if no plugin named *name* is loaded
        """
        plugin = self.plugins.pop(name, None)
        if plugin is None:
            raise KeyError('Plugin %r is not running' % name)
        plugin.uninstall()
        self.update_plugin_index()

    @rpc_handler
    def plugin_run(self, name, method, owner, batch=None, **kwargs):
        """Run a method of a loaded plugin as a job.

        :param name: plugin name
        :param method: name of the plugin method to run
        :param owner: forwarded to the jobs manager
        :param batch: forwarded to the jobs manager
        :param kwargs: keyword arguments handed to the plugin method
        :return: id of the spawned job
        :raises KeyError: if the plugin is not loaded or the method is unknown
        """
        if name not in self.plugins:
            raise KeyError('Plugin %r is not running' % name)
        try:
            func = self.plugins[name].methods[method]
        except KeyError:
            raise KeyError('Unknown method %r' % method)
        return self.jobs_manager.spawn(PluginMethodJob, owner, batch=batch,
                                       settings=dict(
                                           plugin_name=name,
                                           method_name=method,
                                           method=func,
                                           method_kwargs=kwargs,
                                       )).id