From ce0aad4337ca4567cb9fa7557f5d7f502a4e7333 Mon Sep 17 00:00:00 2001 From: Antoine Millet Date: Thu, 29 Sep 2011 15:19:39 +0200 Subject: [PATCH] Implementation of sjRpc tunnel protocol. --- sjrpc/core/protocols/tunnel.py | 96 ++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 sjrpc/core/protocols/tunnel.py diff --git a/sjrpc/core/protocols/tunnel.py b/sjrpc/core/protocols/tunnel.py new file mode 100644 index 0000000..a759a4a --- /dev/null +++ b/sjrpc/core/protocols/tunnel.py @@ -0,0 +1,96 @@ +from __future__ import absolute_import + +import socket + +from sjrpc.core.protocols import Protocol + +import pyev + +class TunnelProtocol(Protocol): + + ''' + A Tunneling protocol used to + ''' + + GET_SIZE = 1024 * 1024 # 1MB + DEFAULT_GET_SIZE = GET_SIZE + + def __init__(self, *args, **kwargs): + endpoint = kwargs.pop('endpoint', None) + super(TunnelProtocol, self).__init__(*args, **kwargs) + + if endpoint is None: + self._endpoint, self._socket = socket.socketpair() + else: + self._endpoint = endpoint + + self._from_tun_to_endpoint_buf = '' + self._asked = 0 # Data asked to the peer + self._ok_to_send = 0 # Data I can send to the peer + + # Set the endpoint as non-blocking socket: + self._endpoint.setblocking(False) + + # Create watcher to handle data coming from the endpoint: + props = dict(fd=self._endpoint, events=pyev.EV_READ, + callback=self._handle_from_endpoint) + self._endpoint_reader = self._connection.create_watcher(pyev.Io, **props) + + # Create watcher to handle data going to the endpoint: + props = dict(fd=self._endpoint, events=pyev.EV_WRITE, + callback=self._handle_from_tunnel) + self._endpoint_writer = self._connection.create_watcher(pyev.Io, **props) + + # Ask some data to the peer: + self._send_get(TunnelProtocol.GET_SIZE) + + def _handle_from_endpoint(self, watcher, revents): + ''' + Handle data coming from the endpoint socket and push it through the + sjRpc tunnel. + ''' + read = self._endpoint.recv(self._ok_to_send) + self._ok_to_send -= len(read) + self.send(read) + if not self._ok_to_send: + watcher.stop() + + def _handle_from_tunnel(self, watcher, revent): + ''' + Handle writing of the data already received from the tunnel and stored + into the incoming buffer. Data are writed only when the endpoint socket + is ready to write. + + This method is also responsible of asking more data to the peer when + the incoming buffer is empty. + ''' + sent = self._endpoint.send(self._from_tun_to_endpoint_buf) + self._from_tun_to_endpoint_buf = self._from_tun_to_endpoint_buf[sent:] + self._asked -= sent + if self._asked < TunnelProtocol.GET_SIZE * 2: + self._send_get(TunnelProtocol.GET_SIZE) + if not self._from_tun_to_endpoint_buf: + watcher.stop() + + def _send_get(self, size): + self._connection.rpc.send_special('protoctl', label=self._label, + type='get', payload=dict(size=size)) + self._asked += size + +# +# Public methods: +# + + def end_of_message(self): + ''' + Handle inbound data from the :class:`RpcConnection` peer and place it + on the incoming buffer. + ''' + self._from_tun_to_endpoint_buf += self._incoming_buf + self._endpoint_writer.start() + + def handle_control(self, control_type, payload): + if control_type == 'get': + size = payload.get('size', TunnelProtocol.DEFAULT_GET_SIZE) + self._ok_to_send += size + self._endpoint_reader.start() -- GitLab