# NOTE(review): the extracted source carried diff-viewer labels
# ("Newer"/"Older") here; they are not part of the module.
import fcntl
import logging
import os
import pickle
import pty
import shutil
import socket
import stat
import struct
import subprocess
import tempfile
import termios
import threading
from fcntl import ioctl
from subprocess import Popen, PIPE, STDOUT

# NOTE(review): RpcError is referenced below but its import was lost in
# extraction — module path assumed, confirm against sjrpc.
from sjrpc.core import RpcError
from sjrpc.utils import pass_connection, threadless

from cloudcontrol.common.client.plugins import Base as BasePlugin
from cloudcontrol.common.client.tags import Tag, tag_inspector, TagDB
from cloudcontrol.common.helpers.logger import patch_logging; patch_logging()
from cloudcontrol.common.jobs import JobsManager, JobsStore
# NOTE(review): a ``tags`` module is referenced in __init__; its import was
# lost in extraction — presumably the host role's tags module. Confirm.
from cloudcontrol.node.host import tags
from cloudcontrol.node.host.jobs import NodeJobsManagerInterface, ScriptJob
from cloudcontrol.node.host.plugins import PluginMethodJob, Plugin
logger = logging.getLogger(__name__)
def disk_tag_value(disk_name, sysfs_root='/sys/block'):
    """Return a zero-argument callable computing a block device's size.

    :param disk_name: device name as found under *sysfs_root* (e.g. ``sda``)
    :param sysfs_root: base sysfs directory; parameterized (default keeps the
        original behavior) so the lookup can be exercised against a fake tree
    :return: callable returning the size in bytes, or ``None`` when the sysfs
        value is not a positive integer
    """
    def size():
        # sysfs reports device size as a count of 512-byte sectors.
        # Use a context manager so the fd is not leaked on every tag refresh.
        with open(os.path.join(sysfs_root, disk_name, 'size')) as f:
            raw = f.read().strip()
        try:
            sectors = int(raw)
        except ValueError:
            return None
        return sectors * 512 if sectors > 0 else None
    return size
class FakePtySocket(object):
    """A fake socket object wrapping a pty master file descriptor.

    Exposes the minimal socket-like surface (``recv``/``send``/``fileno``/
    ``setblocking``/``close``) needed by the RPC tunnel endpoint.
    """

    def __init__(self, fd):
        # wrapped file descriptor (pty master side)
        self._fd = fd

    def recv(self, size):
        """Read at most *size* bytes from the wrapped fd."""
        return os.read(self._fd, size)

    def send(self, data):
        """Write *data* to the wrapped fd; return number of bytes written."""
        return os.write(self._fd, data)

    def fileno(self):
        return self._fd

    def setblocking(self, blocking):
        """Switch the fd to non-blocking mode; blocking mode is unsupported."""
        if not blocking:
            # Add O_NONBLOCK while preserving the other file status flags.
            # (A dead ``pass`` statement after these calls was removed.)
            current_flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
            fcntl.fcntl(self._fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
        else:
            raise NotImplementedError('Not implemented')

    def close(self):
        # Deliberately a no-op: the fd lifetime is managed by the owner
        # (the pty master is closed by RemoteShell.close()).
        pass
class RemoteShell(object):
    """Handles basic operations on a remote shell running in a local pty."""

    def __init__(self, conn, exec_='/bin/bash'):
        """
        :param conn: RPC connection used to create the shell tunnel
        :param exec_: path of binary to execute

        NOTE(review): the docstring's opening quotes were missing in the
        extracted source; restored here.
        """
        # Set early so close() is safe even if Popen below fails.
        self.process = None
        self.master, self.slave = pty.openpty()
        self.endpoint = FakePtySocket(self.master)
        self.proto = conn.create_tunnel(endpoint=self.endpoint,
                                        on_close=self.on_tunnel_close)
        self.conn = conn
        try:
            self.process = subprocess.Popen(
                [exec_],
                stdin=self.slave,
                stdout=self.slave,
                stderr=self.slave,
                bufsize=0,
                preexec_fn=os.setsid,  # detach the shell into its own session
                close_fds=True,
                cwd=os.environ.get('HOME', '/'),
            )
        except OSError:
            logger.exception('Error while executing remote shell')
            # Close opened fds and the RPC tunnel.
            # NOTE(review): the original called self.close_cb(), which is not
            # defined anywhere in this class; self.close() is the intended
            # cleanup.
            self.close()
            raise

    @property
    def label(self):
        """Label of the underlying RPC tunnel."""
        return self.proto.label

    def resize(self, row, col, xpixel, ypixel):
        """Propagate a terminal resize (TIOCSWINSZ) to the pty master."""
        if self.master is not None:
            ioctl(self.master, termios.TIOCSWINSZ,
                  struct.pack('HHHH', row, col, xpixel, ypixel))

    def on_tunnel_close(self, *args):
        # *args is for callback arguments (ignored)
        # Drop the proto reference first so close() does not close the
        # tunnel again (prevents reentrant/multiple close calls).
        self.proto = None
        self.close()

    def close(self):
        """Kill the shell process and release the pty fds and RPC tunnel."""
        if self.process is not None and self.process.poll() is None:
            # process is still alive
            self.process.kill()
            self.process.wait()
        if self.master is not None:
            try:
                os.close(self.master)
            except OSError:
                logger.error('Error when closing file descriptor for pty')
            self.master = None
        if self.slave is not None:
            try:
                os.close(self.slave)
            except OSError:
                logger.error('Error when closing slave file descriptor for pty')
            self.slave = None
        if self.proto is not None:
            try:
                self.proto.close()
            except Exception:
                logger.error('Error while trying to close RPC tunnel')
            self.proto = None
"""Handler for host role."""
def __init__(self, *args, **kwargs):
# Initialize the base plugin machinery (connection, tag DB, RPC handler).
BasePlugin.__init__(self, *args, **kwargs)
# Register the statically declared tags from the ``tags`` module.
self.tag_db.add_tags(tag_inspector(tags, self))
# NOTE(review): the next two lines are a fragment — the enclosing call
# (presumably something like ``self.tag_db.add_tags(map(...))`` creating
# one ``disk<N>_size`` tag per detected disk) was lost in extraction.
lambda d: Tag('disk%s_size' % d, disk_tag_value(d), 60),
self.tag_db['__main__']['disk']._calculate_value().split(),
# rpc handler
# NOTE(review): the closing ``))`` of this ``update(dict(...))`` call is
# missing from the extracted source.
self.rpc_handler.update(dict(
execute_command=self.execute_command,
node_shutdown=self.node_shutdown,
shell=self.shell,
resize=self.shell_resize,
job_cancel=self.job_cancel,
job_purge=self.job_purge,
job_attachment=self.job_attachment,
plugin_install=self.plugin_install,
plugin_uninstall=self.plugin_uninstall,
plugin_run=self.plugin_run
# running shells
self.shells = dict()
#: jobs manager (different from MainLoop.jobs_manager)
try:
self.jobs_manager = JobsManager(logger, NodeJobsManagerInterface(self),
JobsStore(self.main.config.jobs_store_path))
except EnvironmentError as e:
# Jobs storage is mandatory: abort the whole node when unavailable.
logger.critical('Cannot access jobs directory: %s', e.strerror)
self.main.stop()
raise
#: loaded plugins
self.plugins = {} # plugin name -> plugin object
#: cache directory for scripts
try:
self.scripts_dir = tempfile.mkdtemp(prefix='cc-node-script-cache')
except EnvironmentError as e:
# Same policy as the jobs store: no cache dir means no node.
logger.critical('Cannot create temporary directory for scripts: %s',
e.strerror)
self.main.stop()
raise
# NOTE(review): original source lines 201-229 were lost in extraction —
# only their bare line numbers remained here. Content unknown.
def start(self):
"""Start the host role: restore previously installed plugins.

Reads the pickled list of plugin names from ``plugins_store_path`` (when
present) and reinstalls each one from the server in a background daemon
thread, so start() itself does not block on RPC round-trips.
"""
BasePlugin.start(self)
# load plugins if persistent file is here
if os.path.isfile(self.main.config.plugins_store_path):
logger.debug('Loading previous plugins')
try:
# NOTE(review): pickle data is binary; text mode works on Python 2
# (this codebase's vintage) but would need 'rb' on Python 3 — confirm.
with open(self.main.config.plugins_store_path) as f:
to_load = pickle.load(f)
except EnvironmentError as exc:
logger.error('Cannot load previous plugins: %s (%s)',
exc.errno, exc.strerror)
except pickle.UnpicklingError as exc:
logger.error('Cannot read file, bad format, %s', exc)
else:
# Reinstall asynchronously; each failure is logged but does not
# prevent the remaining plugins from loading.
def plugins_install():
for p in to_load:
try:
self.plugin_install(self.main.rpc.rpc_con, None, p)
except Exception:
logger.error('Error while loading plugin %s', p)
else:
logger.debug('Successfuly loaded plugin %s', p)
th = threading.Thread(target=plugins_install)
th.daemon = True
th.start()
else:
logger.debug('No previously loaded plugins')
def stop(self):
    """Stop the host role: drop the script cache, close shells, stop base.

    NOTE(review): the ``def stop(self):`` header was missing from the
    extracted source; it is restored here based on the orphaned body and
    the terminating ``BasePlugin.stop(self)`` call.
    """
    # remove script cache directory
    shutil.rmtree(self.scripts_dir, ignore_errors=True)
    # kill all currently running shells
    for shell in self.shells.values():
        shell.close()
    self.shells.clear()
    BasePlugin.stop(self)
def update_plugin_index(self):
    """Refresh the ``plugins`` tag and persist the loaded plugin names.

    The persisted list is what start() reads back to reinstall plugins
    after a restart.
    """
    self.tag_db['__main__']['plugins'].update_value()
    # write plugins to file
    try:
        # pickle streams are binary data, so open the store in binary mode
        with open(self.main.config.plugins_store_path, 'wb') as f:
            # iterating the dict yields the plugin names (keys)
            pickle.dump(list(self.plugins), f)
    except EnvironmentError as exc:
        logger.error('Cannot save loaded plugins in \'%s\': %s (%s)',
                     self.main.config.plugins_store_path,
                     exc.errno, exc.strerror)
    else:
        logger.debug('Plugins state saved')
def execute_command(self, command):
    """Execute an arbitrary shell command on the host.

    :param string command: shell command to run
    :return: combined stdout/stderr output, or ``None`` when empty
    """
    try:
        # stderr is redirected into stdout so callers get one mixed stream
        process = Popen(command, shell=True, bufsize=-1,
                        stdin=PIPE, stdout=PIPE, stderr=STDOUT)
        output = process.communicate()[0]
    except Exception:
        logger.exception('Error while executing a remote command')
        raise
    # normalize empty output to None
    return output if output else None
def node_shutdown(self, reboot=True, gracefull=True):
"""Halt/Reboot the node.
:param bool reboot: halt/reboot the system
:param bool gracefull: force the operation (gracefull == not force)
"""
args = ['/sbin/shutdown']
if reboot:
args.append('-r')
if gracefull:
# NOTE(review): on classic sysvinit shutdown(8), '-f' means "skip
# fsck on next boot", not "graceful" — confirm intent against the
# target distribution's shutdown implementation.
args.append('-f')
else:
# '-n' bypasses init: fast, non-graceful shutdown
args.append('-n')
else:
# halt
args.append('-h -P')
if not gracefull:
args.append('-n')
else:
# time argument for the halt branch: shut down immediately.
# NOTE(review): the reboot branch passes no time argument; many
# shutdown implementations require one — verify on the target OS.
args.append('0')
return self.execute_command(' '.join(args))
def shell(self, conn, shell='/bin/bash'):
    """Create a shell tunnel and return the label of the created tunnel.

    :param conn: RPC connection the tunnel is created on
    :param shell: path of the shell binary to spawn
    """
    new_shell = RemoteShell(conn, shell)
    label = new_shell.label
    # keep a reference so shell_resize()/stop() can find it later
    self.shells[label] = new_shell
    return label
@threadless
def shell_resize(self, label, row, col, xpixel, ypixel):
    """Resize the pty of the running shell identified by *label*."""
    target = self.shells[label]
    target.resize(row, col, xpixel, ypixel)
@pass_connection
def forward(self, conn, label, port, destination='127.0.0.1'):
    """Open a TCP connection and expose it as an RPC tunnel.

    :param conn: RPC connection (injected by @pass_connection)
    :param label: label to assign to the created tunnel
    :param port: TCP port to connect to
    :param destination: host to connect to (defaults to localhost)
    """
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    except socket.error:
        logger.exception('Cannot create socket in forward handler')
        raise
    try:
        sock.connect((destination, port))
    except socket.error:
        logger.exception('Error while connecting to destination (forward)')
        # fix: do not leak the socket when the connection fails
        sock.close()
        raise
    conn.create_tunnel(label=label, endpoint=sock)
def script_run(self, conn, sha1, script, owner, batch=None, *args):
# NOTE(review): ``filename`` is used below but never assigned in the
# extracted source — a line such as
# ``filename = os.path.join(self.scripts_dir, sha1)`` (and probably a
# ``@pass_connection`` decorator) appears to have been lost.
if not os.access(filename, os.X_OK):
# Script is not cached yet (or not executable): fetch it from server.
try:
sha1, content = conn.call('script_get', script)
except RpcError:
logger.error('Error while retrieving script: %s', script)
raise
with open(filename, "w") as f:
try:
f.write(content)
# make the cached copy executable by its owner (rwx for user only)
os.chmod(filename, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
except OSError:
logger.error(
'Could not set execution permission to script: %s',
script)
raise
except IOError:
logger.error(
'Could not write script to repository: %s', script)
raise
# Run the cached script asynchronously as a job.
# NOTE(review): the closing ``))`` of this ``spawn(...)`` call is
# missing from the extracted source.
return self.jobs_manager.spawn(ScriptJob, owner, batch=batch,
settings=dict(
script=script,
filename=filename,
args=args,
def job_cancel(self, job_id):
    """Cancel the job identified by *job_id*."""
    job = self.jobs_manager.get(job_id)
    job.cancel()
def job_purge(self, job_id):
    """Remove the job identified by *job_id* from the jobs manager."""
    manager = self.jobs_manager
    manager.purge(job_id)
def job_attachment(self, job_id, name):
    """Return the content of attachment *name* of job *job_id*.

    NOTE(review): the original docstring was truncated in extraction (a
    lone ``\"\"\"`` remained); restored here.
    """
    try:
        return self.jobs_manager.get(job_id).read_attachment(name)
    except Exception:
        logger.exception('Error while getting job attachment')
        raise
@pass_connection
def plugin_install(self, conn, sha1, name):
    """Install (or upgrade) plugin *name* with content hash *sha1*.

    A plugin already loaded with the same hash is left untouched; a
    different hash triggers an uninstall followed by a fresh install.
    """
    loaded = self.plugins.get(name)
    if loaded is not None:
        if loaded.sha1 == sha1:
            # same version already running: nothing to do
            return
        # hash changed: drop the old version before reinstalling
        loaded.uninstall()
        del self.plugins[name]
    # get plugin from server:
    sha1_get, content = conn.call('plugin_get', name)
    if sha1 != sha1_get:
        raise RuntimeError('Requested sha1 is not available on the server')
    # load the plugin:
    plugin_logger = logger.getChild('plugin.%s' % name)
    self.plugins[name] = Plugin(plugin_logger, TagDB(self.tag_db),
                                name, sha1, content)
def plugin_uninstall(self, name):
    """Uninstall plugin *name*; raise KeyError when it is not loaded."""
    try:
        plugin = self.plugins.pop(name)
    except KeyError:
        raise KeyError('Plugin %r is not running' % name)
    plugin.uninstall()
def plugin_run(self, name, method, owner, batch=None, **kwargs):
"""Spawn a job running *method* of loaded plugin *name*.

Raises KeyError when the plugin is not loaded or the method is unknown.
"""
if name not in self.plugins:
raise KeyError('Plugin %r is not running' % name)
try:
func = self.plugins[name].methods[method]
except KeyError:
raise KeyError('Unknown method %r' % method)
# NOTE(review): the source is truncated here — the settings dict and the
# closing ``))`` of ``spawn(...)`` continue past the visible chunk.
return self.jobs_manager.spawn(PluginMethodJob, owner, batch=batch,
settings=dict(
plugin_name=name,
method_name=method,
method=func,
method_kwargs=kwargs,