Skip to content
__init__.py 12.5 KiB
Newer Older
import os
import sys
import pty
import errno
import fcntl
import subprocess
Anael Beutot's avatar
Anael Beutot committed
import os.path
Anael Beutot's avatar
Anael Beutot committed
import stat
Anael Beutot's avatar
Anael Beutot committed
import shutil
import struct
import socket
import termios
Anael Beutot's avatar
Anael Beutot committed
import tempfile
from fcntl import ioctl
from itertools import chain, imap
from subprocess import Popen, PIPE, STDOUT

from sjrpc.utils import pass_connection, threadless
from sjrpc.core.protocols import TunnelProtocol
Anael Beutot's avatar
Anael Beutot committed
from sjrpc.core.exceptions import RpcError
from cloudcontrol.common.client.tags import Tag, tag_inspector, TagDB
Anael Beutot's avatar
Anael Beutot committed
from cloudcontrol.common.client.plugins import Base as BasePlugin
from cloudcontrol.common.jobs import JobsManager, JobsStore
Anael Beutot's avatar
Anael Beutot committed
from cloudcontrol.common.helpers.logger import patch_logging; patch_logging()
from cloudcontrol.node.host import tags
Anael Beutot's avatar
Anael Beutot committed
from cloudcontrol.node.host.jobs import NodeJobsManagerInterface, ScriptJob
from cloudcontrol.node.host.plugins import PluginMethodJob, Plugin
Anael Beutot's avatar
Anael Beutot committed
def disk_tag_value(disk_name):
    """Build a callable reporting the size of a block device.

    :param disk_name: name of the device directory under ``/sys/block``
    :return: zero-argument function which reads
        ``/sys/block/<disk_name>/size`` and returns that value multiplied by
        512, or ``None`` when the content is not a strictly positive integer
    """
    def size():
        # Use a context manager so the file handle is released on every call
        # instead of being left to the garbage collector.
        with open(os.path.join('/sys/block', disk_name, 'size')) as size_file:
            raw = size_file.read().strip()
        try:
            sectors = int(raw)
        except ValueError:
            return None
        # zero or negative values are reported as "no size"
        return sectors * 512 if sectors > 0 else None

    return size


class FakePtySocket(object):
    """Minimal socket-like adapter around a raw file descriptor.

    Exposes ``recv``, ``send``, ``fileno``, ``setblocking`` and ``close`` on
    top of ``os.read``/``os.write`` for the descriptor given at construction.

    """
    def __init__(self, fd):
        # descriptor every operation is forwarded to
        self._fd = fd

    def fileno(self):
        """Return the wrapped file descriptor."""
        return self._fd

    def recv(self, size):
        """Read at most *size* bytes from the descriptor."""
        return os.read(self._fd, size)

    def send(self, data):
        """Write *data* to the descriptor and return the written byte count."""
        return os.write(self._fd, data)

    def setblocking(self, blocking):
        """Switch the descriptor to non-blocking mode.

        :param blocking: must be false; going back to blocking mode is not
            supported
        :raises NotImplementedError: if *blocking* is true
        """
        if blocking:
            raise NotImplementedError('Not implemented')
        # add O_NONBLOCK to the flags already set on the descriptor
        flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
        fcntl.fcntl(self._fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def close(self):
        """Do nothing; this object never closes the descriptor itself."""
        pass


class RemoteShell(object):
    """Handles basic operations on remote shell.

    A pseudo-terminal pair is allocated, the master side is exposed through
    an RPC tunnel and the slave side is used as stdin/stdout/stderr of the
    spawned process.
    """
    def __init__(self, ev_loop, conn, exec_='/bin/bash'):
        """
        :param ev_loop: pyev loop instance
        :param conn: sjRPC connection
        :param exec_: binary path
        :raises OSError: if the binary cannot be executed (the error is
            logged); any other set-up error is propagated as well

        .. warning:: constructor must be called inside the thread running
            libev loop, this is necessary in order to create and start
            watcher and to close file descriptors
        """
        # Every resource starts as None so that close_cb() can release
        # whatever was acquired, wherever the set-up fails.
        self.watcher = None
        self.master = None
        self.slave = None
        self.endpoint = None
        self.proto = None
        self.process = None
        self.conn = conn
        try:
            # handles close when waiting process to end from an other thread
            # (`async` is a reserved word since Python 3.7, hence getattr)
            self.watcher = getattr(ev_loop, 'async')(self.close_cb)
            self.watcher.start()
            self.master, self.slave = pty.openpty()
            self.endpoint = FakePtySocket(self.master)
            self.proto = conn.create_tunnel(endpoint=self.endpoint)
            try:
                self.process = subprocess.Popen(
                    [exec_], stdout=self.slave,
                    bufsize=0, stdin=self.slave,
                    stderr=self.slave,
                    preexec_fn=os.setsid,
                    close_fds=True,
                    cwd=os.environ.get('HOME', '/'),
                )
            except OSError:
                logger.exception('Error while executing remote shell')
                raise
        except Exception:
            # stop the watcher, close opened fds and the tunnel (if any)
            self.close_cb()
            raise

    @property
    def label(self):
        """Label of the RPC tunnel attached to this shell."""
        return self.proto.label

    def resize(self, row, col, xpixel, ypixel):
        """Set the window size of the pseudo-terminal (``TIOCSWINSZ``)."""
        ioctl(self.master, termios.TIOCSWINSZ,
              struct.pack('HHHH', row, col, xpixel, ypixel))

    def close(self):
        """Request the clean-up; :meth:`close_cb` is run by the watcher."""
        # async
        self.watcher.send()

    def terminate(self):
        """Call ``terminate()`` on the shell process."""
        self.process.terminate()

    def kill(self):
        """Call ``kill()`` on the shell process."""
        self.process.kill()

    def close_cb(self, *args):
        """Release the watcher, both pty descriptors and the RPC tunnel.

        Each resource is reset to ``None`` once handled, so calling this
        method several times is harmless. Extra positional arguments (as
        passed by the watcher) are ignored.
        """
        if self.watcher is not None:
            self.watcher.stop()
            self.watcher = None
        if self.master is not None:
            try:
                os.close(self.master)
            except OSError:
                logger.error('Error when closing file descriptor for pty')
            self.master = None
        if self.slave is not None:
            try:
                os.close(self.slave)
            except OSError:
                logger.error('Error when closing slave file descriptor for pty')
            self.slave = None
        if self.proto is not None:
            try:
                self.proto.close()
            except Exception:
                logger.error('Error while trying to close RPC tunnel')
            self.proto = None

    def wait(self):
        """Block until the shell process ends and return its exit code."""
        return self.process.wait()

    def wait_n_close(self):
        """Wait for the process to end, request the clean-up and return the
        exit code.
        """
        ret_code = self.wait()
        self.close()
        return ret_code


class Handler(BasePlugin):
    """Handler for host role."""
    def __init__(self, *args, **kwargs):
        """Set up tags, RPC handlers, the jobs manager and the script cache.

        All arguments are passed unchanged to ``BasePlugin.__init__``.

        :raises EnvironmentError: if the jobs store cannot be accessed or the
            script cache directory cannot be created; the main loop is asked
            to stop before the error is re-raised
        """
        BasePlugin.__init__(self, *args, **kwargs)
        # add plugin tags
        self.tag_db.add_tags(tag_inspector(tags, self))

        # disk related tags: one ``disk<name>_size`` tag per name listed in
        # the value of the ``disk`` tag
        # NOTE(review): the closing parentheses of this call and of the
        # ``rpc_handler.update`` call below were missing in the reviewed copy
        # and have been restored -- confirm against the repository
        disk_names = self.tag_db['__main__']['disk']._calculate_value().split()
        self.tag_db.add_tags(
            Tag('disk%s_size' % name, disk_tag_value(name), 60)
            for name in disk_names
        )

        # rpc handler
        self.rpc_handler.update(dict(
            execute_command=self.execute_command,
            node_shutdown=self.node_shutdown,
            rshell=self.rshell,
            rshell_resize=self.rshell_resize,
            rshell_wait=self.rshell_wait,
            forward=self.forward,
            job_cancel=self.job_cancel,
            job_purge=self.job_purge,
            job_attachment=self.job_attachment,
            script_run=self.script_run,
            plugin_install=self.plugin_install,
            plugin_uninstall=self.plugin_uninstall,
            plugin_run=self.plugin_run,
        ))

        # running shells
        self.shells = dict()  # tunnel label -> RemoteShell

        #: jobs manager (different from MainLoop.jobs_manager)
        try:
            self.jobs_manager = JobsManager(
                logger, NodeJobsManagerInterface(self),
                JobsStore(self.main.config.jobs_store_path))
        except EnvironmentError as e:
            logger.critical('Cannot access jobs directory: %s', e.strerror)
            self.main.stop()
            raise

        #: loaded plugins
        self.plugins = {}  # plugin name -> plugin object

        #: cache directory for scripts
        try:
            self.scripts_dir = tempfile.mkdtemp(prefix='cc-node-script-cache')
        except EnvironmentError as e:
            logger.critical('Cannot create temporary directory for scripts: %s',
                            e.strerror)
            self.main.stop()
            raise

Anael Beutot's avatar
Anael Beutot committed
        # remove script cache directory
        shutil.rmtree(self.scripts_dir, ignore_errors=True)
        # kill all currently running shells
        for shell in self.shells.values():
            try:
                shell.kill()
            except OSError as exc:
                if exc.errno != errno.ESRCH:
                    raise
                # process does not exists
            shell.close_cb()
        self.shells.clear()
        BasePlugin.stop(self)

    def execute_command(self, command):
        """Run a shell command on the host and return what it printed.

        :param string command: shell command to run
        :return: standard output and standard error of the command mixed
            together, or ``None`` when the command printed nothing
        """
        try:
            # stderr is redirected into stdout so both streams come back mixed
            process = Popen(command, shell=True, bufsize=-1, stdin=PIPE,
                            stdout=PIPE, stderr=STDOUT)
            output, _ = process.communicate()
        except Exception:
            logger.exception('Error while executing a remote command')
            raise
        # an empty output is reported as None
        return output or None

    def node_shutdown(self, reboot=True, gracefull=True):
        """Halt/Reboot the node.

        Builds a ``/sbin/shutdown`` command line and runs it through
        :meth:`execute_command`.

        :param bool reboot: reboot the system when true, halt it otherwise
        :param bool gracefull: force the operation (gracefull == not force)
        :return: output of the shutdown command, as returned by
            :meth:`execute_command`
        """
        args = ['/sbin/shutdown']
        if reboot:
            args.append('-r')
            if gracefull:
                logger.info('Going to reboot the host...')
            else:
                logger.info('Going to force the reboot of the host...')
                args.append('-n')
        else:
            # halt
            args.append('-h -P')
            if gracefull:
                logger.info('Going to halt the host...')
            else:
                # NOTE(review): unlike the forced reboot, no extra flag is
                # passed for a forced halt -- confirm this is intended
                logger.info('Going to force the halt of the host...')

        # shutdown time argument
        args.append('0')

        return self.execute_command(' '.join(args))

    @threadless
    @pass_connection
    def rshell(self, conn, shell='/bin/bash'):
        """Open a remote shell tunnel and register it.

        :param conn: connection the tunnel is created on
        :param shell: path of the binary to run
        :return: label of the created tunnel
        """
        new_shell = RemoteShell(self.main.evloop, conn, shell)
        # shells are indexed by the label of their tunnel
        self.shells[new_shell.label] = new_shell
        return new_shell.label

    @threadless
    def rshell_resize(self, label, row, col, xpixel, ypixel):
        """Change the terminal size of the shell registered under *label*.

        :raises KeyError: if no shell is registered under *label*
        """
        target = self.shells[label]
        target.resize(row, col, xpixel, ypixel)

    def rshell_wait(self, label):
        """Block until the shell registered under *label* ends.

        :return: value returned by the shell's ``wait_n_close()``
        :raises KeyError: if no shell is registered under *label*
        """
        shell = self.shells[label]
        return shell.wait_n_close()

    @pass_connection
    def forward(self, conn, label, port, destination='127.0.0.1'):
        """Connect a TCP socket to a destination and bind it to a tunnel.

        :param conn: connection the tunnel is created on
        :param label: label given to the created tunnel
        :param port: TCP port to connect to
        :param destination: host to connect to
        :raises socket.error: if the socket cannot be created or connected
        """
        # create socket
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except socket.error:
            logger.exception('Cannot create socket in forward handler')
            raise
        try:
            sock.connect((destination, port))
        except socket.error:
            logger.exception('Error while connecting to destination (forward)')
            # do not leak the descriptor of a socket nobody will use
            sock.close()
            raise
        # create tunnel
        try:
            conn.create_tunnel(label=label, endpoint=sock)
        except Exception:
            # the tunnel did not take ownership of the socket
            sock.close()
            raise
Anael Beutot's avatar
Anael Beutot committed
    @pass_connection
    def script_run(self, conn, sha1, script, owner, *args):
        """Run a script as a job, fetching it from the server if needed.

        The script is cached in ``self.scripts_dir`` under the name *sha1*;
        it is downloaded through the ``script_get`` RPC call when no
        executable file with that name is present.

        :param conn: connection used to retrieve the script
        :param sha1: hash of the script, used as cache file name
        :param script: name of the script
        :param owner: owner of the spawned job
        :param args: arguments stored in the job settings
        :return: id of the spawned job
        :raises RpcError: if the script cannot be retrieved
        :raises OSError: if the execution permission cannot be set
        :raises IOError: if the script cannot be written
        """
        # retrieve script if not here
        filename = os.path.join(self.scripts_dir, sha1)
        if not os.access(filename, os.X_OK):
            try:
                # NOTE(review): the sha1 returned by the server is not
                # compared with the requested one and the file stays cached
                # under the requested hash -- confirm this is intended
                sha1, content = conn.call('script_get', script)
            except RpcError:
                logger.error('Error while retrieving script: %s', script)
                raise
            with open(filename, "w") as f:
                try:
                    f.write(content)
                    os.chmod(filename, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
                except OSError:
                    logger.error(
                        'Could not set execution permission to script: %s',
                        script)
                    raise
                except IOError:
                    logger.error(
                        'Could not write script to repository: %s', script)
                    raise

        return self.jobs_manager.spawn(ScriptJob, owner, settings=dict(
            script=script,
            filename=filename,
            args=args,
        )).id

    def job_cancel(self, job_id):
        """Cancel the job known to the jobs manager as *job_id*."""
        job = self.jobs_manager.get(job_id)
        job.cancel()

    def job_purge(self, job_id):
        """Ask the jobs manager to purge the job *job_id*."""
        manager = self.jobs_manager
        manager.purge(job_id)

    def job_attachment(self, job_id, name):
        """Read an attachment of a job.

        :param job_id: id of the job, as known to the jobs manager
        :param name: attachment name
        :return: value returned by the job's ``read_attachment(name)``
        """
        job = self.jobs_manager.get(job_id)
        return job.read_attachment(name)

    @pass_connection
    def plugin_install(self, conn, sha1, name):
        """Install a plugin, or upgrade it when its hash has changed.

        Nothing is done when the plugin is already loaded with the same
        *sha1*. Otherwise the plugin is fetched through the ``plugin_get``
        RPC call, checked against *sha1* and loaded in place of the previous
        version, if any.

        :param conn: connection used to retrieve the plugin
        :param sha1: expected hash of the plugin
        :param name: plugin name
        :raises RuntimeError: if the server returns another hash than *sha1*
        """
        # check if plugin is not already loaded and upgrade it if the sha1 hash
        # has changed:
        loaded = name in self.plugins
        if loaded and self.plugins[name].sha1 == sha1:
            return

        # get plugin from server; this is done before touching the currently
        # loaded version so that a failed download or a hash mismatch does
        # not leave the node without the plugin:
        sha1_get, content = conn.call('plugin_get', name)
        if sha1 != sha1_get:
            raise RuntimeError('Requested sha1 is not available on the server')

        # drop the outdated version:
        if loaded:
            self.plugins[name].uninstall()
            del self.plugins[name]

        # load the plugin:
        plugin_logger = logger.getChild('plugin.%s' % name)
        self.plugins[name] = Plugin(plugin_logger, TagDB(self.tag_db),
                                    name, sha1, content)
        self.tag_db['__main__']['plugins'].update_value()

    def plugin_uninstall(self, name):
        """Unload a plugin and refresh the ``plugins`` tag.

        :param name: plugin name
        :raises KeyError: if no plugin is loaded under *name*
        """
        plugin = self.plugins.pop(name, None)
        if plugin is None:
            raise KeyError('Plugin %r is not running' % name)
        plugin.uninstall()
        self.tag_db['__main__']['plugins'].update_value()

    def plugin_run(self, name, method, owner, **kwargs):
        """Run a method of a loaded plugin as a job.

        :param name: plugin name
        :param method: name of the plugin method to run
        :param owner: owner of the spawned job
        :param kwargs: keyword arguments stored in the job settings
        :return: id of the spawned job
        :raises KeyError: if the plugin is not loaded or has no such method
        """
        if name not in self.plugins:
            raise KeyError('Plugin %r is not running' % name)
        try:
            func = self.plugins[name].methods[method]
        except KeyError:
            raise KeyError('Unknown method %r' % method)
        job_settings = {
            'plugin_name': name,
            'method_name': method,
            'method': func,
            'method_kwargs': kwargs,
        }
        job = self.jobs_manager.spawn(PluginMethodJob, owner,
                                      settings=job_settings)
        return job.id