'''
This module contains the RpcConnection class; more information about this
class can be found in its docstring.
'''
from __future__ import absolute_import

import errno
import logging
import socket
import ssl
import struct

import pyev

from sjrpc.core.protocols import Protocol, RpcProtocol, TunnelProtocol
from sjrpc.core.exceptions import (RpcError, NoFreeLabelError,
                                   FallbackModeEnabledError)
class RpcConnection(object):
    r'''
    This class manages a single peer connection.

    You can wrap an existing socket with the default constructor::

        >>> conn = RpcConnection(mysocket)

    Or create a new socket automatically with the from_addr constructor::

        >>> conn = RpcConnection.from_addr(host, port)

    If you prefer an SSL connection, you can use the from_addr_ssl
    constructor::

        >>> conn = RpcConnection.from_addr_ssl(host, port)

    By default, an :class:`RpcProtocol` is created on label 0; you can access
    this rpc through the `conn.rpc` shortcut::

        >>> conn.rpc.call('ping')

    Also, the connection object exposes the :meth:`call` and
    :meth:`async_call` methods of the default rpc, so you can use them
    directly on the connection::

        >>> conn.call('ping') # Equivalent to the example before

    .. seealso::
       You can read the :ref:`Default rpc, aka Rpc0` section to know more
       about the default rpc

    :param sock: the socket object of this newly created
        :class:`RpcConnection`
    :param loop: the pyev loop driving this connection, or None to use the
        pyev default loop
    :param enable_tcp_keepalive: set SO_KEEPALIVE on the socket when true
    :param \*args,\*\*kwargs: arguments to pass to the default rpc protocol
        automatically registered on label 0.
    '''

    # errno values meaning "nothing to read right now" on a non-blocking
    # socket.
    NONBLOCKING_ERRORS = (errno.EAGAIN, errno.EWOULDBLOCK)
    # Must be a tuple: it is used on the right-hand side of an `in` test.
    NONBLOCKING_SSL_ERRORS = (ssl.SSL_ERROR_WANT_READ,)

    # Frame header: (label, payload size) in normal mode, payload size only
    # in fallback mode.
    MESSAGE_HEADER = '!HL'
    MESSAGE_HEADER_FALLBACK = '!L'
    MAX_LABEL = 2 ** 16

    # Methods of the default rpc re-exported on the connection itself.
    SHORTCUTS_MAINRPC = ('call', 'async_call')

    def __init__(self, sock, loop=None, enable_tcp_keepalive=False,
                 *args, **kwargs):
        # Socket of this connection, used by every other method:
        self._sock = sock
        self._sock.setblocking(False)

        # Handler bound through set_handler(); None until then:
        self._handler = None

        # Initialization requires fallback mode disabled (register_protocol
        # refuses to work in fallback mode):
        self.fallback = False

        # Get the pyev loop:
        self.loop = pyev.default_loop() if loop is None else loop

        # Activate TCP keepalive on the connection:
        if enable_tcp_keepalive:
            self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

        # Watcher list:
        self._watchers = set()

        # Socket inbound/outbound buffers (byte strings):
        self._inbound_buffer = b''
        self._outbound_buffer = b''
        # Number of bytes still expected for the current header or payload:
        self._remains = struct.calcsize(RpcConnection.MESSAGE_HEADER_FALLBACK)
        # Protocol receiving the current payload, None while reading a header:
        self._proto_receiving = None

        # Initialize main read/write watchers:
        self._sock_reader = self.create_watcher(pyev.Io,
                                                fd=self._sock,
                                                events=pyev.EV_READ,
                                                callback=self._dispatch)
        self._sock_reader.start()
        # The writer is only started on demand, see _cb_need_to_send():
        self._sock_writer = self.create_watcher(pyev.Io,
                                                fd=self._sock,
                                                events=pyev.EV_WRITE,
                                                callback=self._writer)

        # Is the RpcConnection connected to its peer:
        self._connected = True

        # "Need to send" loop signal:
        self._need_to_send = self.create_watcher(
            pyev.Async, callback=self._cb_need_to_send)
        self._need_to_send.start()

        self.logger = logging.getLogger('sjrpc.%s' % self.getpeername())

        # Protocols registered on this connection:
        self._protocols = {}
        self.register_protocol(0, RpcProtocol, *args, **kwargs)

        # Create shortcuts to the main rpc (protocol 0) for convenience:
        main_rpc = self.get_protocol(0)
        for name in RpcConnection.SHORTCUTS_MAINRPC:
            setattr(self, name, getattr(main_rpc, name))

        # By default, enter in fallback mode, no label, all frames are
        # redirected on Rpc0:
        self.fallback = True
        self._remote_capabilities = None
        self._send_capabilities()

    @staticmethod
    def _open_socket(addr, port, conn_timeout):
        '''
        Open a TCP socket connected to (addr, port).

        The socket is closed before the exception propagates if the
        connection attempt fails, so no file descriptor is leaked.
        '''
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(conn_timeout)
            sock.connect((addr, port))
            sock.setblocking(True)
        except Exception:
            sock.close()
            raise
        return sock

    @classmethod
    def from_addr(cls, addr, port, conn_timeout=30.0, *args, **kwargs):
        r'''
        Construct the instance of :class:`RpcConnection` without providing
        the :class:`socket` object. The socket is automatically created,
        connected and passed to the standard constructor before the new
        instance is returned.

        :param addr: the target ip address
        :param port: the target port
        :param conn_timeout: the connection operation timeout
        :param \*args,\*\*kwargs: extra arguments to pass to the constructor
            (see constructor docstring)
        '''
        sock = cls._open_socket(addr, port, conn_timeout)
        return cls(sock, *args, **kwargs)

    @classmethod
    def from_addr_ssl(cls, addr, port, cert=None,
                      conn_timeout=30, *args, **kwargs):
        '''
        Construct :class:`RpcConnection` instance like :meth:`from_addr`, but
        enable ssl on socket.

        :param cert: ssl client certificate or None for ssl without
            certificate
        '''
        sock = cls._open_socket(addr, port, conn_timeout)
        req = ssl.CERT_NONE if cert is None else ssl.CERT_REQUIRED
        # NOTE(review): ssl.wrap_socket and PROTOCOL_TLSv1 are deprecated in
        # recent Python versions; kept as-is to stay compatible with peers.
        try:
            sock = ssl.wrap_socket(sock, certfile=cert, cert_reqs=req,
                                   ssl_version=ssl.PROTOCOL_TLSv1)
        except Exception:
            sock.close()
            raise
        return cls(sock, *args, **kwargs)

    # NOTE(review): the `def` line of this method was missing from the
    # source; the body looks like a __repr__ -- confirm.
    def __repr__(self):
        return '<RpcConnection object>'

    def __hash__(self):
        return hash(self._sock)

    def __nonzero__(self):
        # Truthiness of a connection is "still connected to the peer".
        return self._connected

    # Python 3 spelling of __nonzero__.
    __bool__ = __nonzero__

    def _send_capabilities(self):
        '''
        Send capabilities to the peer, only work in fallback mode for
        compatibility with old sjRpc.

        Send a special message through the Rpc0 with these fields:

        - special: 'capabilities'
        - capabilities: {'version': REMOTE_VERSION, 'capabilities': []}
        '''
        from sjrpc import __version__
        cap = {'version': __version__, 'capabilities': ['rpc', 'tunnel']}
        rpc0 = self.get_protocol(0)
        rpc0.send_special('capabilities', capabilities=cap)

    def _dispatch(self, watcher, revents):
        '''
        Read next message from socket and dispatch it to the according
        protocol handler.

        Called by the read watcher. Each call performs a single recv() of at
        most the number of bytes still expected for the current header or
        payload.
        '''
        # Try to receive remaining data from the socket:
        try:
            buf = self._sock.recv(self._remains)
        except socket.error as err:
            if err.errno in RpcConnection.NONBLOCKING_ERRORS:
                return
            if (isinstance(err, ssl.SSLError)
                    and err.errno in RpcConnection.NONBLOCKING_SSL_ERRORS):
                return
            raise

        if not buf:
            # Empty data on non-blocking socket means that the connection
            # has been closed:
            self.shutdown()
            return

        # Decrease the counter of remaining data to read:
        self._remains -= len(buf)

        if self._proto_receiving is None:
            # Still reading a frame header.
            self._inbound_buffer += buf
            if self._remains == 0:
                if self.fallback:
                    pl_size = struct.unpack(
                        RpcConnection.MESSAGE_HEADER_FALLBACK,
                        self._inbound_buffer)[0]
                    label = 0
                else:
                    label, pl_size = struct.unpack(
                        RpcConnection.MESSAGE_HEADER, self._inbound_buffer)

                # Get the registered protocol for the specified label:
                self._proto_receiving = self._protocols.get(label)
                # If frame's label is not bound to a protocol, we create a
                # dummy protocol to consume the payload:
                if self._proto_receiving is None:
                    self._proto_receiving = Protocol(self, -1)
                self._proto_receiving.start_message(pl_size)
                self._inbound_buffer = b''
                # NOTE(review): a header announcing a zero-length payload
                # leaves _remains at 0, so the next recv(0) returns an empty
                # string and is handled as a disconnect -- confirm peers
                # never send empty payloads.
                self._remains = pl_size
        else:
            # Reading a payload: hand the bytes to the receiving protocol.
            self._proto_receiving.feed(buf)
            if self._remains == 0:
                # end_of_message() may leave fallback mode (through
                # set_capabilities), so the next header size is computed
                # after it.
                self._proto_receiving.end_of_message()
                if self.fallback:
                    header = RpcConnection.MESSAGE_HEADER_FALLBACK
                else:
                    header = RpcConnection.MESSAGE_HEADER
                self._remains = struct.calcsize(header)
                self._inbound_buffer = b''
                self._proto_receiving = None

    def _writer(self, watcher, revent):
        '''
        Write data on the socket.

        Called by the write watcher; the watcher is stopped once the
        outbound buffer is empty.

        :raise RpcError: when the socket reports an error while sending
        '''
        if self._outbound_buffer:
            # In fallback mode at most 4096 bytes are handed to send().
            if self.fallback:
                chunk = self._outbound_buffer[:4096]
            else:
                chunk = self._outbound_buffer
            try:
                sent = self._sock.send(chunk)
            except socket.error as err:
                errmsg = 'Fatal error while sending through socket: %s' % err
                self.logger.error(errmsg)
                raise RpcError('SocketError', errmsg)
            self._outbound_buffer = self._outbound_buffer[sent:]

        if not self._outbound_buffer:
            watcher.stop()

    def _cb_need_to_send(self, watcher, revents):
        # Async signal raised by send(): wake the write watcher up.
        self._sock_writer.start()

    #
    # Public API
    #

    @property
    def rpc(self):
        '''The default rpc protocol (the one registered on label 0).'''
        return self.get_protocol(0)

    def run(self):
        '''
        Main loop execution.
        '''
        self.loop.start()

    def create_watcher(self, watcher_class, **kwargs):
        '''
        Create a new pyev watcher bound to the loop of this connection and
        return it. The watcher is recorded so that :meth:`shutdown` can stop
        it.

        :param watcher_class: the watcher class to instantiate
        :param kwargs: keyword arguments passed to the watcher class; any
            `loop` entry is replaced by the loop of this connection
        '''
        kwargs['loop'] = self.loop
        watcher = watcher_class(**kwargs)
        self._watchers.add(watcher)
        return watcher

    def send(self, label, payload):
        '''
        Low level method to send a message through the socket, generally
        used by protocols.

        The framed message is appended to the outbound buffer and the loop
        is signalled; the actual write happens in :meth:`_writer`.

        :param label: label of the protocol sending the message (not
            transmitted in fallback mode)
        :param payload: the message payload, as a byte string
        :raise RpcError: if the connection is not connected to its peer
        '''
        if not self._connected:
            raise RpcError('RpcError', 'Not connected to the peer')
        size = len(payload)
        if self.fallback:
            header = struct.pack(RpcConnection.MESSAGE_HEADER_FALLBACK, size)
        else:
            header = struct.pack(RpcConnection.MESSAGE_HEADER, label, size)
        self._outbound_buffer += header + payload
        self._need_to_send.send()

    def set_capabilities(self, capabilities):
        '''
        Set capabilities of remote host (and disable fallback mode).

        Should be called by Rpc0 when the peer send its capabilities message.
        '''
        self._remote_capabilities = capabilities
        self.fallback = False

    def register_protocol(self, label, protocol_class, *args, **kwargs):
        r'''
        Register a new protocol for the specified label.

        :param label: the label to bind, or None to use the lowest free one
        :param protocol_class: class instantiated as
            ``protocol_class(connection, label, *args, **kwargs)``
        :param \*args,\*\*kwargs: extra arguments for the protocol class
        :return: the newly created protocol instance
        :raise FallbackModeEnabledError: if the connection is in fallback
            mode
        :raise NoFreeLabelError: if label is None and every label is used
        :raise KeyError: if a protocol is already registered on the label
        :raise ValueError: if the label is not an integer
        '''
        if self.fallback:
            raise FallbackModeEnabledError('Fallback mode is not compatible '
                                           'with protocols')

        if label is None:
            # Pick the lowest label not yet bound to a protocol:
            label = 0
            while label in self._protocols:
                label += 1
            if label >= RpcConnection.MAX_LABEL:
                raise NoFreeLabelError('No more label number are availables')

        if label in self._protocols:
            raise KeyError('A protocol is already registered for this label')
        elif not isinstance(label, int):
            raise ValueError('Label must be an integer')

        proto = protocol_class(self, label, *args, **kwargs)
        self._protocols[label] = proto
        return proto

    def unregister_protocol(self, label):
        '''
        Unregister the specified protocol label for this connection.

        Label 0 (the default rpc) cannot be unregistered.

        :param label: the label to unbind
        :raise FallbackModeEnabledError: if the connection is in fallback
            mode
        :raise KeyError: if the label is 0 or has no registered protocol
        '''
        if self.fallback:
            raise FallbackModeEnabledError('Fallback mode is not compatible '
                                           'with protocols')

        if label in self._protocols and label != 0:
            del self._protocols[label]
        else:
            raise KeyError('No protocol registered for this label')

    def create_rpc(self, label=None, *args, **kwargs):
        '''
        Shortcut which can be used to create rpc protocols.
        '''
        return self.register_protocol(label, RpcProtocol, *args, **kwargs)

    def create_tunnel(self, label=None, *args, **kwargs):
        '''
        Shortcut which can be used to create tunnels protocols.
        '''
        return self.register_protocol(label, TunnelProtocol, *args, **kwargs)

    def get_protocol(self, label):
        '''
        Get the protocol registered for specified label.

        :param label: the label to look up
        :return: the protocol registered on this label
        :raise KeyError: if no protocol is registered on this label
        '''
        proto = self._protocols.get(label)
        if proto is None:
            raise KeyError('No protocol registered for this label')
        return proto

    def shutdown(self):
        '''
        Stop every watcher, shut every registered protocol down and close
        the socket of this connection.
        '''
        # Shutdown each registered watcher:
        for watcher in self._watchers:
            watcher.stop()

        # Shutdown each registered protocols (iterate over a copy, so the
        # registry may change while protocols are shutting down):
        for proto in list(self._protocols.values()):
            proto.shutdown()

        # Close the connection socket:
        self._connected = False
        try:
            self._sock.shutdown(socket.SHUT_RDWR)
        except socket.error:
            # Best effort: the peer may already have closed the connection.
            pass
        # close() is attempted even when shutdown() failed, so the file
        # descriptor is always released:
        try:
            self._sock.close()
        except socket.error:
            pass

    def get_handler(self):
        '''
        Return the handler bound to the :class:`RpcConnection`.

        :return: bound handler
        '''
        return self._handler

    def set_handler(self, handler):
        '''
        Define a new handler for this connection.

        :param handler: the new handler to define.
        '''
        self._handler = handler

    def get_fd(self):
        '''
        Get the file descriptor of the socket managed by this connection.

        :return: the file descriptor number of the socket, or None if the
            socket reports an error
        '''
        try:
            return self._sock.fileno()
        except socket.error:
            return None

    def getpeername(self):
        '''
        Get the peer name.

        :return: string representing the peer name ("host:port" when the
            socket reports its peer as a tuple)
        '''
        peer = self._sock.getpeername()
        if isinstance(peer, tuple):
            # Some address families report more than (host, port); only the
            # first two fields are used.
            return '%s:%s' % peer[:2]
        return str(peer)