Loading sjrpc/core/protocols/tunnel.py +18 −17 Original line number Diff line number Diff line from __future__ import absolute_import import threading import socket from sjrpc.core.protocols import Protocol Loading @@ -16,7 +17,7 @@ class TunnelProtocol(Protocol): def __init__(self, *args, **kwargs): endpoint = kwargs.pop('endpoint', None) autostart = kwargs.pop('autostart', True) self._cb_on_close = kwargs.pop('on_close', self.cb_default_on_close) self._cb_on_shutdown = kwargs.pop('on_shutdown', self.cb_default_on_shutdown) super(TunnelProtocol, self).__init__(*args, **kwargs) if endpoint is None: Loading @@ -25,7 +26,7 @@ class TunnelProtocol(Protocol): self._endpoint = endpoint self._socket = None self._is_closed = False self._is_shutdown = threading.Event() self._is_started = autostart self._from_tun_to_endpoint_buf = '' self._asked = 0 # Data asked to the peer Loading Loading @@ -62,12 +63,12 @@ class TunnelProtocol(Protocol): if err.errno in self.connection.NONBLOCKING_ERRORS: return else: self.close() self.shutdown() return # Empty read means the connection has been closed on other side: if not read: self.close() self.shutdown() return self._ok_to_send -= len(read) Loading @@ -90,7 +91,7 @@ class TunnelProtocol(Protocol): if err.errno in self.connection.NONBLOCKING_ERRORS: return else: self.close() self.shutdown() return self._from_tun_to_endpoint_buf = self._from_tun_to_endpoint_buf[sent:] Loading @@ -105,8 +106,8 @@ class TunnelProtocol(Protocol): type='get', payload=dict(size=size)) self._asked += size def cb_default_on_close(self, tun): """ Action to do on the endpoint when the connection is closed. def cb_default_on_shutdown(self, tun): """ Action to do on the endpoint when the connection is shutdown. """ tun.endpoint.close() Loading @@ -127,23 +128,23 @@ class TunnelProtocol(Protocol): def start(self): """ Start the reader on the endpoint. """ if not self._is_closed: if not self._is_shutdown.is_set(): self._is_started = True self._endpoint_reader.start() def close(self): """ Close the tunnel and unregister it from connection. def shutdown(self): """ Shutdown the tunnel. """ if not self._is_closed: self._is_closed = True if not self._is_shutdown.is_set(): self._is_shutdown.set() # Stop watchers: self._endpoint_reader.stop() self._endpoint_writer.stop() # Send the end of stream message to the peer: self._connection.rpc.send_special('protoctl', label=self._label, type='eos') # Unregister the tunnel: self._connection.unregister_protocol(self._label) self._cb_on_close(self) # Execute the callback: self._cb_on_shutdown(self) super(TunnelProtocol, self).shutdown() def end_of_message(self): """ Handle inbound data from the :class:`RpcConnection` peer and place Loading @@ -153,7 +154,7 @@ class TunnelProtocol(Protocol): self._endpoint_writer.start() def handle_control(self, control_type, payload): if not self._is_closed: if not self._is_shutdown.is_set(): if control_type == 'get': size = payload.get('size', TunnelProtocol.DEFAULT_GET_SIZE) self._ok_to_send += size Loading @@ -166,4 +167,4 @@ class TunnelProtocol(Protocol): self._endpoint_reader.start() elif control_type == 'eos': self.logger.debug('Received EOS event') self.close() self.shutdown() Loading
sjrpc/core/protocols/tunnel.py +18 −17 Original line number Diff line number Diff line from __future__ import absolute_import import threading import socket from sjrpc.core.protocols import Protocol Loading @@ -16,7 +17,7 @@ class TunnelProtocol(Protocol): def __init__(self, *args, **kwargs): endpoint = kwargs.pop('endpoint', None) autostart = kwargs.pop('autostart', True) self._cb_on_close = kwargs.pop('on_close', self.cb_default_on_close) self._cb_on_shutdown = kwargs.pop('on_shutdown', self.cb_default_on_shutdown) super(TunnelProtocol, self).__init__(*args, **kwargs) if endpoint is None: Loading @@ -25,7 +26,7 @@ class TunnelProtocol(Protocol): self._endpoint = endpoint self._socket = None self._is_closed = False self._is_shutdown = threading.Event() self._is_started = autostart self._from_tun_to_endpoint_buf = '' self._asked = 0 # Data asked to the peer Loading Loading @@ -62,12 +63,12 @@ class TunnelProtocol(Protocol): if err.errno in self.connection.NONBLOCKING_ERRORS: return else: self.close() self.shutdown() return # Empty read means the connection has been closed on other side: if not read: self.close() self.shutdown() return self._ok_to_send -= len(read) Loading @@ -90,7 +91,7 @@ class TunnelProtocol(Protocol): if err.errno in self.connection.NONBLOCKING_ERRORS: return else: self.close() self.shutdown() return self._from_tun_to_endpoint_buf = self._from_tun_to_endpoint_buf[sent:] Loading @@ -105,8 +106,8 @@ class TunnelProtocol(Protocol): type='get', payload=dict(size=size)) self._asked += size def cb_default_on_close(self, tun): """ Action to do on the endpoint when the connection is closed. def cb_default_on_shutdown(self, tun): """ Action to do on the endpoint when the connection is shutdown. """ tun.endpoint.close() Loading @@ -127,23 +128,23 @@ class TunnelProtocol(Protocol): def start(self): """ Start the reader on the endpoint. """ if not self._is_closed: if not self._is_shutdown.is_set(): self._is_started = True self._endpoint_reader.start() def close(self): """ Close the tunnel and unregister it from connection. def shutdown(self): """ Shutdown the tunnel. """ if not self._is_closed: self._is_closed = True if not self._is_shutdown.is_set(): self._is_shutdown.set() # Stop watchers: self._endpoint_reader.stop() self._endpoint_writer.stop() # Send the end of stream message to the peer: self._connection.rpc.send_special('protoctl', label=self._label, type='eos') # Unregister the tunnel: self._connection.unregister_protocol(self._label) self._cb_on_close(self) # Execute the callback: self._cb_on_shutdown(self) super(TunnelProtocol, self).shutdown() def end_of_message(self): """ Handle inbound data from the :class:`RpcConnection` peer and place Loading @@ -153,7 +154,7 @@ class TunnelProtocol(Protocol): self._endpoint_writer.start() def handle_control(self, control_type, payload): if not self._is_closed: if not self._is_shutdown.is_set(): if control_type == 'get': size = payload.get('size', TunnelProtocol.DEFAULT_GET_SIZE) self._ok_to_send += size Loading @@ -166,4 +167,4 @@ class TunnelProtocol(Protocol): self._endpoint_reader.start() elif control_type == 'eos': self.logger.debug('Received EOS event') self.close() self.shutdown()