Skip to content
Snippets Groups Projects
Commit 5c471b8c authored by Antoine Millet's avatar Antoine Millet
Browse files

Added fallback_timeout parameter on RpcConnection

parent faf1306a
No related branches found
No related tags found
No related merge requests found
......@@ -11,6 +11,7 @@ import errno
import struct
import socket
import logging
import threading
from sjrpc.core.protocols import Protocol, RpcProtocol, TunnelProtocol
from sjrpc.core.exceptions import (RpcError, NoFreeLabelError,
......@@ -49,6 +50,9 @@ class RpcConnection(object):
the default rpc
:param sock: the socket object of this newly created :class:`RpcConnection`
:param fallback_timeout: set the maximum time to wait the "capabilities"
message before to send anything to the peer. 0 to wait indefinitly, -1
to disable the fallback mode.
:param \*args,\*\*kwargs: arguments to pass to the default rpc protocol
automatically registered on label 0.
'''
......@@ -63,7 +67,7 @@ class RpcConnection(object):
SHORTCUTS_MAINRPC = ('call', 'async_call')
def __init__(self, sock, loop=None, enable_tcp_keepalive=False,
*args, **kwargs):
fallback_timeout=1.0, *args, **kwargs):
# Sock of this connection:
self._sock = sock
sock.setblocking(False)
......@@ -87,7 +91,10 @@ class RpcConnection(object):
# Socket inbound/outbound buffers:
self._inbound_buffer = ''
self._outbound_buffer = ''
self._remains = struct.calcsize(RpcConnection.MESSAGE_HEADER_FALLBACK)
if fallback_timeout == -1:
self._remains = struct.calcsize(RpcConnection.MESSAGE_HEADER)
else:
self._remains = struct.calcsize(RpcConnection.MESSAGE_HEADER_FALLBACK)
self._proto_receiving = None
# Initialize main read/write watchers:
......@@ -120,13 +127,23 @@ class RpcConnection(object):
for name in RpcConnection.SHORTCUTS_MAINRPC:
setattr(self, name, getattr(self.get_protocol(0), name))
self._event_fallback = threading.Event()
# By default, enter in fallback mode, no label, all frames are
# redirected on Rpc0:
self.fallback = True
if fallback_timeout != -1:
self.fallback = True
self.create_watcher(pyev.Timer, after=fallback_timeout, repeat=0,
callback=self._cb_set_event_fallback).start()
# Set the event fallback just to send the capability message:
self._event_fallback.set()
# Send our capabilities to the peer:
self._remote_capabilities = None
self._send_capabilities()
# And clear it after:
self._event_fallback.clear()
@classmethod
def from_addr(cls, addr, port, conn_timeout=30.0, *args, **kwargs):
......@@ -285,6 +302,8 @@ class RpcConnection(object):
def _cb_need_to_send(self, watcher, revents):
self._sock_writer.start()
def _cb_set_event_fallback(self, watcher, revents):
self._event_fallback.set()
#
# Public API
#
......@@ -313,6 +332,7 @@ class RpcConnection(object):
Low level method to send a message through the socket, generally
used by protocols.
'''
self._event_fallback.wait()
if not self._connected:
raise RpcError('RpcError', 'Not connected to the peer')
size = len(payload)
......@@ -331,6 +351,7 @@ class RpcConnection(object):
'''
self._remote_capabilities = capabilities
self.fallback = False
self._event_fallback.set()
def register_protocol(self, label, protocol_class, *args, **kwargs):
'''
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment