Loading ccnode/hypervisor/__init__.py +59 −1 Original line number Diff line number Diff line Loading @@ -3,6 +3,7 @@ import weakref from itertools import chain, imap import libvirt from sjrpc.utils import threadless from ccnode.host import Handler as HostHandler from ccnode.tags import Tag, tag_inspector, get_tags Loading @@ -13,7 +14,7 @@ from ccnode.hypervisor.lib import ( ) from ccnode.hypervisor.domains import VirtualMachine from ccnode.exc import UndefinedDomain, PoolStorageError from ccnode.hypervisor.jobs import ImportVolume, ExportVolume from ccnode.hypervisor.jobs import ImportVolume, ExportVolume, TCPTunnel logger = logging.getLogger(__name__) Loading Loading @@ -110,6 +111,10 @@ class Handler(HostHandler): self.main.reset_handler('vol_import', self.vol_import) self.main.reset_handler('vol_import_wait', self.vol_import_wait) self.main.reset_handler('vol_export', self.vol_export) self.main.reset_handler('tun_setup', self.tun_setup) self.main.reset_handler('tun_connect', self.tun_connect) self.main.reset_handler('tun_connect_hv', self.tun_connect_hv) self.main.reset_handler('tun_destroy', self.tun_destroy) # if everything went fine, unregister the timer self.timer.stop() Loading Loading @@ -156,6 +161,10 @@ class Handler(HostHandler): self.main.remove_handler('vol_import') self.main.remove_handler('vol_import_wait') self.main.remove_handler('vol_export') self.main.remove_handler('tun_setup') self.main.remove_handler('tun_connect') self.main.remove_handler('tun_connect_hv') self.main.remove_handler('tun_destroy') # launch connection timer self.timer.start() Loading Loading @@ -344,6 +353,55 @@ class Handler(HostHandler): logger.debug('Export volume successfull') return dict(id=job.id, log='', checksum=job.checksum) @threadless def tun_setup(self, local=True): """Set up local tunnel and listen on a random port. :param local: indicate if we should listen on localhost or all interfaces """ logger.debug('Tunnel setup: local = %s', local) # create job job = self.main.job_manager.create(TCPTunnel, self.main.evloop) job.setup_listen('127.0.0.1' if local else '0.0.0.0') return dict( jid=job.id, key='FIXME', port=job.port, ) @threadless def tun_connect(self, res, remote_res, remote_ip): """Connect tunnel to the other end. :param res: previous result of `tun_setup` handler :param remote_res: other end result of `tun_setup` handler :param remote_ip: where to connect """ logger.debug('Tunnel connect %s %s', res['jid'], remote_ip) job = self.main.job_manager.get(res['jid']) job.setup_connect((remote_ip, remote_res['port'])) @threadless def tun_connect_hv(self, res, migration=False): """Connect tunnel to local libvirt Unix socket. :param res: previous result of `tun_setup` handler """ logger.debug('Tunnel connect hypervisor %s', res['jid']) job = self.main.job_manager.get(res['jid']) job.setup_connect('/var/run/libvirt/libvirt-sock') @threadless def tun_destroy(self, res): """Close given tunnel. :param res: previous result as givent by `tun_setup` handler """ logger.debug('Tunnel destroy %s', res['jid']) self.main.job_manager.cancel(res['jid']) self.main.job_manager.remove(res['jid']) class Hypervisor(object): """Container for all hypervisor related state.""" Loading Loading
ccnode/hypervisor/__init__.py +59 −1 Original line number Diff line number Diff line Loading @@ -3,6 +3,7 @@ import weakref from itertools import chain, imap import libvirt from sjrpc.utils import threadless from ccnode.host import Handler as HostHandler from ccnode.tags import Tag, tag_inspector, get_tags Loading @@ -13,7 +14,7 @@ from ccnode.hypervisor.lib import ( ) from ccnode.hypervisor.domains import VirtualMachine from ccnode.exc import UndefinedDomain, PoolStorageError from ccnode.hypervisor.jobs import ImportVolume, ExportVolume from ccnode.hypervisor.jobs import ImportVolume, ExportVolume, TCPTunnel logger = logging.getLogger(__name__) Loading Loading @@ -110,6 +111,10 @@ class Handler(HostHandler): self.main.reset_handler('vol_import', self.vol_import) self.main.reset_handler('vol_import_wait', self.vol_import_wait) self.main.reset_handler('vol_export', self.vol_export) self.main.reset_handler('tun_setup', self.tun_setup) self.main.reset_handler('tun_connect', self.tun_connect) self.main.reset_handler('tun_connect_hv', self.tun_connect_hv) self.main.reset_handler('tun_destroy', self.tun_destroy) # if everything went fine, unregister the timer self.timer.stop() Loading Loading @@ -156,6 +161,10 @@ class Handler(HostHandler): self.main.remove_handler('vol_import') self.main.remove_handler('vol_import_wait') self.main.remove_handler('vol_export') self.main.remove_handler('tun_setup') self.main.remove_handler('tun_connect') self.main.remove_handler('tun_connect_hv') self.main.remove_handler('tun_destroy') # launch connection timer self.timer.start() Loading Loading @@ -344,6 +353,55 @@ class Handler(HostHandler): logger.debug('Export volume successfull') return dict(id=job.id, log='', checksum=job.checksum) @threadless def tun_setup(self, local=True): """Set up local tunnel and listen on a random port. :param local: indicate if we should listen on localhost or all interfaces """ logger.debug('Tunnel setup: local = %s', local) # create job job = self.main.job_manager.create(TCPTunnel, self.main.evloop) job.setup_listen('127.0.0.1' if local else '0.0.0.0') return dict( jid=job.id, key='FIXME', port=job.port, ) @threadless def tun_connect(self, res, remote_res, remote_ip): """Connect tunnel to the other end. :param res: previous result of `tun_setup` handler :param remote_res: other end result of `tun_setup` handler :param remote_ip: where to connect """ logger.debug('Tunnel connect %s %s', res['jid'], remote_ip) job = self.main.job_manager.get(res['jid']) job.setup_connect((remote_ip, remote_res['port'])) @threadless def tun_connect_hv(self, res, migration=False): """Connect tunnel to local libvirt Unix socket. :param res: previous result of `tun_setup` handler """ logger.debug('Tunnel connect hypervisor %s', res['jid']) job = self.main.job_manager.get(res['jid']) job.setup_connect('/var/run/libvirt/libvirt-sock') @threadless def tun_destroy(self, res): """Close given tunnel. :param res: previous result as givent by `tun_setup` handler """ logger.debug('Tunnel destroy %s', res['jid']) self.main.job_manager.cancel(res['jid']) self.main.job_manager.remove(res['jid']) class Hypervisor(object): """Container for all hypervisor related state.""" Loading