Skip to content
jobs.py 37.6 KiB
Newer Older
Thibault VINCENT's avatar
Thibault VINCENT committed
                if not connected:
                    continue
                
                ## tunnel data between sockets
                # init socket poller
                mask_ro = select.EPOLLIN ^ select.EPOLLERR ^ select.EPOLLHUP
                mask_rw = mask_ro ^ select.EPOLLOUT
                poller = select.epoll()
                poller.register(self._sock_server, mask_ro)
                poller.register(self._sock_client, mask_ro)
                # forward data until a connection is closed
                buf_server = ''
                buf_client = ''
                connected = True
                while not self._cancelled and connected:
                    # wait for events on both sockets
                    poll = poller.poll()
                    ## process events for read/write operations only
                    for fd, events in poll:
                        # events are for server socket
                        if fd == self._sock_server.fileno():
                            # read available
                            if events & select.EPOLLIN:
                                # read incoming data
                                try:
                                    read = self._sock_server.recv(4096)
                                except socket.error:
                                    connected = False
                                buf_server += read
                                # set the other socket to notify us when it's
                                # available for writing
                                if len(buf_server):
                                    poller.modify(self._sock_client, mask_rw)
                            # write available
                            if events & select.EPOLLOUT:
                                # try to send the whole buffer
                                try:
                                    sent = self._sock_server.send(buf_client)
                                except socket.error:
                                    connected = False
                                # drop sent data from the buffer
                                buf_client = buf_client[sent:]
                                # if the buffer becomes empty, stop write polling
                                if not len(buf_client):
                                    poller.modify(self._sock_server, mask_ro)
                        # events for client socket
                        else:
                            # read available
                            if events & select.EPOLLIN:
                                # read incoming data
                                try:
                                    read = self._sock_client.recv(4096)
                                except socket.error:
                                    connected = False
                                buf_client += read
                                # set the other socket to notify us when it's
                                # available for writing
                                if len(buf_client):
                                    poller.modify(self._sock_server, mask_rw)
                            # write available
                            if events & select.EPOLLOUT:
                                # try to send the whole buffer
                                try:
                                    sent = self._sock_client.send(buf_server)
                                except socket.error:
                                    connected = False
                                # drop sent data from the buffer
                                buf_server = buf_server[sent:]
                                # if the buffer becomes empty, stop write polling
                                if not len(buf_server):
                                    poller.modify(self._sock_client, mask_ro)
                if connected is False:
                    self._log.append('disconnected')
                try:
                    self._sock_server.close()
                except: pass
                try:
                    self._sock_client.close()
                except: pass
        except Exception as err:
            traceback.print_exc()
            raise err
    
    def tunnel_get_server_port(self):
        '''
        Return the server port once it is available.

        Blocks (without timeout) on the `_event_server_bound` event, then
        returns the current value of `_server_port`.
        NOTE(review): presumably the event is set by the job's run loop right
        after the listening socket is bound -- confirm against that code.
        '''
        bound_event = self._event_server_bound
        # nothing to report before the event has been signalled
        bound_event.wait()
        port = self._server_port
        return port
    
    def tunnel_connect(self, endpoint):
        '''
        Set the remote endpoint of the tunnel; allowed only once.

        While holding the job mutex, stores `endpoint` in `_endpoint` and
        signals the `_event_client_conf` event. The value is stored as-is; its
        expected format is not visible from this method.

        Raises TCPTunnelJobError if the client configuration event was
        already set by a previous call.
        '''
        with self._mutex:
            # guard: refuse to overwrite an endpoint that was already given
            if self._event_client_conf.is_set():
                raise TCPTunnelJobError('remote endpoint already set')
            self._endpoint = endpoint
            self._event_client_conf.set()
    
    def tunnel_listen(self, local):
        '''
        Set the server-side parameters of the tunnel; allowed only once.

        While holding the job mutex, stores the flag in `_server_local` and
        signals the `_event_server_conf` event. Only the literal `True`
        enables the flag: any other value, including truthy ones such as 1,
        is stored as False.
        NOTE(review): `local` presumably selects a local-only listener --
        confirm against the code that reads `_server_local`.

        Raises TCPTunnelJobError if the server configuration event was
        already set by a previous call.
        '''
        with self._mutex:
            # guard: server parameters may not be changed after the first call
            if self._event_server_conf.is_set():
                raise TCPTunnelJobError('server parameters already set')
            # strict identity test on purpose, not a truthiness test
            is_local = local is True
            self._server_local = is_local
            self._event_server_conf.set()