Loading cloudcontrol/node/exc.py +4 −0 Original line number Diff line number Diff line Loading @@ -28,3 +28,7 @@ class DRBDAllocationError(CCNodeError): class DRBDError(CCNodeError): pass class ConsoleAlreadyOpened(CCNodeError): pass cloudcontrol/node/hypervisor/domains/__init__.py +231 −0 Original line number Diff line number Diff line import errno import logging import socket import weakref from StringIO import StringIO from xml.etree import cElementTree as et from collections import namedtuple import pyev import libvirt from cloudcontrol.node.tags import Tag, tag_inspector from cloudcontrol.node.hypervisor import lib as _libvirt from cloudcontrol.node.hypervisor.lib import DOMAIN_STATES as STATE from cloudcontrol.node.hypervisor.domains import vm_tags from cloudcontrol.node.utils import SocketBuffer from cloudcontrol.node.exc import ConsoleAlreadyOpened logger = logging.getLogger(__name__) Loading @@ -18,6 +25,10 @@ NetworkInterface = namedtuple('NetworkInterface', ('source', 'mac', 'model')) class VirtualMachine(object): """Represent a VM instance.""" #: buffer size for console connection handling BUFFER_LEN = 1024 def __init__(self, dom, hypervisor): """ :param dom: libvirt domain instance Loading Loading @@ -59,6 +70,15 @@ class VirtualMachine(object): #: keep record of CPU stats (libev timestamp, cpu time) self.cpu_stats = (hypervisor.handler.main.evloop.now(), dom.info()[4]) # attributes related to console handling self.stream = None self.sock = None # socketpair endpoint self.read_watcher = None # ev watcher for sock self.write_watcher = None # ev watcher for sock self.from_tunnel = None # buffer self.from_stream = None # buffer self.stream_handling = 0 # libvirt stream event mask @property def state(self): return self._state Loading Loading @@ -141,3 +161,214 @@ class VirtualMachine(object): source=source, model=model, ) def open_console(self): if self.stream is not None: raise ConsoleAlreadyOpened logger.info('Opening console stream on VM %s', self.name) try: self.stream = self.hypervisor.vir_con.newStream( libvirt.VIR_STREAM_NONBLOCK) except libvirt.libvirtError: logger.error('Cannot create new stream for console %s', self.name) self.close_console() raise self.stream_handling = libvirt.VIR_STREAM_EVENT_READABLE | ( libvirt.VIR_STREAM_EVENT_ERROR | libvirt.VIR_STREAM_EVENT_HANGUP) try: self.lv_dom.openConsole(None, self.stream, 0) self.stream.eventAddCallback(self.stream_handling, self.virt_console_stream_cb, None) except libvirt.libvirtError: logger.error('Cannot open console on domain %s', self.name) self.close_console() raise try: self.sock, tunnel_endpoint = socket.socketpair() except socket.error: logger.error('Cannot create socket pair for console on domain %s', self.name) self.close_console() raise try: self.sock.setblocking(0) except socket.error: logger.error('Cannot set socket to non blocking for console on' ' domain %s', self.name) self.close_console() raise self.read_watcher = self.hypervisor.handler.main.evloop.io( self.sock, pyev.EV_READ, self.read_from_tun_cb) self.read_watcher.start() self.write_watcher = self.hypervisor.handler.main.evloop.io( self.sock, pyev.EV_WRITE, self.write_to_tun_cb) # self.write_watcher.start() self.from_tunnel = SocketBuffer(4096) self.from_stream = SocketBuffer(4096) return tunnel_endpoint def virt_console_stream_cb(self, stream, events, opaque): """Handles read/write from/to libvirt stream.""" if events & libvirt.VIR_EVENT_HANDLE_ERROR or ( events & libvirt.VIR_EVENT_HANDLE_HANGUP): # error/hangup # logger.debug('Received error on stream') self.close_console() return if events & libvirt.VIR_EVENT_HANDLE_WRITABLE: # logger.debug('Write to stream') # logger.debug('Event %s', self.stream_handling) while True: try: to_send = self.from_tunnel.popleft() except IndexError: # update libvirt event mask self.stream_handling ^= libvirt.VIR_STREAM_EVENT_WRITABLE self.stream.eventUpdateCallback(self.stream_handling) break send_buffer = to_send total_sent = 0 while True: try: written = self.stream.send(send_buffer) except: # libvirt send error logger.exception('Error while writing to stream') self.close_console() return if written == -2: # equivalent to EAGAIN self.from_tunnel.appendleft(to_send[total_sent:]) break elif written == len(send_buffer): break total_sent += written send_buffer = buffer(to_send, total_sent) if not self.from_tunnel.is_full(): self.read_watcher.start() if events & libvirt.VIR_EVENT_HANDLE_READABLE: # logger.debug('Read from stream') # logger.debug('Event %s', self.stream_handling) while True: try: incoming = self.stream.recv(self.BUFFER_LEN) except: pass if incoming == -2: # equivalent to EAGAIN break elif not incoming: # EOF self.close_console() return self.from_stream.append(incoming) if self.from_stream.is_full(): # update libvirt event mask self.stream_handling ^= libvirt.VIR_STREAM_EVENT_READABLE self.stream.eventUpdateCallback(self.stream_handling) break if not self.from_stream.is_empty(): self.write_watcher.start() def read_from_tun_cb(self, watcher, revents): """Read data from tunnel and save into buffer.""" # logger.debug('Read from tunnel') # logger.debug('Event %s', self.stream_handling) while True: try: incoming = self.sock.recv(self.BUFFER_LEN) except socket.error as exc: if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK): break logger.exception('Error reading on socket for vm console') self.close_console() return if not incoming: # EOF (we could wait before closing console stream) self.close_console() return self.from_tunnel.append(incoming) if self.from_tunnel.is_full(): self.read_watcher.stop() break if not self.from_tunnel.is_empty(): # update libvirt event callback self.stream_handling |= libvirt.VIR_STREAM_EVENT_WRITABLE self.stream.eventUpdateCallback(self.stream_handling) def write_to_tun_cb(self, watcher, revents): """Write data from buffer to tunnel.""" # logger.debug('Write to tunnel') # logger.debug('Event %s', self.stream_handling) while True: try: to_send = self.from_stream.popleft() except IndexError: self.write_watcher.stop() break send_buffer = to_send total_sent = 0 while True: try: written = self.sock.send(send_buffer) except socket.error as exc: if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK): self.from_stream.appenleft(to_send[total_sent:]) break logger.exception('Error writing on socket for vm console') self.close_console() return if written == len(send_buffer): break total_sent += written send_buffer = buffer(to_send, total_sent) if not self.from_stream.is_full(): # update libvirt event callback self.stream_handling |= libvirt.VIR_STREAM_EVENT_READABLE self.stream.eventUpdateCallback(self.stream_handling) def close_console(self): logger.info('Closing console stream on VM %s', self.name) if self.stream is not None: try: self.stream.eventRemoveCallback() except Exception: logger.error('Error while removing callback on stream') try: self.stream.finish() except Exception: logger.error('Cannot finnish console stream') self.stream = None if self.sock is not None: try: self.sock.close() except socket.error: logger.error('Cannot close socket') self.sock = None if self.read_watcher is not None: self.read_watcher.stop() self.read_watcher = None if self.write_watcher is not None: self.write_watcher.stop() self.write_watcher = None self.from_tunnel = None self.from_stream = None Loading
cloudcontrol/node/exc.py +4 −0 Original line number Diff line number Diff line Loading @@ -28,3 +28,7 @@ class DRBDAllocationError(CCNodeError): class DRBDError(CCNodeError): pass class ConsoleAlreadyOpened(CCNodeError): pass
cloudcontrol/node/hypervisor/domains/__init__.py +231 −0 Original line number Diff line number Diff line import errno import logging import socket import weakref from StringIO import StringIO from xml.etree import cElementTree as et from collections import namedtuple import pyev import libvirt from cloudcontrol.node.tags import Tag, tag_inspector from cloudcontrol.node.hypervisor import lib as _libvirt from cloudcontrol.node.hypervisor.lib import DOMAIN_STATES as STATE from cloudcontrol.node.hypervisor.domains import vm_tags from cloudcontrol.node.utils import SocketBuffer from cloudcontrol.node.exc import ConsoleAlreadyOpened logger = logging.getLogger(__name__) Loading @@ -18,6 +25,10 @@ NetworkInterface = namedtuple('NetworkInterface', ('source', 'mac', 'model')) class VirtualMachine(object): """Represent a VM instance.""" #: buffer size for console connection handling BUFFER_LEN = 1024 def __init__(self, dom, hypervisor): """ :param dom: libvirt domain instance Loading Loading @@ -59,6 +70,15 @@ class VirtualMachine(object): #: keep record of CPU stats (libev timestamp, cpu time) self.cpu_stats = (hypervisor.handler.main.evloop.now(), dom.info()[4]) # attributes related to console handling self.stream = None self.sock = None # socketpair endpoint self.read_watcher = None # ev watcher for sock self.write_watcher = None # ev watcher for sock self.from_tunnel = None # buffer self.from_stream = None # buffer self.stream_handling = 0 # libvirt stream event mask @property def state(self): return self._state Loading Loading @@ -141,3 +161,214 @@ class VirtualMachine(object): source=source, model=model, ) def open_console(self): if self.stream is not None: raise ConsoleAlreadyOpened logger.info('Opening console stream on VM %s', self.name) try: self.stream = self.hypervisor.vir_con.newStream( libvirt.VIR_STREAM_NONBLOCK) except libvirt.libvirtError: logger.error('Cannot create new stream for console %s', self.name) self.close_console() raise self.stream_handling = libvirt.VIR_STREAM_EVENT_READABLE | ( libvirt.VIR_STREAM_EVENT_ERROR | libvirt.VIR_STREAM_EVENT_HANGUP) try: self.lv_dom.openConsole(None, self.stream, 0) self.stream.eventAddCallback(self.stream_handling, self.virt_console_stream_cb, None) except libvirt.libvirtError: logger.error('Cannot open console on domain %s', self.name) self.close_console() raise try: self.sock, tunnel_endpoint = socket.socketpair() except socket.error: logger.error('Cannot create socket pair for console on domain %s', self.name) self.close_console() raise try: self.sock.setblocking(0) except socket.error: logger.error('Cannot set socket to non blocking for console on' ' domain %s', self.name) self.close_console() raise self.read_watcher = self.hypervisor.handler.main.evloop.io( self.sock, pyev.EV_READ, self.read_from_tun_cb) self.read_watcher.start() self.write_watcher = self.hypervisor.handler.main.evloop.io( self.sock, pyev.EV_WRITE, self.write_to_tun_cb) # self.write_watcher.start() self.from_tunnel = SocketBuffer(4096) self.from_stream = SocketBuffer(4096) return tunnel_endpoint def virt_console_stream_cb(self, stream, events, opaque): """Handles read/write from/to libvirt stream.""" if events & libvirt.VIR_EVENT_HANDLE_ERROR or ( events & libvirt.VIR_EVENT_HANDLE_HANGUP): # error/hangup # logger.debug('Received error on stream') self.close_console() return if events & libvirt.VIR_EVENT_HANDLE_WRITABLE: # logger.debug('Write to stream') # logger.debug('Event %s', self.stream_handling) while True: try: to_send = self.from_tunnel.popleft() except IndexError: # update libvirt event mask self.stream_handling ^= libvirt.VIR_STREAM_EVENT_WRITABLE self.stream.eventUpdateCallback(self.stream_handling) break send_buffer = to_send total_sent = 0 while True: try: written = self.stream.send(send_buffer) except: # libvirt send error logger.exception('Error while writing to stream') self.close_console() return if written == -2: # equivalent to EAGAIN self.from_tunnel.appendleft(to_send[total_sent:]) break elif written == len(send_buffer): break total_sent += written send_buffer = buffer(to_send, total_sent) if not self.from_tunnel.is_full(): self.read_watcher.start() if events & libvirt.VIR_EVENT_HANDLE_READABLE: # logger.debug('Read from stream') # logger.debug('Event %s', self.stream_handling) while True: try: incoming = self.stream.recv(self.BUFFER_LEN) except: pass if incoming == -2: # equivalent to EAGAIN break elif not incoming: # EOF self.close_console() return self.from_stream.append(incoming) if self.from_stream.is_full(): # update libvirt event mask self.stream_handling ^= libvirt.VIR_STREAM_EVENT_READABLE self.stream.eventUpdateCallback(self.stream_handling) break if not self.from_stream.is_empty(): self.write_watcher.start() def read_from_tun_cb(self, watcher, revents): """Read data from tunnel and save into buffer.""" # logger.debug('Read from tunnel') # logger.debug('Event %s', self.stream_handling) while True: try: incoming = self.sock.recv(self.BUFFER_LEN) except socket.error as exc: if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK): break logger.exception('Error reading on socket for vm console') self.close_console() return if not incoming: # EOF (we could wait before closing console stream) self.close_console() return self.from_tunnel.append(incoming) if self.from_tunnel.is_full(): self.read_watcher.stop() break if not self.from_tunnel.is_empty(): # update libvirt event callback self.stream_handling |= libvirt.VIR_STREAM_EVENT_WRITABLE self.stream.eventUpdateCallback(self.stream_handling) def write_to_tun_cb(self, watcher, revents): """Write data from buffer to tunnel.""" # logger.debug('Write to tunnel') # logger.debug('Event %s', self.stream_handling) while True: try: to_send = self.from_stream.popleft() except IndexError: self.write_watcher.stop() break send_buffer = to_send total_sent = 0 while True: try: written = self.sock.send(send_buffer) except socket.error as exc: if exc.errno in (errno.EAGAIN, errno.EWOULDBLOCK): self.from_stream.appenleft(to_send[total_sent:]) break logger.exception('Error writing on socket for vm console') self.close_console() return if written == len(send_buffer): break total_sent += written send_buffer = buffer(to_send, total_sent) if not self.from_stream.is_full(): # update libvirt event callback self.stream_handling |= libvirt.VIR_STREAM_EVENT_READABLE self.stream.eventUpdateCallback(self.stream_handling) def close_console(self): logger.info('Closing console stream on VM %s', self.name) if self.stream is not None: try: self.stream.eventRemoveCallback() except Exception: logger.error('Error while removing callback on stream') try: self.stream.finish() except Exception: logger.error('Cannot finnish console stream') self.stream = None if self.sock is not None: try: self.sock.close() except socket.error: logger.error('Cannot close socket') self.sock = None if self.read_watcher is not None: self.read_watcher.stop() self.read_watcher = None if self.write_watcher is not None: self.write_watcher.stop() self.write_watcher = None self.from_tunnel = None self.from_stream = None