Loading ccnode/host/__init__.py +149 −0 Original line number Diff line number Diff line import os import sys import pty import fcntl import subprocess import logging import os.path import struct import termios from fcntl import ioctl from itertools import chain, imap from subprocess import Popen, PIPE, STDOUT from sjrpc.utils import pass_connection, threadless from sjrpc.core.protocols import TunnelProtocol from ccnode.tags import Tag, tag_inspector from ccnode.plugins import Base as BasePlugin from ccnode.host import tags Loading @@ -26,6 +37,120 @@ def disk_tag_value(disk_name): return size class FakePtySocket(object): """A fake socket object wrapping a :class:`subprocess.Popen` object standard input/output. """ def __init__(self, fd): self._fd = fd def recv(self, size): return os.read(self._fd, size) def send(self, data): return os.write(self._fd, data) def fileno(self): return self._fd def setblocking(self, blocking): if not blocking: # Disable blocking mode on stdin: current_flags = fcntl.fcntl(self._fd, fcntl.F_GETFL) fcntl.fcntl(self._fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK) pass else: raise NotImplementedError('Not implemented') def close(self): pass class RemoteShell(object): """Handles basic operations on remote shell.""" def __init__(self, ev_loop, conn, exec_='/bin/bash'): """ :param ev_loop: pyev loop instance :param conn: sjRPC connection :param exec_: binary path .. warning:: constructor must be called inside the thread runnig libev loop, this is necessary in order to create and start watcher and to close file descriptors """ # handles close when waiting process to end from an other thread self.watcher = ev_loop.async(self.close_cb) self.watcher.start() self.master, self.slave = pty.openpty() self.endpoint = FakePtySocket(self.master) self.proto = conn.create_tunnel(endpoint=self.endpoint) self.conn = conn try: self.process = subprocess.Popen( [exec_], stdout=self.slave, bufsize=0, stdin=self.slave, stderr=self.slave, preexec_fn=os.setsid, close_fds=True, cwd=os.environ.get('HOME', '/'), ) except OSError: logger.exception('Error while executing remote shell') # close opened fds self.close_cb() raise @property def label(self): return self.proto.label def resize(self, row, col, xpixel, ypixel): ioctl(self.master, termios.TIOCSWINSZ, struct.pack('HHHH', row, col, xpixel, ypixel)) def close(self): # async self.watcher.send() def terminate(self): self.process.terminate() def kill(self): self.process.kill() def close_cb(self, *args): if self.watcher is not None: self.watcher.stop() self.watcher = None if self.master is not None: try: os.close(self.master) except OSError: logger.error('Error when closing file descriptor for pty') self.master = None if self.slave is not None: try: os.close(self.slave) except OSError: logger.error('Error when closing slave file descriptor for pty') self.slave = None if self.proto is not None: try: self.proto.close() except Exception: logger.error('Error while trying to close RPC tunnel') self.proto = None def wait(self): return self.process.wait() def wait_n_close(self): ret_code = self.wait() self.close() return ret_code class Handler(BasePlugin): """Handler for host role.""" def __init__(self, *args, **kwargs): Loading @@ -44,7 +169,12 @@ class Handler(BasePlugin): self.rpc_handler.update(dict( execute_command=self.execute_command, node_shutdown=self.node_shutdown, rshell=self.rshell, rshell_resize=self.rshell_resize, rshell_wait=self.rshell_wait, )) # running shells self.shells = dict() def execute_command(self, command): """Execute an arbitrary shell command on the host. Loading Loading @@ -86,3 +216,22 @@ class Handler(BasePlugin): args.append('0') return self.execute_command(' '.join(args)) @threadless @pass_connection def rshell(self, conn, shell='/bin/bash'): """Create a shell tunnel and return the label of the created tunnel. """ remote_shell = RemoteShell(self.main.evloop, conn, shell) self.shells[remote_shell.label] = remote_shell return remote_shell.label @threadless def rshell_resize(self, label, row, col, xpixel, ypixel): """Resize the rshell.""" self.shells[label].resize(row, col, xpixel, ypixel) def rshell_wait(self, label): """Wait for a tunnel termination.""" return self.shells[label].wait_n_close() Loading
ccnode/host/__init__.py +149 −0 Original line number Diff line number Diff line import os import sys import pty import fcntl import subprocess import logging import os.path import struct import termios from fcntl import ioctl from itertools import chain, imap from subprocess import Popen, PIPE, STDOUT from sjrpc.utils import pass_connection, threadless from sjrpc.core.protocols import TunnelProtocol from ccnode.tags import Tag, tag_inspector from ccnode.plugins import Base as BasePlugin from ccnode.host import tags Loading @@ -26,6 +37,120 @@ def disk_tag_value(disk_name): return size class FakePtySocket(object): """A fake socket object wrapping a :class:`subprocess.Popen` object standard input/output. """ def __init__(self, fd): self._fd = fd def recv(self, size): return os.read(self._fd, size) def send(self, data): return os.write(self._fd, data) def fileno(self): return self._fd def setblocking(self, blocking): if not blocking: # Disable blocking mode on stdin: current_flags = fcntl.fcntl(self._fd, fcntl.F_GETFL) fcntl.fcntl(self._fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK) pass else: raise NotImplementedError('Not implemented') def close(self): pass class RemoteShell(object): """Handles basic operations on remote shell.""" def __init__(self, ev_loop, conn, exec_='/bin/bash'): """ :param ev_loop: pyev loop instance :param conn: sjRPC connection :param exec_: binary path .. warning:: constructor must be called inside the thread runnig libev loop, this is necessary in order to create and start watcher and to close file descriptors """ # handles close when waiting process to end from an other thread self.watcher = ev_loop.async(self.close_cb) self.watcher.start() self.master, self.slave = pty.openpty() self.endpoint = FakePtySocket(self.master) self.proto = conn.create_tunnel(endpoint=self.endpoint) self.conn = conn try: self.process = subprocess.Popen( [exec_], stdout=self.slave, bufsize=0, stdin=self.slave, stderr=self.slave, preexec_fn=os.setsid, close_fds=True, cwd=os.environ.get('HOME', '/'), ) except OSError: logger.exception('Error while executing remote shell') # close opened fds self.close_cb() raise @property def label(self): return self.proto.label def resize(self, row, col, xpixel, ypixel): ioctl(self.master, termios.TIOCSWINSZ, struct.pack('HHHH', row, col, xpixel, ypixel)) def close(self): # async self.watcher.send() def terminate(self): self.process.terminate() def kill(self): self.process.kill() def close_cb(self, *args): if self.watcher is not None: self.watcher.stop() self.watcher = None if self.master is not None: try: os.close(self.master) except OSError: logger.error('Error when closing file descriptor for pty') self.master = None if self.slave is not None: try: os.close(self.slave) except OSError: logger.error('Error when closing slave file descriptor for pty') self.slave = None if self.proto is not None: try: self.proto.close() except Exception: logger.error('Error while trying to close RPC tunnel') self.proto = None def wait(self): return self.process.wait() def wait_n_close(self): ret_code = self.wait() self.close() return ret_code class Handler(BasePlugin): """Handler for host role.""" def __init__(self, *args, **kwargs): Loading @@ -44,7 +169,12 @@ class Handler(BasePlugin): self.rpc_handler.update(dict( execute_command=self.execute_command, node_shutdown=self.node_shutdown, rshell=self.rshell, rshell_resize=self.rshell_resize, rshell_wait=self.rshell_wait, )) # running shells self.shells = dict() def execute_command(self, command): """Execute an arbitrary shell command on the host. Loading Loading @@ -86,3 +216,22 @@ class Handler(BasePlugin): args.append('0') return self.execute_command(' '.join(args)) @threadless @pass_connection def rshell(self, conn, shell='/bin/bash'): """Create a shell tunnel and return the label of the created tunnel. """ remote_shell = RemoteShell(self.main.evloop, conn, shell) self.shells[remote_shell.label] = remote_shell return remote_shell.label @threadless def rshell_resize(self, label, row, col, xpixel, ypixel): """Resize the rshell.""" self.shells[label].resize(row, col, xpixel, ypixel) def rshell_wait(self, label): """Wait for a tunnel termination.""" return self.shells[label].wait_n_close()