Newer
Older
from __future__ import absolute_import
import threading
import socket
from sjrpc.core.protocols import Protocol
import pyev
class TunnelProtocol(Protocol):
GET_SIZE = 1024 * 1024 # 1MB
DEFAULT_GET_SIZE = GET_SIZE
def __init__(self, *args, **kwargs):
endpoint = kwargs.pop('endpoint', None)
autostart = kwargs.pop('autostart', True)
self._cb_on_shutdown = kwargs.pop('on_shutdown', self.cb_default_on_shutdown)
super(TunnelProtocol, self).__init__(*args, **kwargs)
if endpoint is None:
self._endpoint, self._socket = socket.socketpair()
else:
self._endpoint = endpoint
self._socket = None
self._is_shutdown = threading.Event()
self._is_started = autostart
self._from_tun_to_endpoint_buf = ''
self._asked = 0 # Data asked to the peer
self._ok_to_send = 0 # Data I can send to the peer
# Set the endpoint as non-blocking socket:
self._endpoint.setblocking(False)
# Create watcher to handle data coming from the endpoint:
props = dict(fd=self._endpoint, events=pyev.EV_READ,
callback=self._handle_from_endpoint)
Antoine Millet
committed
self._endpoint_reader = self.create_watcher(pyev.Io, **props)
# Create watcher to handle data going to the endpoint:
props = dict(fd=self._endpoint, events=pyev.EV_WRITE,
callback=self._handle_from_tunnel)
Antoine Millet
committed
self._endpoint_writer = self.create_watcher(pyev.Io, **props)
self._connection.rpc.send_special('protoctl', label=self._label,
type='ready')
def _handle_from_endpoint(self, watcher, revents):
""" Handle data coming from the endpoint socket and push it through the
sjRpc tunnel.
"""
# Abort if peer don't want more data:
if self._ok_to_send <= 0:
watcher.stop
return
try:
read = self._endpoint.recv(self._ok_to_send)
except (IOError, socket.error) as err:
if err.errno in self.connection.NONBLOCKING_ERRORS:
return
else:
self.shutdown()
return
# Empty read means the connection has been closed on other side:
if not read:
self.shutdown()
self._ok_to_send -= len(read)
self.send(read)
if not self._ok_to_send:
watcher.stop()
def _handle_from_tunnel(self, watcher, revent):
""" Handle writing of the data already received from the tunnel and stored
into the incoming buffer.
Data are writed only when the endpoint socket is ready to write.
This method is also responsible of asking more data to the peer when
the incoming buffer is empty.
try:
sent = self._endpoint.send(self._from_tun_to_endpoint_buf)
except (IOError, socket.error) as err:
if err.errno in self.connection.NONBLOCKING_ERRORS:
return
else:
self.shutdown()
self._from_tun_to_endpoint_buf = self._from_tun_to_endpoint_buf[sent:]
self._asked -= sent
if self._asked < TunnelProtocol.GET_SIZE * 2:
self._send_get(TunnelProtocol.GET_SIZE)
if not self._from_tun_to_endpoint_buf:
watcher.stop()
def _send_get(self, size):
self._connection.rpc.send_special('protoctl', label=self._label,
type='get', payload=dict(size=size))
self._asked += size
def cb_default_on_shutdown(self, tun):
""" Action to do on the endpoint when the connection is shutdown.
#
# Public methods:
#
@property
def socket(self):
return self._socket
""" Return the tunnel endpoint.
"""
""" Start the reader on the endpoint.
"""
if not self._is_shutdown.is_set():
self._is_started = True
self._endpoint_reader.start()
def shutdown(self):
""" Shutdown the tunnel.
if not self._is_shutdown.is_set():
self._is_shutdown.set()
# Stop watchers:
self._endpoint_reader.stop()
self._endpoint_writer.stop()
# Send the end of stream message to the peer:
self._connection.rpc.send_special('protoctl', label=self._label, type='eos')
# Execute the callback:
self._cb_on_shutdown(self)
super(TunnelProtocol, self).shutdown()
""" Handle inbound data from the :class:`RpcConnection` peer and place
it on the incoming buffer.
"""
self._from_tun_to_endpoint_buf += self._incoming_buf
self._endpoint_writer.start()
def handle_control(self, control_type, payload):
if not self._is_shutdown.is_set():
if control_type == 'get':
size = payload.get('size', TunnelProtocol.DEFAULT_GET_SIZE)
self._ok_to_send += size
if self._is_started:
self._endpoint_reader.start()
elif control_type == 'ready':
self._send_get(TunnelProtocol.GET_SIZE)
self._ok_to_send += TunnelProtocol.GET_SIZE
if self._is_started:
self._endpoint_reader.start()
elif control_type == 'eos':
self.logger.debug('Received EOS event')
self.shutdown()