"""Host role plugin: disk size tags, remote command execution, node
shutdown and interactive remote shells tunnelled over an sjRPC connection.
"""

import os
import sys
import pty
import fcntl
import subprocess
import logging
import os.path
import struct
import termios
from fcntl import ioctl
from itertools import chain, imap
from subprocess import Popen, PIPE, STDOUT

from sjrpc.utils import pass_connection, threadless
from sjrpc.core.protocols import TunnelProtocol

from cloudcontrol.node.tags import Tag, tag_inspector
from cloudcontrol.node.plugins import Base as BasePlugin
from cloudcontrol.node.host import tags


logger = logging.getLogger(__name__)


def disk_tag_value(disk_name):
    """Build a callable returning the size in bytes of a block device.

    :param disk_name: device name as found under ``/sys/block``
    :return: zero-argument function that reads
        ``/sys/block/<disk_name>/size`` and returns that number multiplied
        by 512, or ``None`` when the content is not a positive integer
    """
    def size():
        # ``with`` guarantees the sysfs handle is released on every call
        # (the tag value is recomputed periodically).
        with open(os.path.join('/sys/block', disk_name, 'size')) as size_file:
            raw = size_file.read().strip()
        try:
            sectors = int(raw)
        except ValueError:
            return None
        return sectors * 512 if sectors > 0 else None

    return size


class FakePtySocket(object):
    """A minimal socket-like object wrapping a raw file descriptor.

    Only the subset of the socket interface used by the tunnel endpoint is
    provided: ``recv``, ``send``, ``fileno``, ``setblocking`` and ``close``.
    """

    def __init__(self, fd):
        """
        :param fd: file descriptor to read from and write to
        """
        self._fd = fd

    def recv(self, size):
        """Read up to `size` bytes from the file descriptor."""
        return os.read(self._fd, size)

    def send(self, data):
        """Write `data` to the file descriptor, return bytes written."""
        return os.write(self._fd, data)

    def fileno(self):
        """Return the wrapped file descriptor."""
        return self._fd

    def setblocking(self, blocking):
        """Switch the file descriptor to non-blocking mode.

        :param blocking: must be false; enabling blocking mode is not
            supported
        :raises NotImplementedError: when `blocking` is true
        """
        if blocking:
            raise NotImplementedError('Not implemented')
        # Add O_NONBLOCK to the current file status flags
        current_flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
        fcntl.fcntl(self._fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)

    def close(self):
        """Do nothing: the descriptor is closed by its owner
        (see :meth:`RemoteShell.close_cb`).
        """
        pass


class RemoteShell(object):
    """Handles basic operations on remote shell."""

    def __init__(self, ev_loop, conn, exec_='/bin/bash'):
        """
        :param ev_loop: pyev loop instance
        :param conn: sjRPC connection
        :param exec_: binary path
        :raises OSError: when the shell binary cannot be executed

        .. warning::
            constructor must be called inside the thread running libev loop,
            this is necessary in order to create and start watcher and to
            close file descriptors
        """
        # handles close when waiting process to end from an other thread
        # (``async`` is a reserved word from Python 3.7 on; going through
        # getattr is equivalent and keeps this module parseable everywhere)
        self.watcher = getattr(ev_loop, 'async')(self.close_cb)
        self.watcher.start()

        self.master, self.slave = pty.openpty()
        self.endpoint = FakePtySocket(self.master)
        self.proto = conn.create_tunnel(endpoint=self.endpoint)
        self.conn = conn

        try:
            # The child gets the slave side of the pty as its three
            # standard streams and runs in its own session.
            self.process = subprocess.Popen(
                [exec_],
                stdout=self.slave,
                bufsize=0,
                stdin=self.slave,
                stderr=self.slave,
                preexec_fn=os.setsid,
                close_fds=True,
                cwd=os.environ.get('HOME', '/'),
            )
        except OSError:
            logger.exception('Error while executing remote shell')
            # close opened fds
            self.close_cb()
            raise

    @property
    def label(self):
        """Label of the RPC tunnel bound to this shell."""
        return self.proto.label

    def resize(self, row, col, xpixel, ypixel):
        """Set the window size of the pty (``TIOCSWINSZ`` ioctl).

        :param row: number of rows
        :param col: number of columns
        :param xpixel: width in pixels
        :param ypixel: height in pixels
        """
        winsize = struct.pack('HHHH', row, col, xpixel, ypixel)
        ioctl(self.master, termios.TIOCSWINSZ, winsize)

    def close(self):
        """Request an asynchronous close.

        Only signals the watcher; the actual cleanup is done by
        :meth:`close_cb`, which is the watcher callback.
        """
        # async
        self.watcher.send()

    def terminate(self):
        """Call ``terminate()`` on the shell process."""
        self.process.terminate()

    def kill(self):
        """Call ``kill()`` on the shell process."""
        self.process.kill()

    def close_cb(self, *args):
        """Release the watcher, both pty descriptors and the RPC tunnel.

        Each resource is reset to ``None`` once handled, so calling this
        method again is harmless. Errors are logged and not propagated.
        """
        if self.watcher is not None:
            self.watcher.stop()
            self.watcher = None

        if self.master is not None:
            try:
                os.close(self.master)
            except OSError:
                logger.error('Error when closing file descriptor for pty')
            self.master = None

        if self.slave is not None:
            try:
                os.close(self.slave)
            except OSError:
                logger.error('Error when closing slave file descriptor for pty')
            self.slave = None

        if self.proto is not None:
            try:
                self.proto.close()
            except Exception:
                # best effort: a failing tunnel must not abort the cleanup
                logger.error('Error while trying to close RPC tunnel')
            self.proto = None

    def wait(self):
        """Block until the shell process ends and return its exit code."""
        return self.process.wait()

    def wait_n_close(self):
        """Wait for the shell process to end, then request a close.

        :return: exit code of the shell process
        """
        ret_code = self.wait()
        self.close()
        return ret_code


class Handler(BasePlugin):
    """Handler for host role."""

    def __init__(self, *args, **kwargs):
        BasePlugin.__init__(self, *args, **kwargs)

        # add plugin tags
        self.tag_db.add_tags(tag_inspector(tags))

        # disk related tags: one ``disk<name>_size`` tag per name listed in
        # the (whitespace separated) value of the ``disk`` tag
        disk_names = self.tag_db['__main__']['disk']._calculate_value().split()
        self.tag_db.add_tags(
            Tag('disk%s_size' % name, disk_tag_value(name), 60)
            for name in disk_names
        )

        # rpc handler
        self.rpc_handler.update(dict(
            execute_command=self.execute_command,
            node_shutdown=self.node_shutdown,
            rshell=self.rshell,
            rshell_resize=self.rshell_resize,
            rshell_wait=self.rshell_wait,
        ))

        # running shells, indexed by tunnel label
        self.shells = dict()

    def execute_command(self, command):
        """Execute an arbitrary shell command on the host.

        :param string command: shell command to run
        :return: stdout and stderr of the command mixed in, or ``None``
            when the command produced no output
        """
        # NOTE(review): `command` is handed to the shell as is
        # (``shell=True``); this is the purpose of the call, but it relies
        # entirely on the RPC caller being trusted.
        try:
            process = Popen(command, shell=True, bufsize=-1, stdin=PIPE,
                            stdout=PIPE, stderr=STDOUT)
            # return stdout and stderr mixed in
            return process.communicate()[0] or None
        except Exception:
            logger.exception('Error while executing a remote command')
            raise

    def node_shutdown(self, reboot=True, gracefull=True):
        """Halt/Reboot the node.

        Builds a ``/sbin/shutdown`` command line with an immediate (``0``)
        delay and runs it through :meth:`execute_command`.

        :param bool reboot: reboot the system if true, halt it otherwise
        :param bool gracefull: when false, force the operation
            (gracefull == not force)
        :return: output of the shutdown command (see
            :meth:`execute_command`)
        """
        args = ['/sbin/shutdown']
        if reboot:
            args.append('-r')
            if gracefull:
                logger.info('Going to reboot the host...')
                args.append('-f')
            else:
                logger.info('Going to force the reboot of the host...')
                args.append('-n')
        else:
            # halt
            args.append('-h -P')
            # The two messages used to be swapped in this branch: the
            # forced path (``-n``) was logged as a plain halt.
            if gracefull:
                logger.info('Going to halt the host...')
            else:
                logger.info('Going to force the halt of the host...')
                args.append('-n')

        # delay before shutdown: now
        args.append('0')

        return self.execute_command(' '.join(args))

    @threadless
    @pass_connection
    def rshell(self, conn, shell='/bin/bash'):
        """Create a shell tunnel and return the label of the created tunnel.

        :param conn: connection on which the tunnel is created
        :param shell: path of the binary to run
        """
        remote_shell = RemoteShell(self.main.evloop, conn, shell)
        self.shells[remote_shell.label] = remote_shell
        return remote_shell.label

    @threadless
    def rshell_resize(self, label, row, col, xpixel, ypixel):
        """Resize the rshell.

        :param label: tunnel label returned by :meth:`rshell`
        """
        self.shells[label].resize(row, col, xpixel, ypixel)

    def rshell_wait(self, label):
        """Wait for a tunnel termination.

        :param label: tunnel label returned by :meth:`rshell`
        :return: exit code of the shell process
        """
        ret_code = self.shells[label].wait_n_close()
        # The shell is finished: forget it so that ``self.shells`` does not
        # grow with every shell ever opened.
        self.shells.pop(label, None)
        return ret_code