# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl.  If not, see <http://www.gnu.org/licenses/>.


import os
import pty
import errno
import fcntl
import subprocess
import os.path
import stat
import shutil
import struct
import socket
import termios
import tempfile
import logging
import threading
from fcntl import ioctl
import cPickle as pickle
from itertools import imap, chain
from subprocess import Popen, PIPE, STDOUT

from sjrpc.utils import pass_connection, threadless
from sjrpc.core.exceptions import RpcError
from cloudcontrol.common.client.tags import Tag, tag_inspector, TagDB
from cloudcontrol.common.client.plugins import Base as BasePlugin, rpc_handler
from cloudcontrol.common.jobs import JobsManager, JobsStore
from cloudcontrol.common.helpers.logger import patch_logging; patch_logging()
from cloudcontrol.node.utils import execute
from cloudcontrol.node.exc import RemoteExecutionError
from cloudcontrol.node.host import tags
from cloudcontrol.node.host.jobs import NodeJobsManagerInterface, ScriptJob
from cloudcontrol.node.host.plugins import PluginMethodJob, Plugin

logger = logging.getLogger(__name__)


def disk_tag_value(disk_name):
    def size():
        s = open(os.path.join('/sys/block', disk_name, 'size')).read().strip()
        try:
            s = int(s)
            if s > 0:
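                # /sys/block/<dev>/size is expressed in 512-byte sectors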
                return s * 512
            else:
                return None
        except ValueError:
            return None

    return size


class FakePtySocket(object):
    """A fake socket object wrapping a pty master file descriptor so it can be
    used as an RPC tunnel endpoint.

    """
    def __init__(self, fd):
        self._fd = fd

    def recv(self, size):
        return os.read(self._fd, size)

    def send(self, data):
        return os.write(self._fd, data)

    def fileno(self):
        return self._fd

    def setblocking(self, blocking):
        if not blocking:
            # disable blocking mode on the wrapped pty file descriptor
            current_flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
            fcntl.fcntl(self._fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
        else:
            raise NotImplementedError('Not implemented')

    def close(self):
        pass


class RemoteShell(object):
    """Handles basic operations on remote shell."""
    def __init__(self, conn, exec_='/bin/bash'):
        """
        :param conn: sjRPC connection
        :param exec_: path of binary to execute
        """
        # handles close() being called from another thread while waiting for
        # the process to end
        self.master, self.slave = pty.openpty()
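        # the master side is handed to the RPC tunnel, the slave side becomes
        # the child's stdin/stdout/stderr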
        self.endpoint = FakePtySocket(self.master)
        self.proto = conn.create_tunnel(endpoint=self.endpoint,
                                        close_endpoint_on_shutdown=False,
                                        on_shutdown=self.on_tunnel_shutdown)
        self.conn = conn
        self.child_watcher = None
        try:
            self.process = subprocess.Popen(
                [exec_], stdout=self.slave,
                bufsize=0, stdin=self.slave,
                stderr=self.slave,
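                # run the shell in its own session so it is detached from the
                # node's process group and controlling terminal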
                preexec_fn=os.setsid,
                close_fds=True,
                cwd=os.environ.get('HOME', '/'),
            )
        except OSError:
            logger.exception('Error while executing remote shell')
            # close opened fds and propagate the error
            self.close()
            raise
        self.child_watcher = conn.loop.child(self.process.pid, False,
                                             self.child_cb)
        self.child_watcher.start()

    @property
    def label(self):
        return self.proto.label

    def resize(self, row, col, xpixel, ypixel):
        if self.master is not None:
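            # TIOCSWINSZ expects four unsigned shorts: rows, cols, xpixel, ypixel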
            ioctl(self.master, termios.TIOCSWINSZ,
                  struct.pack('HHHH', row, col, xpixel, ypixel))

    def on_tunnel_shutdown(self, *args):
        # *args is for callback arguments (ignored)
        self.proto, proto = None, self.proto  # prevents multiple calls to
                                              # close method
        self.close()
        logger.debug('Tunnel closed by sjRPC')

    def child_cb(self, watcher, revents):
        logger.debug('Tunnel closed by process termination')
        self.process.returncode = watcher.rstatus
        self.close()

    def close(self):
        """Terminate the child process and release the pty and the RPC tunnel."""
        if self.child_watcher is not None:
            self.child_watcher.stop()
            self.child_watcher = None
        if self.process.returncode is None:
            # process is still alive
            self.process.kill()
        if self.master is not None:
            try:
                os.close(self.master)
            except OSError:
                logger.error('Error when closing file descriptor for pty')
            self.master = None
        if self.slave is not None:
            try:
                os.close(self.slave)
            except OSError:
                logger.error('Error when closing slave file descriptor for pty')
            self.slave = None
        if self.proto is not None:
            try:
                # assumption: the tunnel protocol returned by create_tunnel is
                # shut down with shutdown()
                self.proto.shutdown()
            except Exception:
                logger.error('Error while trying to close RPC tunnel')
            self.proto = None


class Handler(BasePlugin):
    """Handler for host role."""
    def __init__(self, *args, **kwargs):
        BasePlugin.__init__(self, *args, **kwargs)
        # add plugin tags
        self.tag_db.add_tags(tag_inspector(tags, self))
        # disk related tags
        self.tag_db.add_tags(imap(
            lambda d: Tag('disk%s_size' % d, disk_tag_value(d), 60),
            self.tag_db['__main__']['disk']._calculate_value().split(),
        ))

        # running shells
        self.shells = dict()
        # running remote commands
        self.commands = set()

        #: jobs manager (different from MainLoop.jobs_manager)
        try:
            self.jobs_manager = JobsManager(logger, NodeJobsManagerInterface(self),
                                           JobsStore(self.main.config.jobs_store_path))
        except EnvironmentError as e:
            logger.critical('Cannot access jobs directory: %s', e.strerror)
            self.main.stop()
            raise
        #: loaded plugins
        self.plugins = {}  # plugin name -> plugin object

        #: cache directory for scripts
        try:
            self.scripts_dir = tempfile.mkdtemp(prefix='cc-node-script-cache')
        except EnvironmentError as e:
            logger.critical('Cannot create temporary directory for scripts: %s',
                            e.strerror)
            self.main.stop()
            raise

    def start(self):
        BasePlugin.start(self)
        # load plugins if persistent file is here
        if os.path.isfile(self.main.config.plugins_store_path):
            logger.debug('Loading previous plugins')
            try:
                with open(self.main.config.plugins_store_path) as f:
                    to_load = pickle.load(f)
            except EnvironmentError as exc:
                logger.error('Cannot load previous plugins: %s (%s)',
                             exc.errno, exc.strerror)
            except (
                EOFError,
                AttributeError,
                ImportError,
                IndexError,
                pickle.UnpicklingError,
            ) as exc:
                logger.error('Cannot read plugins file (bad format): %s', exc)
            else:
                def plugins_install():
                    for p in to_load:
                        try:
                            self.plugin_install(self.main.rpc_con.rpc, None, p)
                        except Exception:
                            logger.exception('Error while loading plugin %s', p)
                        else:
                            logger.debug('Successfully loaded plugin %s', p)

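                # install in a background thread: plugin_install performs
                # blocking RPC calls to fetch each plugin from the server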
                th = threading.Thread(target=plugins_install)
                th.daemon = True
                th.start()
        else:
            logger.debug('No previously loaded plugins')

    def stop(self):
        # remove script cache directory
        shutil.rmtree(self.scripts_dir, ignore_errors=True)
        # kill all currently running shells and commands
        logger.debug('Kill currently running shells and commands')
        for stuff in chain(self.shells.values(),
                           list(self.commands)):
            stuff.close()
        self.shells.clear()
        self.commands.clear()
        BasePlugin.stop(self)

    def update_plugin_index(self):
        self.tag_db['__main__']['plugins'].update_value()
        # write plugins to file
        try:
            with open(self.main.config.plugins_store_path, 'w') as f:
                pickle.dump([p for p in self.plugins], f)
        except EnvironmentError as exc:
            logger.error('Cannot save loaded plugins in \'%s\': %s (%s)',
                         self.main.config.plugins_store_path,
                         exc.errno, exc.strerror)
        else:
            logger.debug('Plugins state saved')

    @rpc_handler
    def execute_command(self, command, stdin=None):
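        # run the command through a shell so shell syntax (pipes, redirections,
        # variables) keeps working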
        command = ['/bin/sh', '-c', command]
        rcode, output = execute(self.main, command, stdin)
        if rcode != 0:
            # 127 means command not found, 126 means not executable
            if rcode == 127:
                raise RemoteExecutionError('Command not found: %s' % output)
            elif rcode == 126:
                raise RemoteExecutionError('Command is not executable')
            else:
                raise RemoteExecutionError('Child exited with non-zero status %s' % rcode)
        return output

    @rpc_handler
    def node_shutdown(self, reboot=True, gracefull=True):
        """Halt/Reboot the node.

        :param bool reboot: reboot the node if True, halt it otherwise
        :param bool gracefull: if False, force the operation (gracefull == not forced)
        """
        args = ['/sbin/shutdown']
        if reboot:
            args.append('-r')
            if gracefull:
                logger.info('Going to reboot the host...')
            else:
                logger.info('Going to force the reboot of the host...')
                args.append('-n')
        else:
            # halt
            args.append('-h -P')
            if gracefull:
                logger.info('Going to halt the host...')
            else:
                logger.info('Going to force the halt of the host...')

        args.append('0')

        return self.execute_command(' '.join(args))

    @threadless
    @pass_connection
    @rpc_handler
    def shell(self, conn, shell='/bin/bash'):
        """Create a shell tunnel and return the label of the created tunnel.

        """
        logger.debug('Opening shell')
        remote_shell = RemoteShell(conn, shell)
        self.shells[remote_shell.label] = remote_shell
        return remote_shell.label

    @threadless
    @rpc_handler('resize')
    def shell_resize(self, label, row, col, xpixel, ypixel):
        """Resize the shell's attached terminal."""
        logger.debug('Shell resize')
        self.shells[label].resize(row, col, xpixel, ypixel)

    @pass_connection
    @rpc_handler
    def forward(self, conn, label, port, destination='127.0.0.1'):
        """TCP port forwarding."""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except socket.error:
            logger.exception('Cannot create socket in forward handler')
            raise
        try:
            sock.connect((destination, port))
        except socket.error:
            logger.exception('Error while connecting to destination (forward)')
            raise
        conn.create_tunnel(label=label, endpoint=sock)

    @pass_connection
    @rpc_handler
    def script_run(self, conn, sha1, script, owner, batch=None, *args):
        # retrieve the script if it is not already cached locally (scripts are
        # cached under their sha1 digest)
        filename = os.path.join(self.scripts_dir, sha1)
        if not os.access(filename, os.X_OK):
            try:
                sha1, content = conn.call('script_get', script)
            except RpcError:
                logger.error('Error while retrieving script: %s', script)
                raise
            with open(filename, "w") as f:
                try:
                    f.write(content)
                    os.chmod(filename, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
                except OSError:
                    logger.error(
                        'Could not set execution permission to script: %s',
                        script)
                    raise
                except IOError:
                    logger.error(
                        'Could not write script to repository: %s', script)
                    raise

        return self.jobs_manager.spawn(ScriptJob, owner, batch=batch,
            settings=dict(
                script=script,
                filename=filename,
                args=args,
        )).id

    @rpc_handler
    def job_cancel(self, job_id):
        self.jobs_manager.get(job_id).cancel()

    @rpc_handler
    def job_purge(self, job_id):
        self.jobs_manager.purge(job_id)

    @rpc_handler
    def job_attachment(self, job_id, name):
        """
        :param name: attachment name
        """
        try:
            return self.jobs_manager.get(job_id).read_attachment(name)
        except Exception:
            logger.exception('Error while getting job attachment')
            raise

    @pass_connection
    @rpc_handler
    def plugin_install(self, conn, sha1, name):
        """
        :param conn: RPC connection
        :param sha1: sha1 string or None (get the latest)
        :param name: plugin name
        """
        # check if plugin is not already loaded and upgrade it if the sha1 hash
        # has changed:
        if name in self.plugins:
            if self.plugins[name].sha1 != sha1:
                self.plugins[name].uninstall()
                del self.plugins[name]
            else:
                return

        # get plugin from server:
        sha1_get, content = conn.call('plugin_get', name)
        if sha1 is not None and sha1 != sha1_get:
            raise RuntimeError('Requested sha1 is not available on the server')

        # load the plugin:
        plugin_logger = logger.getChild('plugin.%s' % name)
        self.plugins[name] = Plugin(plugin_logger, TagDB(self.tag_db),
                                    name, sha1, content)
        self.update_plugin_index()

    @rpc_handler
    def plugin_uninstall(self, name):
        plugin = self.plugins.pop(name, None)
        if plugin is None:
            raise KeyError('Plugin %r is not running' % name)
        plugin.uninstall()
        self.update_plugin_index()

    @rpc_handler
    def plugin_run(self, name, method, owner, batch=None, **kwargs):
        if name not in self.plugins:
            raise KeyError('Plugin %r is not running' % name)
        try:
            func = self.plugins[name].methods[method]
        except KeyError:
            raise KeyError('Unknown method %r' % method)
        return self.jobs_manager.spawn(PluginMethodJob, owner, batch=batch,
            settings=dict(
                plugin_name=name,
                method_name=method,
                method=func,
                method_kwargs=kwargs,