Loading sjrpc/core/protocols/tunnel.py +19 −3 Original line number Diff line number Diff line Loading @@ -11,7 +11,6 @@ __all__ = ['TunnelProtocol'] class TunnelProtocol(Protocol): ''' A Tunneling protocol used to ''' GET_SIZE = 1024 * 1024 # 1MB Loading @@ -19,6 +18,7 @@ class TunnelProtocol(Protocol): def __init__(self, *args, **kwargs): endpoint = kwargs.pop('endpoint', None) autostart = kwargs.pop('autostart', True) super(TunnelProtocol, self).__init__(*args, **kwargs) if endpoint is None: Loading @@ -29,6 +29,7 @@ class TunnelProtocol(Protocol): self._cb_on_close = kwargs.pop('on_close', self._cb_default_on_close) self._is_closed = False self._is_started = autostart self._from_tun_to_endpoint_buf = '' self._asked = 0 # Data asked to the peer self._ok_to_send = 0 # Data I can send to the peer Loading @@ -54,6 +55,11 @@ class TunnelProtocol(Protocol): Handle data coming from the endpoint socket and push it through the sjRpc tunnel. ''' # Abort if peer don't want more data: if self._ok_to_send <= 0: watcher.stop return try: read = self._endpoint.recv(self._ok_to_send) except (IOError, socket.error) as err: Loading Loading @@ -124,6 +130,14 @@ class TunnelProtocol(Protocol): ''' return self._endpoint def start(self): ''' Start the reader on the endpoint. ''' if not self._is_closed: self._is_started = True self._endpoint_reader.start() def close(self): ''' Close the tunnel and unregister it from connection. Loading Loading @@ -152,10 +166,12 @@ class TunnelProtocol(Protocol): if control_type == 'get': size = payload.get('size', TunnelProtocol.DEFAULT_GET_SIZE) self._ok_to_send += size if self._is_started: self._endpoint_reader.start() elif control_type == 'ready': self._send_get(TunnelProtocol.GET_SIZE) self._ok_to_send += TunnelProtocol.GET_SIZE if self._is_started: self._endpoint_reader.start() elif control_type == 'eos': self.logger.debug('Received EOS event') Loading Loading
sjrpc/core/protocols/tunnel.py +19 −3 Original line number Diff line number Diff line Loading @@ -11,7 +11,6 @@ __all__ = ['TunnelProtocol'] class TunnelProtocol(Protocol): ''' A Tunneling protocol used to ''' GET_SIZE = 1024 * 1024 # 1MB Loading @@ -19,6 +18,7 @@ class TunnelProtocol(Protocol): def __init__(self, *args, **kwargs): endpoint = kwargs.pop('endpoint', None) autostart = kwargs.pop('autostart', True) super(TunnelProtocol, self).__init__(*args, **kwargs) if endpoint is None: Loading @@ -29,6 +29,7 @@ class TunnelProtocol(Protocol): self._cb_on_close = kwargs.pop('on_close', self._cb_default_on_close) self._is_closed = False self._is_started = autostart self._from_tun_to_endpoint_buf = '' self._asked = 0 # Data asked to the peer self._ok_to_send = 0 # Data I can send to the peer Loading @@ -54,6 +55,11 @@ class TunnelProtocol(Protocol): Handle data coming from the endpoint socket and push it through the sjRpc tunnel. ''' # Abort if peer don't want more data: if self._ok_to_send <= 0: watcher.stop return try: read = self._endpoint.recv(self._ok_to_send) except (IOError, socket.error) as err: Loading Loading @@ -124,6 +130,14 @@ class TunnelProtocol(Protocol): ''' return self._endpoint def start(self): ''' Start the reader on the endpoint. ''' if not self._is_closed: self._is_started = True self._endpoint_reader.start() def close(self): ''' Close the tunnel and unregister it from connection. Loading Loading @@ -152,10 +166,12 @@ class TunnelProtocol(Protocol): if control_type == 'get': size = payload.get('size', TunnelProtocol.DEFAULT_GET_SIZE) self._ok_to_send += size if self._is_started: self._endpoint_reader.start() elif control_type == 'ready': self._send_get(TunnelProtocol.GET_SIZE) self._ok_to_send += TunnelProtocol.GET_SIZE if self._is_started: self._endpoint_reader.start() elif control_type == 'eos': self.logger.debug('Received EOS event') Loading