Newer
Older
import fcntl
import logging
import os
import pty
import socket
import struct
import subprocess
import sys
import termios
from fcntl import ioctl
from itertools import chain, imap
from subprocess import Popen, PIPE, STDOUT

from sjrpc.utils import pass_connection, threadless
from sjrpc.core.protocols import TunnelProtocol

from cloudcontrol.node.tags import Tag, tag_inspector
from cloudcontrol.node.plugins import Base as BasePlugin
from cloudcontrol.node.host import tags
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def disk_tag_value(disk_name):
    """Build a callable reporting the size of a block device.

    :param string disk_name: device name, looked up under ``/sys/block``
    :return: a zero-argument function; each call re-reads
        ``/sys/block/<disk_name>/size`` and returns that number multiplied by
        512, or ``None`` when the content is not an integer or is not
        strictly positive

    Errors raised while opening or reading the file are not caught here.
    """
    size_path = os.path.join('/sys/block', disk_name, 'size')

    def size():
        # the context manager releases the descriptor on every call instead
        # of leaving it to the garbage collector
        with open(size_path) as size_file:
            raw = size_file.read().strip()
        try:
            count = int(raw)
        except ValueError:
            return None
        if count > 0:
            return count * 512
        return None

    return size
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
class FakePtySocket(object):
    """Socket-like adapter around a raw file descriptor.

    Only the handful of socket methods defined below are provided; reads and
    writes go straight to the descriptor through :func:`os.read` and
    :func:`os.write`.
    """

    def __init__(self, fd):
        # descriptor every other method operates on
        self._fd = fd

    def fileno(self):
        """Return the wrapped file descriptor."""
        return self._fd

    def recv(self, size):
        """Read at most ``size`` bytes from the descriptor."""
        return os.read(self._fd, size)

    def send(self, data):
        """Write ``data`` to the descriptor, returning the count written."""
        return os.write(self._fd, data)

    def setblocking(self, blocking):
        """Put the descriptor in non-blocking mode.

        :raises NotImplementedError: when ``blocking`` is true, switching
            back to blocking mode is not supported
        """
        if blocking:
            raise NotImplementedError('Not implemented')
        # keep the flags already set and add O_NONBLOCK on top of them
        flags = fcntl.fcntl(self._fd, fcntl.F_GETFL) | os.O_NONBLOCK
        fcntl.fcntl(self._fd, fcntl.F_SETFL, flags)

    def close(self):
        """Do nothing: the descriptor is left open by this object."""
        pass
class RemoteShell(object):
    """Handles basic operations on remote shell.

    A pseudo-terminal pair is opened; the master side is wrapped in a
    :class:`FakePtySocket` and given to ``conn.create_tunnel`` as endpoint,
    while the child process uses the slave side as stdin, stdout and stderr.
    """

    def __init__(self, ev_loop, conn, exec_='/bin/bash'):
        """
        :param ev_loop: pyev loop instance
        :param conn: sjRPC connection
        :param exec_: binary path

        Any exception raised while setting up the pty, the tunnel or the
        child process is logged and re-raised after everything created so
        far has been released.

        .. warning:: constructor must be called inside the thread running
            libev loop, this is necessary in order to create and start
            watcher and to close file descriptors
        """
        self.conn = conn
        # Everything close_cb() looks at is defined before anything can fail,
        # so a partially built instance can always be cleaned up.
        self.master = None
        self.slave = None
        self.endpoint = None
        self.proto = None
        self.process = None

        # handles close when waiting process to end from an other thread
        # (``async`` is a reserved word from Python 3.7 on, so the method is
        # looked up by name; the call itself is unchanged)
        self.watcher = getattr(ev_loop, 'async')(self.close_cb)
        self.watcher.start()

        try:
            self.master, self.slave = pty.openpty()
            self.endpoint = FakePtySocket(self.master)
            self.proto = conn.create_tunnel(endpoint=self.endpoint)
            self.process = subprocess.Popen(
                [exec_],
                stdin=self.slave,
                stdout=self.slave,
                stderr=self.slave,
                bufsize=0,
                preexec_fn=os.setsid,
                close_fds=True,
                cwd=os.environ.get('HOME', '/'),
            )
        except Exception:
            logger.exception('Error while executing remote shell')
            # stop the watcher, close opened fds and the tunnel
            self.close_cb()
            raise

    @property
    def label(self):
        """Label of the tunnel attached to this shell."""
        return self.proto.label

    def resize(self, row, col, xpixel, ypixel):
        """Set the window size of the pty through ``TIOCSWINSZ``.

        The four values are packed as unsigned shorts, in the order given.
        """
        ioctl(self.master, termios.TIOCSWINSZ,
              struct.pack('HHHH', row, col, xpixel, ypixel))

    def close(self):
        """Ask for the shell to be closed.

        Only signals the async watcher; the work is done by :meth:`close_cb`
        when the watcher callback runs.
        """
        # async
        self.watcher.send()

    def terminate(self):
        """Call ``terminate()`` on the child process."""
        self.process.terminate()

    def kill(self):
        """Call ``kill()`` on the child process."""
        self.process.kill()

    def close_cb(self, *args):
        """Release the watcher, both pty descriptors and the tunnel.

        Used as the watcher callback (hence ``*args``) and also called
        directly. Each resource is set to ``None`` once handled, so calling
        this again is harmless. Failures are logged, never raised.
        """
        if self.watcher is not None:
            self.watcher.stop()
            self.watcher = None

        if self.master is not None:
            try:
                os.close(self.master)
            except OSError:
                logger.error('Error when closing file descriptor for pty')
            self.master = None

        if self.slave is not None:
            try:
                os.close(self.slave)
            except OSError:
                logger.error('Error when closing slave file descriptor for pty')
            self.slave = None

        if self.proto is not None:
            try:
                self.proto.close()
            except Exception:
                logger.error('Error while trying to close RPC tunnel')
            self.proto = None

    def wait(self):
        """Block until the child process ends and return its exit status."""
        return self.process.wait()

    def wait_n_close(self):
        """Wait for the child process, request a close, return the status."""
        ret_code = self.wait()
        self.close()
        return ret_code
"""Handler for host role."""
def __init__(self, *args, **kwargs):
# Set up the handler: run the base plugin constructor, register tags,
# publish the RPC handlers and create the registry of running shells.
BasePlugin.__init__(self, *args, **kwargs)
# register whatever tag_inspector() returns for the ``tags`` module
self.tag_db.add_tags(tag_inspector(tags))
# NOTE(review): the next two lines read like the arguments of a call whose
# opening line is not visible in this view -- presumably one
# 'disk<name>_size' tag per name listed in the 'disk' tag, each backed by
# disk_tag_value(); confirm against the complete file.
lambda d: Tag('disk%s_size' % d, disk_tag_value(d), 60),
self.tag_db['__main__']['disk']._calculate_value().split(),
# rpc handler: names exposed over RPC, mapped to the bound methods
self.rpc_handler.update(dict(
execute_command=self.execute_command,
node_shutdown=self.node_shutdown,
rshell=self.rshell,
rshell_resize=self.rshell_resize,
rshell_wait=self.rshell_wait,
# NOTE(review): the closing parentheses of the call above are not visible
# in this view -- confirm against the complete file.
# running shells, keyed by tunnel label (see rshell)
self.shells = dict()
def stop(self):
    """Kill and clean up every registered shell, then stop the plugin.

    A shell whose process cannot be signalled any more must not prevent the
    remaining shells from being cleaned up nor the base plugin from being
    stopped, so ``OSError`` raised by ``kill()`` is logged and ignored.
    """
    # kill all currently running shells
    for shell in list(self.shells.values()):
        try:
            shell.kill()
        except OSError:
            # typically the process has already exited
            logger.warning('Could not kill remote shell process',
                           exc_info=True)
        shell.close_cb()
    self.shells.clear()
    BasePlugin.stop(self)
def execute_command(self, command):
    """Run ``command`` through the shell and hand back what it printed.

    :param string command: shell command to run
    :return: standard output with standard error merged into it, or
        ``None`` when the command produced no output

    Any exception is logged and re-raised.
    """
    try:
        child = Popen(
            command,
            shell=True,
            bufsize=-1,
            stdin=PIPE,
            stdout=PIPE,
            stderr=STDOUT,  # stderr is mixed into stdout
        )
        output, _ = child.communicate()
    except Exception:
        logger.exception('Error while executing a remote command')
        raise
    return output or None
def node_shutdown(self, reboot=True, gracefull=True):
"""Halt or reboot the node by running ``/sbin/shutdown``.

The command is assembled as a list of words, joined with single spaces and
passed to ``self.execute_command``, whose result is returned.

:param bool reboot: reboot (``-r``) when true, halt (``-h -P``) otherwise
:param bool gracefull: selects the extra switches appended below; per the
    original note, gracefull == not force
"""
args = ['/sbin/shutdown']
if reboot:
args.append('-r')
if gracefull:
args.append('-f')
else:
args.append('-n')
else:
# halt
# '-h -P' is added as a single list element; it becomes two words once the
# list is joined
args.append('-h -P')
if not gracefull:
args.append('-n')
# NOTE(review): indentation is not recoverable in this view, so it is unclear
# which ``if`` the ``else`` below pairs with and therefore in which cases
# the time argument '0' is appended -- confirm against the complete file.
else:
args.append('0')
return self.execute_command(' '.join(args))
@threadless
@pass_connection
def rshell(self, conn, shell='/bin/bash'):
    """Start a remote shell and return the label of its tunnel.

    :param conn: connection the tunnel is created on
    :param shell: path of the binary to run

    The new shell is recorded in ``self.shells`` under that label.
    """
    session = RemoteShell(self.main.evloop, conn, shell)
    tunnel_label = session.label
    self.shells[tunnel_label] = session
    return tunnel_label
@threadless
def rshell_resize(self, label, row, col, xpixel, ypixel):
    """Resize the terminal of the shell registered under ``label``.

    A ``KeyError`` is raised when no shell is known under that label.
    """
    target = self.shells[label]
    target.resize(row, col, xpixel, ypixel)
def rshell_wait(self, label):
    """Wait for a tunnel termination.

    :param label: label of the shell, as returned when it was created
    :return: value returned by the shell's ``wait_n_close()``
    :raises KeyError: when no shell is registered under ``label``

    Once the wait is over the shell is removed from ``self.shells`` so that
    finished shells do not pile up in the registry.
    """
    shell = self.shells[label]
    ret_code = shell.wait_n_close()
    # pop() with a default: the registry may have been emptied meanwhile
    self.shells.pop(label, None)
    return ret_code
@pass_connection
def forward(self, conn, label, port, destination='127.0.0.1'):
    """Connect to ``destination:port`` over TCP and attach it to a tunnel.

    :param conn: connection the tunnel is created on
    :param label: label given to the new tunnel
    :param port: TCP port to connect to
    :param destination: host to connect to
    :raises socket.error: when the socket cannot be created or connected
        (logged before being re-raised)
    """
    # create socket
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    except socket.error:
        logger.exception('Cannot create socket in forward handler')
        raise
    try:
        sock.connect((destination, port))
    except socket.error:
        logger.exception('Error while connecting to destination (forward)')
        # nothing else references the socket yet: release its descriptor
        sock.close()
        raise
    # create tunnel
    conn.create_tunnel(label=label, endpoint=sock)