pax_global_header 0000666 0000000 0000000 00000000064 13437517267 0014531 g ustar 00root root 0000000 0000000 52 comment=4616ac693c088871ff597a3493a288e9c2fb9cc3
sjrpc-master/ 0000775 0000000 0000000 00000000000 13437517267 0013507 5 ustar 00root root 0000000 0000000 sjrpc-master/.gitignore 0000664 0000000 0000000 00000000051 13437517267 0015473 0 ustar 00root root 0000000 0000000 *.pyc
doc/_build/*
*.swp
*.log
test_*.py
sjrpc-master/CHANGELOG 0000664 0000000 0000000 00000000000 13437517267 0014707 0 ustar 00root root 0000000 0000000 sjrpc-master/COPYRIGHT 0000664 0000000 0000000 00000000040 13437517267 0014774 0 ustar 00root root 0000000 0000000 Copytight © 2010-2012 Smartjog
sjrpc-master/LICENSE 0000664 0000000 0000000 00000016743 13437517267 0014527 0 ustar 00root root 0000000 0000000 GNU LESSER GENERAL PUBLIC LICENSE
Version 3, 29 June 2007
Copyright (C) 2007 Free Software Foundation, Inc.
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
This version of the GNU Lesser General Public License incorporates
the terms and conditions of version 3 of the GNU General Public
License, supplemented by the additional permissions listed below.
0. Additional Definitions.
As used herein, "this License" refers to version 3 of the GNU Lesser
General Public License, and the "GNU GPL" refers to version 3 of the GNU
General Public License.
"The Library" refers to a covered work governed by this License,
other than an Application or a Combined Work as defined below.
An "Application" is any work that makes use of an interface provided
by the Library, but which is not otherwise based on the Library.
Defining a subclass of a class defined by the Library is deemed a mode
of using an interface provided by the Library.
A "Combined Work" is a work produced by combining or linking an
Application with the Library. The particular version of the Library
with which the Combined Work was made is also called the "Linked
Version".
The "Minimal Corresponding Source" for a Combined Work means the
Corresponding Source for the Combined Work, excluding any source code
for portions of the Combined Work that, considered in isolation, are
based on the Application, and not on the Linked Version.
The "Corresponding Application Code" for a Combined Work means the
object code and/or source code for the Application, including any data
and utility programs needed for reproducing the Combined Work from the
Application, but excluding the System Libraries of the Combined Work.
1. Exception to Section 3 of the GNU GPL.
You may convey a covered work under sections 3 and 4 of this License
without being bound by section 3 of the GNU GPL.
2. Conveying Modified Versions.
If you modify a copy of the Library, and, in your modifications, a
facility refers to a function or data to be supplied by an Application
that uses the facility (other than as an argument passed when the
facility is invoked), then you may convey a copy of the modified
version:
a) under this License, provided that you make a good faith effort to
ensure that, in the event an Application does not supply the
function or data, the facility still operates, and performs
whatever part of its purpose remains meaningful, or
b) under the GNU GPL, with none of the additional permissions of
this License applicable to that copy.
3. Object Code Incorporating Material from Library Header Files.
The object code form of an Application may incorporate material from
a header file that is part of the Library. You may convey such object
code under terms of your choice, provided that, if the incorporated
material is not limited to numerical parameters, data structure
layouts and accessors, or small macros, inline functions and templates
(ten or fewer lines in length), you do both of the following:
a) Give prominent notice with each copy of the object code that the
Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the object code with a copy of the GNU GPL and this license
document.
4. Combined Works.
You may convey a Combined Work under terms of your choice that,
taken together, effectively do not restrict modification of the
portions of the Library contained in the Combined Work and reverse
engineering for debugging such modifications, if you also do each of
the following:
a) Give prominent notice with each copy of the Combined Work that
the Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the Combined Work with a copy of the GNU GPL and this license
document.
c) For a Combined Work that displays copyright notices during
execution, include the copyright notice for the Library among
these notices, as well as a reference directing the user to the
copies of the GNU GPL and this license document.
d) Do one of the following:
0) Convey the Minimal Corresponding Source under the terms of this
License, and the Corresponding Application Code in a form
suitable for, and under terms that permit, the user to
recombine or relink the Application with a modified version of
the Linked Version to produce a modified Combined Work, in the
manner specified by section 6 of the GNU GPL for conveying
Corresponding Source.
1) Use a suitable shared library mechanism for linking with the
Library. A suitable mechanism is one that (a) uses at run time
a copy of the Library already present on the user's computer
system, and (b) will operate properly with a modified version
of the Library that is interface-compatible with the Linked
Version.
e) Provide Installation Information, but only if you would otherwise
be required to provide such information under section 6 of the
GNU GPL, and only to the extent that such information is
necessary to install and execute a modified version of the
Combined Work produced by recombining or relinking the
Application with a modified version of the Linked Version. (If
you use option 4d0, the Installation Information must accompany
the Minimal Corresponding Source and Corresponding Application
Code. If you use option 4d1, you must provide the Installation
Information in the manner specified by section 6 of the GNU GPL
for conveying Corresponding Source.)
5. Combined Libraries.
You may place library facilities that are a work based on the
Library side by side in a single library together with other library
facilities that are not Applications and are not covered by this
License, and convey such a combined library under terms of your
choice, if you do both of the following:
a) Accompany the combined library with a copy of the same work based
on the Library, uncombined with any other library facilities,
conveyed under the terms of this License.
b) Give prominent notice with the combined library that part of it
is a work based on the Library, and explaining where to find the
accompanying uncombined form of the same work.
6. Revised Versions of the GNU Lesser General Public License.
The Free Software Foundation may publish revised and/or new versions
of the GNU Lesser General Public License from time to time. Such new
versions will be similar in spirit to the present version, but may
differ in detail to address new problems or concerns.
Each version is given a distinguishing version number. If the
Library as you received it specifies that a certain numbered version
of the GNU Lesser General Public License "or any later version"
applies to it, you have the option of following the terms and
conditions either of that published version or of any later version
published by the Free Software Foundation. If the Library as you
received it does not specify a version number of the GNU Lesser
General Public License, you may choose any version of the GNU Lesser
General Public License ever published by the Free Software Foundation.
If the Library as you received it specifies that a proxy can decide
whether future versions of the GNU Lesser General Public License shall
apply, that proxy's public statement of acceptance of any version is
permanent authorization for you to choose that version for the
Library.
sjrpc-master/README 0000664 0000000 0000000 00000000000 13437517267 0014355 0 ustar 00root root 0000000 0000000 sjrpc-master/doc/ 0000775 0000000 0000000 00000000000 13437517267 0014254 5 ustar 00root root 0000000 0000000 sjrpc-master/doc/Makefile 0000664 0000000 0000000 00000006064 13437517267 0015722 0 ustar 00root root 0000000 0000000 # Makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = _build
# Internal variables.
PAPEROPT_a4 = -D latex_paper_size=a4
PAPEROPT_letter = -D latex_paper_size=letter
ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
.PHONY: help clean html dirhtml pickle json htmlhelp qthelp latex changes linkcheck doctest
help:
@echo "Please use \`make ' where is one of"
@echo " html to make standalone HTML files"
@echo " dirhtml to make HTML files named index.html in directories"
@echo " pickle to make pickle files"
@echo " json to make JSON files"
@echo " htmlhelp to make HTML files and a HTML help project"
@echo " qthelp to make HTML files and a qthelp project"
@echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
@echo " changes to make an overview of all changed/added/deprecated items"
@echo " linkcheck to check all external links for integrity"
@echo " doctest to run all doctests embedded in the documentation (if enabled)"
clean:
-rm -rf $(BUILDDIR)/*
html:
$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
dirhtml:
$(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml."
pickle:
$(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle
@echo
@echo "Build finished; now you can process the pickle files."
json:
$(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json
@echo
@echo "Build finished; now you can process the JSON files."
htmlhelp:
$(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp
@echo
@echo "Build finished; now you can run HTML Help Workshop with the" \
".hhp project file in $(BUILDDIR)/htmlhelp."
qthelp:
$(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp
@echo
@echo "Build finished; now you can run "qcollectiongenerator" with the" \
".qhcp project file in $(BUILDDIR)/qthelp, like this:"
@echo "# qcollectiongenerator $(BUILDDIR)/qthelp/sjRpc.qhcp"
@echo "To view the help file:"
@echo "# assistant -collectionFile $(BUILDDIR)/qthelp/sjRpc.qhc"
latex:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo
@echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex."
@echo "Run \`make all-pdf' or \`make all-ps' in that directory to" \
"run these through (pdf)latex."
changes:
$(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes
@echo
@echo "The overview file is in $(BUILDDIR)/changes."
linkcheck:
$(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck
@echo
@echo "Link check complete; look for any errors in the above output " \
"or in $(BUILDDIR)/linkcheck/output.txt."
doctest:
$(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest
@echo "Testing of doctests in the sources finished, look at the " \
"results in $(BUILDDIR)/doctest/output.txt."
sjrpc-master/doc/api/ 0000775 0000000 0000000 00000000000 13437517267 0015025 5 ustar 00root root 0000000 0000000 sjrpc-master/doc/api/core.protocols.rst 0000664 0000000 0000000 00000000137 13437517267 0020533 0 ustar 00root root 0000000 0000000 Protocols
---------
.. automodule:: sjrpc.core.protocols
:members:
:inherited-members:
sjrpc-master/doc/api/core.rst 0000664 0000000 0000000 00000000337 13437517267 0016512 0 ustar 00root root 0000000 0000000 Core library
------------
.. toctree::
:maxdepth: 2
:hidden:
core.protocols
.. seealso::
API documentation on the sub-package :doc:`core.protocols`.
.. automodule:: sjrpc.core
:members:
:inherited-members:
sjrpc-master/doc/api/index.rst 0000664 0000000 0000000 00000000165 13437517267 0016670 0 ustar 00root root 0000000 0000000 sjRpc API
=========
.. automodule:: sjrpc
Sub-packages:
.. toctree::
:maxdepth: 3
core
server
utils
sjrpc-master/doc/api/server.rst 0000664 0000000 0000000 00000000153 13437517267 0017064 0 ustar 00root root 0000000 0000000
Server side library
-------------------
.. automodule:: sjrpc.server
:members:
:inherited-members:
sjrpc-master/doc/api/utils.rst 0000664 0000000 0000000 00000000116 13437517267 0016715 0 ustar 00root root 0000000 0000000
Utils
-----
.. automodule:: sjrpc.utils
:members:
:inherited-members:
sjrpc-master/doc/conf.py 0000664 0000000 0000000 00000014706 13437517267 0015563 0 ustar 00root root 0000000 0000000 # -*- coding: utf-8 -*-
#
# sjRpc documentation build configuration file, created by
# sphinx-quickstart on Mon Dec 20 15:00:55 2010.
#
# This file is execfile()d with the current directory set to its containing dir.
#
# Note that not all possible configuration values are present in this
# autogenerated file.
#
# All configuration values have a default; values that are commented out
# serve to show the default.
import sys, os
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
sys.path.append(os.path.abspath('../'))
# -- General configuration -----------------------------------------------------
# Add any Sphinx extension module names here, as strings. They can be extensions
# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.intersphinx', 'sphinx.ext.todo',
'sphinx.ext.pngmath', 'sphinx.ext.ifconfig', 'sphinx.ext.autosummary']
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix of source filenames.
source_suffix = '.rst'
# The encoding of source files.
#source_encoding = 'utf-8'
# The master toctree document.
master_doc = 'index'
# General information about the project.
project = u'sjRpc'
copyright = u'2010, Smartjog'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
from sjrpc import __version__
version = __version__
# The full version, including alpha/beta/rc tags.
release = version
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#language = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
#today = ''
# Else, today_fmt is used as the format for a strftime call.
#today_fmt = '%B %d, %Y'
# List of documents that shouldn't be included in the build.
#unused_docs = []
# List of directories, relative to source directory, that shouldn't be searched
# for source files.
exclude_trees = ['_build']
# The reST default role (used for this markup: `text`) to use for all documents.
#default_role = None
# If true, '()' will be appended to :func: etc. cross-reference text.
#add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
#add_module_names = True
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
#show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# A list of ignored prefixes for module index sorting.
#modindex_common_prefix = []
# -- Options for HTML output ---------------------------------------------------
# The theme to use for HTML and HTML Help pages. Major themes that come with
# Sphinx are currently 'default' and 'sphinxdoc'.
html_theme = 'nature'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#html_theme_options = {}
# Add any paths that contain custom themes here, relative to this directory.
#html_theme_path = []
# The name for this set of Sphinx documents. If None, it defaults to
# " v documentation".
#html_title = None
# A shorter title for the navigation bar. Default is the same as html_title.
#html_short_title = None
# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
#html_logo = None
# The name of an image file (within the static path) to use as favicon of the
# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
# pixels large.
#html_favicon = None
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
# using the given strftime format.
#html_last_updated_fmt = '%b %d, %Y'
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
#html_use_smartypants = True
# Custom sidebar templates, maps document names to template names.
#html_sidebars = {}
# Additional templates that should be rendered to pages, maps page names to
# template names.
#html_additional_pages = {}
# If false, no module index is generated.
#html_use_modindex = True
# If false, no index is generated.
#html_use_index = True
# If true, the index is split into individual pages for each letter.
#html_split_index = False
# If true, links to the reST sources are added to the pages.
#html_show_sourcelink = True
# If true, an OpenSearch description file will be output, and all pages will
# contain a tag referring to it. The value of this option must be the
# base URL from which the finished HTML is served.
#html_use_opensearch = ''
# If nonempty, this is the file name suffix for HTML files (e.g. ".xhtml").
#html_file_suffix = ''
# Output file base name for HTML help builder.
htmlhelp_basename = 'sjRpcdoc'
# -- Options for LaTeX output --------------------------------------------------
# The paper size ('letter' or 'a4').
#latex_paper_size = 'letter'
# The font size ('10pt', '11pt' or '12pt').
#latex_font_size = '10pt'
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author, documentclass [howto/manual]).
latex_documents = [
('index', 'sjRpc.tex', u'sjRpc Documentation',
u'Smartjog', 'manual'),
]
# The name of an image file (relative to this directory) to place at the top of
# the title page.
#latex_logo = None
# For "manual" documents, if this is true, then toplevel headings are parts,
# not chapters.
#latex_use_parts = False
# Additional stuff for the LaTeX preamble.
#latex_preamble = ''
# Documents to append as an appendix to all manuals.
#latex_appendices = []
# If false, no module index is generated.
#latex_use_modindex = True
# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {'http://docs.python.org/': None}
sjrpc-master/doc/examples.rst 0000664 0000000 0000000 00000000345 13437517267 0016626 0 ustar 00root root 0000000 0000000 Examples
========
Client example
--------------
.. literalinclude:: examples/client.py
:language: python
:linenos:
Server example
--------------
.. literalinclude:: examples/server.py
:language: python
:linenos:
sjrpc-master/doc/examples/ 0000775 0000000 0000000 00000000000 13437517267 0016072 5 ustar 00root root 0000000 0000000 sjrpc-master/doc/examples/client.py 0000664 0000000 0000000 00000001555 13437517267 0017730 0 ustar 00root root 0000000 0000000 #!/usr/bin/env python
'''
Simple sjRpc client example.
'''
from __future__ import absolute_import
import sys
import random
import threading
from sjrpc.core import RpcConnection
from sjrpc.utils import RpcHandler
class MyClientHandler(RpcHandler):
def client_random(self, min=0, max=100):
print 'Local call to client_random'
return random.randint(min, max)
# Get arguments from the command line:
if len(sys.argv) < 3:
print 'Usage: %s ' % sys.argv[0]
sys.exit(2)
address = sys.argv[1]
port = int(sys.argv[2])
# Create the rpc connection:
conn = RpcConnection.from_addr(address, port, handler=MyClientHandler())
# Run the connection mainloop in another thread:
t = threading.Thread(target=conn.run)
t.daemon = True
t.start()
print 'Random = %s' % (conn.call('proxy', 'client_random'), )
conn.shutdown()
sjrpc-master/doc/examples/client_perf.py 0000664 0000000 0000000 00000002242 13437517267 0020736 0 ustar 00root root 0000000 0000000 #!/usr/bin/env python
'''
sjRpc client example.
'''
from __future__ import absolute_import
import sys
import threading
import time
from sjrpc.core import RpcConnection
from sjrpc.core.protocols import TunnelProtocol
bytes_received = 0
def perf(socket):
global bytes_received
while True:
read = socket.recv(1024 * 1024)
bytes_received += len(read)
# Get arguments from the command line:
if len(sys.argv) < 3:
print 'Usage: %s ' % sys.argv[0]
sys.exit(2)
address = sys.argv[1]
port = int(sys.argv[2])
# Create the rpc connection:
conn = RpcConnection.from_addr_ssl(address, port)
print 'connected.'
# Run the connection mainloop in another thread:
threading.Thread(target=conn.run).start()
time.sleep(0.1)
label = conn.call('launch_perf')
tun = conn.register_protocol(label, TunnelProtocol)
t = threading.Thread(target=perf, args=(tun._socket,))
t.daemon = True
t.start()
last = 0
while True:
bytes_received, b = 0, bytes_received
bw = ((b / 1024.0 / 1024) / 2)
print '%.2f MB/s\t\t\t%.2f Mb/s\t\t\tDelta: %+0.2f' % (bw, bw * 8, (bw - last) * 8)
last = bw
time.sleep(2)
conn.shutdown()
sjrpc-master/doc/examples/client_tunnel.py 0000664 0000000 0000000 00000004604 13437517267 0021313 0 ustar 00root root 0000000 0000000 #!/usr/bin/env python
'''
sjRpc client example.
'''
from __future__ import absolute_import
import sys
import threading
import logging
import time
import fcntl
import os
import tty
from sjrpc.core import RpcConnection
from sjrpc.core.protocols import TunnelProtocol
#logger = logging.getLogger()
#logger.setLevel(logging.DEBUG)
#fmt = logging.Formatter('%(levelname)s %(name)s %(message)s')
#handler = logging.StreamHandler()
#handler.setFormatter(fmt)
#logger.addHandler(handler)
def iperf(socket):
f = open('/dev/null', 'w')
while True:
read = socket.recv(1024 * 64)
f.write(read)
class _FakeStdio(object):
'''
A fake standard io to make stdin/stdout look like a socket.
'''
def __init__(self):
tty.setraw(sys.stdin.fileno())
def recv(self, size):
return sys.stdin.read(size)
def send(self, data):
sys.stdout.write(data)
sys.stdout.flush()
return len(data)
def fileno(self):
return sys.stdin.fileno()
def setblocking(self, blocking):
if not blocking:
# Disable blocking mode on stdin:
current_flags = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
else:
raise NotImplementedError('Not implemented')
# Get arguments from the command line:
if len(sys.argv) < 3:
print 'Usage: %s ' % sys.argv[0]
sys.exit(2)
address = sys.argv[1]
port = int(sys.argv[2])
# Create the rpc connection:
conn = RpcConnection.from_addr(address, port)
print 'connected.'
# Run the connection mainloop in another thread:
threading.Thread(target=conn.run).start()
time.sleep(0.1)
# Create the tunnel on the remote server:
label = conn.call('launch_remote_console')
# Create a fake socket object wrapping stdin and stdout:
fake_stdio = _FakeStdio()
# Create the tunnel on the local side:
conn.register_protocol(label, TunnelProtocol, endpoint=fake_stdio)
#label = conn.call('iperf')
#tun = conn.register_protocol(label, TunnelProtocol)
#t = threading.Thread(target=iperf, args=(tun._socket,))
#t.daemon = True
#t.start()
#f = open('/dev/null', 'w')
#while True:
# read = tun._socket.recv(1024 * 64)
# f.write(read)
# Start the shell
rcode = conn.call('wait_for_pshell', _timeout=None)
print '\nremote process returned with code %s\n' % rcode
conn.shutdown()
sjrpc-master/doc/examples/client_vpn.py 0000664 0000000 0000000 00000002076 13437517267 0020612 0 ustar 00root root 0000000 0000000 #!/usr/bin/env python
'''
sjRpc client example.
'''
from __future__ import absolute_import
import sys
import threading
import logging
import time
from sjrpc.core import RpcConnection
from sjrpc.core.protocols import VpnProtocol
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
fmt = logging.Formatter('%(levelname)s %(name)s %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(fmt)
logger.addHandler(handler)
# Get arguments from the command line:
if len(sys.argv) < 3:
print 'Usage: %s ' % sys.argv[0]
sys.exit(2)
address = sys.argv[1]
port = int(sys.argv[2])
# Create the rpc connection:
conn = RpcConnection.from_addr(address, port)
print 'connected.'
# Run the connection mainloop in another thread:
threading.Thread(target=conn.run).start()
time.sleep(0.1)
# Create the tunnel on the remote server:
label = conn.call('launch_vpn')
# Create the vpn on the local side:
conn.register_protocol(label, VpnProtocol, tun_prefix='vpnc')
raw_input('Type enter to quit.')
print 'shutdown'
conn.shutdown()
sjrpc-master/doc/examples/server.py 0000664 0000000 0000000 00000001762 13437517267 0017760 0 ustar 00root root 0000000 0000000 #!/usr/bin/env python
'''
Simple sjRpc server.
'''
from __future__ import absolute_import
import sys
import random
from sjrpc.server import RpcServer
from sjrpc.utils import RpcHandler, pass_connection
class MyHandler(RpcHandler):
def random(self, min=0, max=100):
return random.randint(min, max)
@pass_connection
def proxy(self, conn, method, *args, **kwargs):
'''
Example of bidirectionnal RPC. When the peer call this method, the
server forward the call of the specified method the peer, and return
the value returned by the peer.
'''
return conn.call(method, *args, **kwargs)
# Get arguments from the command line:
if len(sys.argv) < 3:
print 'Usage: %s ' % sys.argv[0]
sys.exit(2)
address = sys.argv[1]
port = int(sys.argv[2])
# Create the server instance:
rpcserver = RpcServer.from_addr(address, port, conn_kw=dict(handler=MyHandler()))
rpcserver.loop.debug = True
rpcserver.run()
sjrpc-master/doc/examples/server_perf.py 0000664 0000000 0000000 00000003775 13437517267 0021002 0 ustar 00root root 0000000 0000000 #!/usr/bin/env python
'''
Server mode using Gevent, using tunnelling feature to give a remote shell.
'''
from __future__ import absolute_import
import os
import sys
import fcntl
from sjrpc.server import RpcServer, SSLRpcServer
from sjrpc.utils import RpcHandler, pass_connection
from sjrpc.core.protocols import TunnelProtocol
import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
fmt = logging.Formatter('%(levelname)s %(name)s %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(fmt)
logger.addHandler(handler)
class _FakeDevZeroSocket(object):
def __init__(self):
self._devzero = open('/dev/zero', 'rb')
def recv(self, size):
return '\0' * size#self._devzero.read(size)
def send(self, data):
pass # Cannot write on /dev/zero
def fileno(self):
return self._devzero.fileno()
def setblocking(self, blocking):
if not blocking:
# Disable blocking mode on devzero:
current_flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFL)
fcntl.fcntl(self.fileno(), fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
pass
else:
raise NotImplementedError('Not implemented')
class PerfHandler(RpcHandler):
@pass_connection
def launch_perf(self, connection):
devzero = _FakeDevZeroSocket()
connection.register_protocol(2, TunnelProtocol, endpoint=devzero)
return 2
# Get arguments from the command line:
if len(sys.argv) < 3:
print 'Usage: %s ' % sys.argv[0]
sys.exit(2)
address = sys.argv[1]
port = int(sys.argv[2])
# Create the server instance:
import pyev
loop = pyev.default_loop(flags=~ pyev.EVBACKEND_EPOLL)
rpcserver = SSLRpcServer.from_addr(address, port, loop=loop, certfile='/var/lib/cc-server/ccserver.crt', keyfile='/var/lib/cc-server/ccserver.key', conn_kw=dict(handler=PerfHandler()))
#rpcserver = RpcServer.from_addr(address, port, loop=loop, conn_kw=dict(handler=PerfHandler()))
rpcserver.run()
sjrpc-master/doc/examples/server_tunnel.py 0000664 0000000 0000000 00000006541 13437517267 0021345 0 ustar 00root root 0000000 0000000 #!/usr/bin/env python
'''
Server mode using Gevent, using tunnelling feature to give a remote shell.
'''
from __future__ import absolute_import
import os
import sys
import pty
import time
import subprocess
import threading
import logging
import fcntl
import socket
from sjrpc.server import RpcServer
from sjrpc.utils import RpcHandler, pass_connection
from sjrpc.core.protocols import TunnelProtocol
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
fmt = logging.Formatter('%(levelname)s %(name)s %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(fmt)
logger.addHandler(handler)
class _FakePtySocket(object):
'''
A fake socket object wrapping a :class:`subprocess.Popen` object standard
input/output.
'''
def __init__(self, fd):
self._fd = fd
def recv(self, size):
return os.read(self._fd, size)
def send(self, data):
return os.write(self._fd, data)
def fileno(self):
return self._fd
def setblocking(self, blocking):
if not blocking:
# Disable blocking mode on stdin:
current_flags = fcntl.fcntl(self._fd, fcntl.F_GETFL)
fcntl.fcntl(self._fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
pass
else:
raise NotImplementedError('Not implemented')
class _FakeDevZeroSocket(object):
def __init__(self):
self._devzero = open('/dev/zero', 'rb')
def recv(self, size):
return self._devzero.read(size)
def send(self, data):
pass # Cannot write on /dev/zero
def fileno(self):
return self._devzero.fileno()
def setblocking(self, blocking):
if not blocking:
# Disable blocking mode on devzero:
current_flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFL)
fcntl.fcntl(self.fileno(), fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
pass
else:
raise NotImplementedError('Not implemented')
class RemoteShellHandler(RpcHandler):
@pass_connection
def launch_remote_console(self, connection, shell='/bin/bash'):
master, slave = pty.openpty()
endpoint = _FakePtySocket(master)
# Create the tunnel on the label 1
connection.register_protocol(1, TunnelProtocol, endpoint=endpoint)
# Launch the shell process:
self.pshell = subprocess.Popen(shell, stdout=slave, bufsize=0,
stdin=slave, cwd='/',
stderr=subprocess.STDOUT)
return 1 # Return the tunnel id.
#@pass_connection
#def iperf(self, connection):
# devzero = _FakeDevZeroSocket()
# tun = connection.register_protocol(2, TunnelProtocol, endpoint=devzero)
# return 2
@pass_connection
def wait_for_pshell(self, conn):
#conn.get_protocol(1)._send_get(1024 * 1024)
return self.pshell.wait()
def hup(self, *args, **kwargs):
self.pshell.kill()
# Get arguments from the command line:
if len(sys.argv) < 3:
print 'Usage: %s ' % sys.argv[0]
sys.exit(2)
address = sys.argv[1]
port = int(sys.argv[2])
# Create the server instance:
import pyev
loop = pyev.default_loop(flags=~ pyev.EVBACKEND_EPOLL)
rpcserver = RpcServer.from_addr(address, port, loop=loop, conn_kw=dict(handler=RemoteShellHandler(), on_disconnect='hup'))
rpcserver.run()
sjrpc-master/doc/examples/server_vpn.py 0000664 0000000 0000000 00000002132 13437517267 0020633 0 ustar 00root root 0000000 0000000 #!/usr/bin/env python
'''
Server mode using Gevent, using tunnelling feature to give a remote shell.
'''
from __future__ import absolute_import
import sys
import logging
from sjrpc.server import RpcServer
from sjrpc.utils import RpcHandler, pass_connection
from sjrpc.core.protocols import VpnProtocol
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
fmt = logging.Formatter('%(levelname)s %(name)s %(message)s')
handler = logging.StreamHandler()
handler.setFormatter(fmt)
logger.addHandler(handler)
class VpnHandler(RpcHandler):
    """ Rpc handler exposing a single call that opens a VPN channel. """

    @pass_connection
    def launch_vpn(self, connection, name='vpns'):
        """ Register a :class:`VpnProtocol` on label 1 with *name* as the
        tun interface prefix, and return that label.
        """
        vpn_label = 1
        connection.register_protocol(vpn_label, VpnProtocol, tun_prefix=name)
        return vpn_label
# Get arguments from the command line:
if len(sys.argv) < 3:
print 'Usage: %s ' % sys.argv[0]
sys.exit(2)
address = sys.argv[1]
port = int(sys.argv[2])
# Create the server instance:
import pyev
loop = pyev.default_loop(flags=~ pyev.EVBACKEND_EPOLL)
rpcserver = RpcServer.from_addr(address, port, loop=loop, conn_kw=dict(handler=VpnHandler()))
rpcserver.run()
sjrpc-master/doc/fundamentals.rst 0000664 0000000 0000000 00000012314 13437517267 0017470 0 ustar 00root root 0000000 0000000 Fundamentals
============
sjRpc is a RPC (Remote Procedure Call) library written with Python. It was
originally created for the needs of CloudControl project but can be reused
in other projects.
Features
--------
* **Bidirectional:** remote function can be called by both side of the
connection, so a client can connect to a server and export functions to it.
* **Fully event based**: sjRpc takes advantage of asynchronous I/O, allowing a
  server to handle thousands of client connections with a reasonable memory
  usage.
* **Multiplexed:** sjRpc can run many "protocols" on the same connection,
read more about protocols in `Multiplexing & protocols`_ section.
* **Fallback mode:** for compatibility with older versions of sjRpc.
Basic usage
-----------
Server side, create the handler
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The handler is a simple object with a dict interface (``__getitem__``)
responsible for mapping each remote-function name to the locally executed
callable. The :class:`sjrpc.utils.RpcHandler` class helps you define a
handler by simply extending it::
>>> class MyHandler(RpcHandler):
... def random(self, min=0, max=100):
... return random.randint(min, max)
...
>>> handler = MyHandler()
But if you want to use a standard dictionary, this works exactly the same::
>>> handler = {'random': lambda min, max: random.randint(min, max)}
Server side, create the :class:`~sjrpc.core.RpcServer` instance
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The last thing to do on the server side is to launch the server itself::
>>> from sjrpc.server import RpcServer
>>> serv = RpcServer.from_addr('127.0.0.1', 1337, conn_kw=dict(handler=handler))
>>> serv.run()
.. note::
`conn_args` and `conn_kw` are the arguments which are automatically passed
to each client :class:`~sjrpc.core.RpcConnection` instanciation. In this
example, we just pass a default handler.
Client side, just connect !
^^^^^^^^^^^^^^^^^^^^^^^^^^^
For a basic client usage, the only thing you have to do is to create the
:class:`~sjrpc.core.RpcConnection` instance to the server and start the
:class:`~sjrpc.core.RpcConnection` main-loop in another thread to keep the
hands on your term::
>>> conn = RpcConnection.from_addr('127.0.0.1', 1337)
>>> threading.Thread(target=conn.run).start()
>>> print conn.call('random')
42
You can also use a proxy to simplify remote calls::
>>> from sjrpc.utils import ConnectionProxy
>>> proxy = ConnectionProxy(conn)
>>> print proxy.random(min=42, max=1000)
587
Proxy will also restore built-in exceptions embedded in
:class:`~sjrpc.core.RpcError`.
Multiplexing & protocols
------------------------
The sjRpc protocol uses channels to multiplex many protocols on the
same connection. Each channel is bound to a protocol handler on each side, and
each channel has a label which identifies it on the wire. Concretely, an sjRpc
frame looks like this::
+------------+------------------------+---------------------------------+
| Label (2) | Payload size (4) | Payload (variable) |
+------------+------------------------+---------------------------------+
For the moment, two types of protocols are implemented:
* **Rpc:** protocol which allow to make remote function call easily.
* **Tunnel:** protocol which allow to tunnel a socket through the sjRpc
connection.
To register new protocols, you can use the :meth:`register_protocol` on
:class:`RpcConnection` instances, like this::
>>> from sjrpc.core.protocols import RpcProtocol, TunnelProtocol
>>> my_tunnel = myconn.register_protocol(1, TunnelProtocol)
do the same on the other side, then
>>> my_tunnel.send('ehlo !!')
>>> answer = my_tunnel.recv()
You can also use shortcuts::
>>> my_tunnel = myconn.create_tunnel(label=12)
or
>>> my_tunnel = myconn.create_tunnel() # Bind to the first free label
The same shortcut is available for rpc with :meth:`create_rpc`.
Default rpc, aka Rpc0
---------------------
Rpc0 is an RpcProtocol bound by default to the "0" label. You can't
unregister this protocol, and you can't disable this feature. Rpc0 is used
internally by sjRpc to exchange special messages, and for compatibility with
older versions of sjRpc (see `Fallback mode`_ below).
However, you can use this rpc like any other, with your own rpc handler.
Fallback mode
-------------
The fallback mode makes *sjrpc >= 14* compatible with older versions where
channels and protocols don't exist. The old and new protocols are very
similar: the new one just adds a label field on each frame to allow
multiplexing; the rpc protocol itself has not changed and still uses JSON
messaging.
The fallback mode is enabled by default when a connection is established (you
can disable this behavior with the `disable_fallback` parameter of the
:class:`RpcConnection` class constructor) and is automatically disabled when
a special rpc message "capabilities" is received.
.. note::
When the fallback mode is enabled, you can't use any protocol other than the
default rpc. All calls to :meth:`RpcConnection.register_protocol`,
:meth:`RpcConnection.unregister_protocol`, :meth:`RpcConnection.create_rpc`
and :meth:`RpcConnection.create_tunnel`, will fail with a
:exc:`FallbackModeEnabledError`.
sjrpc-master/doc/index.rst 0000664 0000000 0000000 00000000364 13437517267 0016120 0 ustar 00root root 0000000 0000000 Welcome to sjRpc's documentation!
=================================
Contents:
.. toctree::
:maxdepth: 3
fundamentals
examples
api/index
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
sjrpc-master/setup.py 0000664 0000000 0000000 00000002504 13437517267 0015222 0 ustar 00root root 0000000 0000000 #!/usr/bin/env python
# This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see .
from setuptools import setup
import os
# Long description comes from the README file shipped alongside setup.py.
# Fix: use a context manager so the file handle is closed deterministically
# instead of being leaked.
with open(os.path.join(os.path.dirname(__file__), 'README')) as readme:
    ldesc = readme.read()

# Retrieval of version:
from sjrpc import __version__

setup(
    name='sjrpc',
    version=__version__,
    description='Smartjog RPC',
    long_description=ldesc,
    author='Antoine Millet',
    author_email='antoine.millet@smartjog.com',
    # Fix: the file headers and LICENSE state the GNU Lesser GPL v3,
    # not GPL2.
    license='LGPLv3',
    packages=['sjrpc', 'sjrpc.server', 'sjrpc.core', 'sjrpc.core.protocols',
              'sjrpc.utils'],
    classifiers=[
        'Intended Audience :: Developers',
        'Operating System :: Unix',
        'Programming Language :: Python',
    ],
)
sjrpc-master/sjrpc/ 0000775 0000000 0000000 00000000000 13437517267 0014630 5 ustar 00root root 0000000 0000000 sjrpc-master/sjrpc/__init__.py 0000664 0000000 0000000 00000002561 13437517267 0016745 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see .
""" This module implements a Remote Procedure Call system using socket objects
as transport. The main feature of this RPC is to be bidirectionnal: both
client and server can serve remote procedure for its peer.
Features:
* Multiplexed: you can run tunnels or many RPC through the same socket.
* Gevent: sjRpc is compatible with gevent and use it for server feature.
* Bidirectionnal: each peer can call remote function on other.
The library is separated into three parts:
* **core** package contains all common classes.
* **server** package contains all the server side related stuff.
* **utils** package contains some helpers used in previous libraries.
"""
# Version string of the sjrpc distribution; imported by setup.py at build time.
__version__ = '20'
sjrpc-master/sjrpc/core/ 0000775 0000000 0000000 00000000000 13437517267 0015560 5 ustar 00root root 0000000 0000000 sjrpc-master/sjrpc/core/__init__.py 0000664 0000000 0000000 00000002705 13437517267 0017675 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see .
""" The **core** package contains all classes used by both client mode and
server mode.
This packages export following function/classes:
* :class:`RpcConnection` which handle the connection to the peer.
* :class:`RpcCaller`, :class:`ThreadedRpcCaller`: which are used internally
by rpc protocol to execute handlers' functions.
* :class:`RpcError` which is the exception raised by rpc to wrap remote-side
exceptions.
* :class:`AsyncWatcher` which allow to make asynchronous calls.
It also contains a sub-package containing protocols: :mod:`core.protocols`.
"""
from __future__ import absolute_import
from sjrpc.core.rpcconnection import *
from sjrpc.core.exceptions import *
from sjrpc.core.async import *
__all__ = ('RpcConnection', 'RpcError', 'AsyncWatcher')
sjrpc-master/sjrpc/core/async.py 0000664 0000000 0000000 00000010740 13437517267 0017251 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see .
import datetime
from Queue import Queue, Empty
#TODO: implement iterable interface
#TODO: implement priority responses
class AsyncWatcher(object):
    """ Asynchronous call watcher -- Handle asynchronous calls and responses.

    Usage example::

        >>> watcher = AsyncWatcher()
        >>> watcher.register(conn1, 'ping')
        >>> watcher.register(conn2, 'ping')
        >>> responses = watcher.wait()  # Wait for all responses
    """

    def __init__(self):
        self._response_queue = Queue()
        self._expected_responses = {}

    def _get_in_queue(self, *args, **kwargs):
        """ Pop one item from the response queue and return a
        ``(waited, item)`` tuple, where *waited* is the time in seconds
        spent blocking. Accepts every argument of :meth:`Queue.get`.
        """
        before = datetime.datetime.now()
        item = self._response_queue.get(*args, **kwargs)
        elapsed = datetime.datetime.now() - before
        waited = (elapsed.seconds + elapsed.days * 86400
                  + elapsed.microseconds * 0.000001)
        return (waited, item)

    #
    # Public API:
    #

    @property
    def remains(self):
        """ Number of responses still expected. """
        return len(self._expected_responses)

    def register(self, rpc, method, *args, **kwargs):
        """ Call *method* asynchronously on *rpc* and track the response
        on this watcher.

        :param rpc: the rpc on which to make the call
        :param method: the rpc method to call
        :param \*args, \*\*kwargs: the arguments to pass to the rpc method
        """
        call_id = rpc.async_call(self._response_queue, method, *args, **kwargs)
        # Data are stored directly on the AsyncWatcher instead of RpcProtocol.
        self._expected_responses[call_id] = kwargs.pop('_data', None)

    def wait(self, timeout=None, max_wait=None, raise_timeout=False):
        """ Collect the responses of the registered calls into a list.

        :param timeout: timeout value or None to disable timeout
            (default: None)
        :param max_wait: maximum number of waited responses
        :param raise_timeout: when True (and a timeout is set), still
            pending calls are reported as TimeoutError pseudo-responses
        :return: received messages in a list

        .. note::
            You can repeat calls to this method on a single
            :class:`AsyncWatcher` instance, e.g. process the first message
            immediately, then wait for the others with a timeout.
        """
        return list(self.iter(timeout, max_wait, raise_timeout))

    def iter(self, timeout=None, max_wait=None, raise_timeout=False):
        """ Work like :meth:`AsyncWatcher.wait` but return an iterable
        instead of a list.

        .. note::
            Responses are yielded as they are received, so processing can
            start before all responses have arrived.
        """
        budget = max_wait
        while self.remains:
            try:
                waited, response = self._get_in_queue(timeout=timeout)
            except Empty:
                break
            if timeout is not None:
                # Account for the time already spent waiting:
                timeout -= waited
            # Attach the local data registered for this call and forget it:
            response['data'] = self._expected_responses.pop(response['id'])
            yield response
            if budget is not None:
                budget -= 1
                if not budget:
                    break
        if timeout is not None and raise_timeout:
            # Report every still-pending call as a timeout pseudo-response:
            for expected, data in self._expected_responses.iteritems():
                yield {'error': {'exception': 'TimeoutError',
                                 'message': 'Response not received on time'},
                       'data': data,
                       'id': expected}
            self._expected_responses = {}
sjrpc-master/sjrpc/core/exceptions.py 0000664 0000000 0000000 00000003234 13437517267 0020315 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see .
""" Contains sjRpc exceptions.
"""
class RpcError(Exception):
    """ Exception raised by the caller when an error occurs during the
    execution of the remote procedure call.

    :param exception: name of the exception class raised on the remote side
    :param message: human readable error message
    """

    def __init__(self, exception, message):
        # Fix: initialize the Exception base class so that ``args`` (and
        # therefore repr() and pickling) carry the message instead of
        # being empty.
        super(RpcError, self).__init__(message)
        self.exception = exception
        self.message = message

    def __str__(self):
        return '%s' % self.message
class RpcConnectionError(Exception):
    """ Base class for errors related to an :class:`RpcConnection`.
    """
class SocketError(RpcConnectionError):
    """ Exception used internally to signal a fault on the underlying
    socket.
    """
class NoFreeLabelError(RpcConnectionError):
    """ Exception raised when no free label is available to allocate to a
    new protocol on the connection.
    """
class FallbackModeEnabledError(RpcConnectionError):
    """ Exception raised when a feature that is not compatible with the
    fallback mode is used while fallback mode is active.
    """
class AlreadyAnsweredError(Exception):
""" Exception raised when a deferred response has already been answered.
""" sjrpc-master/sjrpc/core/protocols/ 0000775 0000000 0000000 00000000000 13437517267 0017604 5 ustar 00root root 0000000 0000000 sjrpc-master/sjrpc/core/protocols/__init__.py 0000664 0000000 0000000 00000007440 13437517267 0021722 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see .
""" Protocols can be binded on a specific label of a :class:`RpcConnection`
(see `Multiplexing & protocols`_ for more informations).
Following protocols are provided with standard distribution of sjRpc, but you
can create yours if you needs:
- :class:`RpcProtocol`: the standard rpc protocol
- :class:`TunnelProtocol`: a protocol which allow to tunnel a socket traffic
through the sjRpc connection
- :class:`VpnProtocol` (experimental): like :class:`TunnelProtocol` but work
with a network interface instead of a socket.
"""
from __future__ import absolute_import
import logging
class Protocol(object):
    """ Base class for all protocols multiplexed over an
    :class:`~sjrpc.core.RpcConnection`.

    :param connection: the connection that carries this protocol
    :param label: the label identifying this protocol on the wire
    :param logger: optional logger; when None, a child of the connection's
        logger (suffixed with the label) is created
    """

    def __init__(self, connection, label, logger=None):
        self._connection = connection
        self._label = label
        if logger is None:
            logger_name = '%s.protos.%s' % (connection.logger.name, label)
            self.logger = logging.getLogger(logger_name)
        else:
            self.logger = logger

    @property
    def connection(self):
        """ The :class:`~sjrpc.core.RpcConnection` instance which handle
        this protocol.
        """
        return self._connection

    @property
    def label(self):
        """ The label binded to this protocol in
        the :class:`~sjrpc.core.RpcConnection` instance.
        """
        return self._label

    def create_watcher(self, watcher_class, **kwargs):
        """ Create a new pyev watcher bound to the connection's loop and
        return it.
        """
        kwargs['loop'] = self._connection.loop
        return watcher_class(**kwargs)

    def send(self, payload):
        """ Send a message through the sjRpc connection on this protocol's
        label.
        """
        self._connection.send(self._label, payload)

    def start_message(self, payload_size):
        """ Start a new incoming message receipt.

        By default this method creates a new empty buffer in the
        ``self._incoming_buf`` attribute; *payload_size* (the announced
        payload size) is ignored by this default implementation.
        """
        self._incoming_buf = ''

    def feed(self, data):
        """ Handle a chunk of data received for the current message.

        By default, this method appends the chunk to the end of the
        incoming buffer created by :meth:`start_message`.
        """
        self._incoming_buf += data

    def end_of_message(self):
        """ Signal the end of the currently received message.

        With the default :meth:`start_message` and :meth:`feed` methods,
        it's a good place to implement the processing of the complete
        incoming message.
        """
        pass

    def handle_control(self, ctl_type, payload=None):
        """ Handle a control ("protoctl") message received from the Rpc0.

        Fix: the rpc dispatcher invokes this hook with two arguments
        (``type`` and ``payload``); the previous one-argument signature
        made the default implementation raise TypeError when called.

        :param ctl_type: type of the control message
        :param payload: optional payload of the control message
        """
        pass

    def shutdown(self):
        """ Shutdown the protocol.

        By default this method unregisters the protocol from the
        connection; the label-0 protocol can never be shut down.
        """
        if self._label != 0:  # Label 0 protocol can't be shutdown.
            self._connection.unregister_protocol(self._label)
from sjrpc.core.protocols.rpc import RpcProtocol
from sjrpc.core.protocols.tunnel import TunnelProtocol
from sjrpc.core.protocols.vpn import VpnProtocol
__all__ = ['Protocol', 'RpcProtocol', 'TunnelProtocol', 'VpnProtocol']
sjrpc-master/sjrpc/core/protocols/rpc.py 0000664 0000000 0000000 00000043223 13437517267 0020746 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see .
from __future__ import absolute_import
import json
from uuid import uuid4
from threading import Event, Thread
from sjrpc.core.exceptions import RpcError, RpcConnectionError, AlreadyAnsweredError
from sjrpc.core.protocols import Protocol
__all__ = ['RpcProtocol']
# Logged when sending a reply to the peer fails, typically because the
# connection was lost while the handler function was still executing.
ERRMSG_RPCERR = ('Unable to send reply to the peer: %s (this error is usualy '
                 'raised when connection is lost while handler function '
                 'execution)')
class DeferredResponse(object):
    """ Deferred RPC response helper.

    Wraps the id of a pending request so that a handler can answer it
    later, exactly once, either with a value or with an error. Can also
    be used as a context manager: the managed block receives the
    :meth:`response` callable, and any exception raised inside it is
    converted into an rpc error and swallowed.
    """

    def __init__(self, rpc, msg_id):
        self._rpc = rpc
        self._msg_id = msg_id
        self._already_answered = False

    def error(self, message, error):
        """ Answer the pending request with an error. """
        if self._already_answered:
            raise AlreadyAnsweredError('You already answered to this request')
        self._already_answered = True
        self._rpc.error(self._msg_id, message=message, error=error)

    def error_exception(self, exception):
        """ Answer the pending request with an error built from *exception*. """
        self.error(str(exception), error=exception.__class__.__name__)

    def response(self, value):
        """ Answer the pending request with the returned *value*. """
        if self._already_answered:
            raise AlreadyAnsweredError('You already answered to this request')
        self._already_answered = True
        self._rpc.response(self._msg_id, returned=value)

    def __enter__(self):
        return self.response

    def __exit__(self, exc_type, exc_value, tb):
        # Turn any exception from the managed block into an rpc error and
        # suppress it:
        if exc_type is not None:
            self.error(str(exc_value), exc_type.__name__)
        return True
class RpcCaller(object):
    """ Execute a handler callable for one rpc request and report the
    outcome (returned value or raised exception) back to the peer through
    the protocol that received the request.
    """

    def __init__(self, request, protocol, func):
        self._request = request
        self._protocol = protocol
        self._func = func

    def run(self):
        """ Invoke the callable and send its result (or error) to the
        peer.
        """
        msg_id = self._request['id']
        call_args = self._request['args']
        call_kwargs = self._request['kwargs']
        # Prepend the special arguments requested by decorators; each one
        # is inserted at index 0, so the final order is:
        # deferred-response, connection, rpc, then the original arguments.
        if getattr(self._func, '__pass_rpc__', False):
            call_args.insert(0, self._protocol)
        if getattr(self._func, '__pass_connection__', False):
            call_args.insert(0, self._protocol.connection)
        if getattr(self._func, '__pass_deferred_response__', False):
            call_args.insert(0, DeferredResponse(self._protocol, msg_id))
        try:
            result = self._func(*call_args, **call_kwargs)
        except Exception as exc:
            try:
                self._protocol.error(msg_id, message=str(exc),
                                     error=exc.__class__.__name__)
            except RpcError as rpc_err:
                self._protocol.connection.logger.error(ERRMSG_RPCERR, rpc_err)
            return
        # A DeferredResponse result means the handler will answer later:
        if isinstance(result, DeferredResponse):
            return
        try:
            self._protocol.response(msg_id, returned=result)
        except RpcError as rpc_err:
            self._protocol.connection.logger.error(ERRMSG_RPCERR, rpc_err)

    def start(self):
        """ Start execution of the callable; in this base class the call
        is made synchronously through :meth:`run`.
        """
        self.run()
class ThreadedRpcCaller(RpcCaller):
    """ Variant of :class:`RpcCaller` that runs the call in its own
    daemon thread so it never blocks the caller.
    """

    def __init__(self, *args, **kwargs):
        super(ThreadedRpcCaller, self).__init__(*args, **kwargs)
        worker = Thread(target=self.run)
        worker.name = 'Processing of call: %s' % self._request['id']
        worker.daemon = True
        self._thread = worker

    def start(self):
        """ Launch the worker thread. """
        self._thread.start()
class RpcProtocol(Protocol):
""" The standard protocol used to do RPC request/responses.
:param connection: the connection serving this :class:`RpcProtocol`
:param label: the label of this :class:`RpcProtocol` instance
:param handler: command handler to bind by default
:param on_disconnect: callback called when client disconnect, could be the
name of an RPC handler or a callable
:param timeout: global command timeout
"""
    # Message templates: an incoming message is classified by comparing its
    # keys against these template keys (see _dispatch).
    REQUEST_MESSAGE = {'id': None, 'method': None, 'args': [], 'kwargs': {}}
    RESPONSE_MESSAGE = {'id': None, 'return': None, 'error': None}
    SPECIAL_MESSAGE = {'special': None}

    def __init__(self, connection, label, handler=None, on_disconnect=None,
                 timeout=30, *args, **kwargs):
        super(RpcProtocol, self).__init__(connection, label, *args, **kwargs)
        self._handler = handler  # Handler exposing callables to the peer.
        self._on_disconnect = on_disconnect  # Shutdown callback (or name).
        self._call_timeout = timeout  # Default synchronous call timeout (s).
        # Store all calls sent to the peer. Key is the id of the call and value
        # the event to raise when call is finished.
        self._calls = {}
def _dispatch(self, message):
""" Dispatch a received message according to it type.
:param message: the received message to dispatch
.. note::
When dispatching a call, the responsability of response is delegated
to the caller, except for the case where the method isn't found on
the handler.
"""
self.logger.debug('Received: %s', message)
if set(RpcProtocol.REQUEST_MESSAGE) <= set(message):
self._handle_request(message)
elif set(RpcProtocol.RESPONSE_MESSAGE) <= set(message):
self._handle_response(message)
elif set(RpcProtocol.SPECIAL_MESSAGE) <= set(message):
self._handle_special(message)
else:
self.logger.debug('Malformed message received: %s', message)
def _handle_request(self, message):
""" Handle an inbound request message.
"""
if self._handler is not None:
try:
func = self._handler[message['method']]
except KeyError:
self.error(message['id'], 'NameError',
"remote name '%s' is not defined" % message['method'])
else:
if getattr(func, '__threaded__', True):
ThreadedRpcCaller(message, self, func).start()
else:
RpcCaller(message, self, func).start()
else:
self.error(message['id'], 'NameError',
"remote name '%s' is not defined" % message['method'])
    def _handle_response(self, message):
        """ Handle an inbound response message.

        The pending call is looked up by message id, filled in with the
        returned value or error, then released according to how it was
        issued: synchronous (waiting on an event), callback-based, or
        queue-based asynchronous.
        """
        # Handle response message from the peer:
        call = self._calls.get(message['id'])
        if call is not None:
            # Call exists in call list
            if message['error'] is None:
                call['return'] = message['return']
            else:
                call['error'] = message['error']
            if 'event' in call:
                # Release the call if its synchronous:
                call['event'].set()
            elif 'async_cb' in call:
                # Callback-based asynchronous call: invoke the callback
                # with the call id, response and error:
                call['async_cb'](call_id=message['id'],
                                 response=call.get('return'),
                                 error=call.get('error'))
            else:
                # Else, it's an asynchonous call, we need to push the answer
                # on the queue:
                queue = call['queue']
                del call['queue']
                queue.put(call)
            # Finally, delete the call from the current running call list:
            del self._calls[message['id']]
def _handle_special(self, message):
""" Handle special message.
"""
if message['special'] == 'capabilities':
if self._label == 0:
self._connection.set_capabilities(message.get('capabilities'))
else:
self.logger.warning('Capabilities message received by non-zero'
' rpc.')
elif message['special'] == 'protoctl':
label = message.get('label')
if label is None:
self.logger.warning('Protoctl message received without label.')
return
try:
proto = self._connection.get_protocol(label)
except KeyError:
self.logger.debug('Protoctl message received for unknown label')
else:
try:
proto.handle_control(message.get('type'),
message.get('payload'))
except Exception as err:
self.logger.error('Protoctl handler failed for proto %s: ',
'%s' % err)
def _send(self, message):
""" Low level method to encode a message in json, calculate it size, and
place result on outbound buffer.
.. warning::
Message must be a jsonisable structure.
"""
self.logger.debug('Sending: %s', message)
json_msg = json.dumps(message)
try:
self._connection.send(self._label, payload=json_msg)
except RpcConnectionError as err:
raise RpcError('RpcConnectionError', str(err))
def _send_call(self, method_name, *args, **kwargs):
""" Create the message for the call and push them to the outbound queue.
:param method_name: the name of the method to call on the peer
:param *args: arguments to pass to the remote method
:param **kwargs: keyword arguments to pass to the remote method
:return: the generated id for the request
:rtype: :class:`str` object
"""
msg = RpcProtocol.REQUEST_MESSAGE.copy()
msg['method'] = method_name
msg['args'] = args
msg['kwargs'] = kwargs
msg['id'] = str(uuid4())
self._send(msg)
return msg['id']
def _send_response(self, msg_id, returned=None, error=None):
""" Low level method to send a response message to the peer.
:param msg_id: the id of the replied message
:param returned: returned data
:type returned: returned data or None if errors have been raised
:param error: raised errors
:type error: raised error or None if no error have been raised
"""
msg = RpcProtocol.RESPONSE_MESSAGE.copy()
msg['id'] = msg_id
msg['return'] = returned
msg['error'] = error
self._send(msg)
#
# Public methods:
#
def end_of_message(self):
""" When the message is fully received, decode the json and dispatch it.
"""
msg = json.loads(self._incoming_buf)
self._dispatch(msg)
    def shutdown(self):
        """ Handle the shutdown process of this protocol instance:

        * Release all waiting calls with a "Connection reset by peer" error.
        * Execute the on_disconnect callback (either a callable, or the
          name of a function looked up on the bound handler).
        """
        super(RpcProtocol, self).shutdown()
        # Release all waiting calls from this rpc:
        # NOTE(review): _handle_response mutates self._calls during this
        # iteration; .keys() returns a snapshot list on Python 2, but this
        # would break on Python 3 — confirm target interpreter.
        for cid in self._calls.keys():
            err = {'exception': 'RpcError',
                   'message': 'Connection reset by peer'}
            self._handle_response({'id': cid, 'return': None, 'error': err})
        # Execute on_disconnect callback:
        if self._on_disconnect is None:
            return
        callback = None
        if not callable(self._on_disconnect):
            # A string: resolve the callback by name on the bound handler.
            if self._handler is not None:
                try:
                    callback = self._handler[self._on_disconnect]
                except KeyError:
                    self.logger.warn('Shutdown callback not found in current '
                                     'rpc attached handler, ignoring')
                    callback = None
            else:
                self.logger.warn('Shutdown callback specified but no handler '
                                 'binded on rpc, ignoring')
                callback = None
        else:
            callback = self._on_disconnect
        if callback is not None:
            # Callback errors are logged and swallowed:
            try:
                callback(self._connection)
            except Exception as err:
                self.logger.debug('Error while execution of shutdown '
                                  'callback: %s', err)
    def get_handler(self):
        """ Return the handler currently bound to this rpc protocol.

        :return: the bound handler, or None when no handler is set
        """
        return self._handler
    def set_handler(self, handler):
        """ Define a new handler for this connection.

        :param handler: the new handler to define (a dict-like mapping of
            remote names to callables).
        """
        self._handler = handler
def send_special(self, special, **kwargs):
""" Send a "special" message to the peer.
:param special: type of the special message
:param \*\*kwargs: fields of the special message
"""
msg = {'special': special}
msg.update(kwargs)
self._send(msg)
def response(self, msg_id, returned):
""" Send an "return" response to the peer.
:param msg_id: the id of the replied message
:param returned: the value returned by the function
.. warning::
In case of raised error, use the :meth:`error` method instead of
this one.
"""
self._send_response(msg_id, returned=returned)
    def error(self, msg_id, error, message, traceback=None):
        """ Send an error response to the peer.

        :param msg_id: the id of the replied message
        :param error: the name of the raised exception
        :param message: human readable error for the exception
        :param traceback: accepted for API compatibility but currently
            ignored (never transmitted to the peer)
        """
        err = {'exception': error, 'message': message}
        self._send_response(msg_id, error=err)
    def pop_call(self, msg_id, default=None):
        """ Remove and return a pending call from the expected call dict.

        :param msg_id: id of the call to pop
        :param default: value to get if no call is found with the provided id
        """
        return self._calls.pop(msg_id, default)
def call(self, method_name, *args, **kwargs):
    """ Make a new remote call on the peer and wait for its response.

    :param method_name: the method to call on the peer
    :param args: the positional arguments for the call
    :param kwargs: the keyword arguments for the call; the special
        ``_timeout`` keyword overrides the default call timeout in
        seconds (the docstring previously advertised ``timeout``, but
        the code has always popped ``_timeout``)
    :return: the data returned by the peer for the call
    :raise RpcError: when the peer reports an error, or with name
        ``TimeoutError`` when no response arrives before the timeout

    .. note::
       This function blocks until the peer response is received or the
       timeout expires.
    """
    timeout = kwargs.pop('_timeout', self._call_timeout)
    # Send the call to the peer:
    msg_id = self._send_call(method_name, *args, **kwargs)
    # Register the pending call with the event used to signal the answer:
    pending = {'return': None, 'error': None, 'event': Event(),
               'id': msg_id}
    self._calls[msg_id] = pending
    # Wait for the response:
    pending['event'].wait(timeout)
    # Check if a timeout occurred:
    if not pending['event'].is_set():
        # Bug fix: drop the abandoned entry so it does not accumulate in
        # self._calls forever when the peer never answers:
        self._calls.pop(msg_id, None)
        raise RpcError('TimeoutError', 'remote method timeout')
    # Check if an error occurred during the remote execution:
    if pending['error'] is not None:
        raise RpcError(pending['error']['exception'],
                       pending['error']['message'])
    return pending['return']
def async_call(self, queue, method_name, *args, **kwargs):
    """ Make a new asynchronous call on the peer.

    :param queue: the queue on which the response is pushed once received
    :param method_name: the method to call on the peer
    :param _data: opaque local data stored with the call record and given
        back with the response (never sent over the wire)
    :param args: the positional arguments for the call
    :param kwargs: the keyword arguments for the call
    :return: the message id of the call
    """
    # Extract the local-only payload before the arguments hit the wire:
    local_data = kwargs.pop('_data', None)
    # Send the call to the peer:
    msg_id = self._send_call(method_name, *args, **kwargs)
    # Register the call without waiting for its response:
    self._calls[msg_id] = {'id': msg_id, 'async': True,
                           'queue': queue, 'data': local_data}
    return msg_id
def async_call_cb(self, callback, method_name, *args, **kwargs):
    """ Make an asynchronous call on the peer and invoke *callback* once
    the call completes.

    :param callable callback: invoked with the named arguments:

        * **call_id** (*int*) - unique identifier of the async call
        * **response** - value returned by the remote function
        * **error** (*dict*) - error details when the remote call raised

    :param str method_name: the method to call on the peer
    :param args: the positional arguments for the call
    :param kwargs: the keyword arguments for the call
    :return: the message id of the call
    """
    call_id = self._send_call(method_name, *args, **kwargs)
    # Only the callback needs to be remembered for this kind of call:
    self._calls[call_id] = {'id': call_id, 'async_cb': callback}
    return call_id
sjrpc-master/sjrpc/core/protocols/tunnel.py 0000664 0000000 0000000 00000016415 13437517267 0021472 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import
import threading
import socket
from sjrpc.core.protocols import Protocol
from sjrpc.core.exceptions import RpcConnectionError
import pyev
__all__ = ['TunnelProtocol']
class TunnelProtocol(Protocol):
    """ A byte tunnel multiplexed over an :class:`RpcConnection`.

    Bytes read from a local *endpoint* socket are forwarded to the peer
    through the sjRpc frame layer, and bytes received from the peer are
    written back to the endpoint.  Flow control is credit based: each
    side only sends as many bytes as the other granted through ``get``
    control messages (see :meth:`_send_get` / :meth:`handle_control`).

    :param endpoint: the local socket to tunnel; when ``None`` a
        ``socketpair()`` is created and the other half is exposed through
        the :attr:`socket` property
    :param autostart: start the tunnel immediately (default ``True``)
    :param on_shutdown: optional callable invoked with this protocol
        instance when the tunnel is shut down
    :param close_endpoint_on_shutdown: close the endpoint socket on
        shutdown (default ``True``)
    """

    GET_SIZE = 1024 * 1024  # 1MB credit window granted per 'get' message
    DEFAULT_GET_SIZE = GET_SIZE

    def __init__(self, *args, **kwargs):
        # Pop our own keyword options before the base Protocol sees them:
        endpoint = kwargs.pop('endpoint', None)
        autostart = kwargs.pop('autostart', True)
        self._cb_on_shutdown = kwargs.pop('on_shutdown', None)
        self._close_endpoint_on_shutdown = kwargs.pop('close_endpoint_on_shutdown', True)
        super(TunnelProtocol, self).__init__(*args, **kwargs)
        if endpoint is None:
            # No endpoint given: build a socketpair and keep one half for
            # the local user (exposed via the `socket` property):
            self._endpoint, self._socket = socket.socketpair()
        else:
            self._endpoint = endpoint
            self._socket = None
        self._is_shutdown = threading.Event()
        self._is_started = False
        # Bytes received from the tunnel, waiting to be written to the
        # endpoint:
        self._from_tun_to_endpoint_buf = ''
        self._asked = 0  # Data asked to the peer
        self._ok_to_send = 0  # Data I can send to the peer
        # Set the endpoint as non-blocking socket:
        self._endpoint.setblocking(False)
        # Create watcher to handle data coming from the endpoint:
        props = dict(fd=self._endpoint, events=pyev.EV_READ,
                     callback=self._handle_from_endpoint)
        self._endpoint_reader = self.create_watcher(pyev.Io, **props)
        # Create watcher to handle data going to the endpoint:
        props = dict(fd=self._endpoint, events=pyev.EV_WRITE,
                     callback=self._handle_from_tunnel)
        self._endpoint_writer = self.create_watcher(pyev.Io, **props)
        if autostart:
            self.start()

    def _handle_from_endpoint(self, watcher, revents):
        """ Handle data coming from the endpoint socket and push it
        through the sjRpc tunnel.
        """
        # Abort if peer don't want more data:
        if self._ok_to_send <= 0:
            watcher.stop()
            return
        try:
            # Never read more than the credit granted by the peer:
            read = self._endpoint.recv(self._ok_to_send)
        except (IOError, socket.error) as err:
            if err.errno in self.connection.NONBLOCKING_ERRORS:
                return
            else:
                self.shutdown()
                return
        # Empty read means the connection has been closed on other side:
        if not read:
            self.shutdown()
            return
        self._ok_to_send -= len(read)
        self.send(read)
        # Credit exhausted: stop reading until the peer grants more:
        if not self._ok_to_send:
            watcher.stop()

    def _handle_from_tunnel(self, watcher, revent):
        """ Handle writing of the data already received from the tunnel
        and stored into the incoming buffer.

        Data are written only when the endpoint socket is ready to write.
        This method is also responsible for asking more data to the peer
        when the incoming buffer drains.
        """
        try:
            sent = self._endpoint.send(self._from_tun_to_endpoint_buf)
        except (IOError, socket.error) as err:
            if err.errno in self.connection.NONBLOCKING_ERRORS:
                return
            else:
                self.shutdown()
                return
        self._from_tun_to_endpoint_buf = self._from_tun_to_endpoint_buf[sent:]
        self._asked -= sent
        # Keep at least two credit windows outstanding on our side:
        if self._asked < TunnelProtocol.GET_SIZE * 2:
            self._send_get(TunnelProtocol.GET_SIZE)
        if not self._from_tun_to_endpoint_buf:
            watcher.stop()

    def _send_get(self, size):
        """ Grant the peer the right to send *size* more bytes on the
        tunnel.
        """
        self._connection.rpc.send_special('protoctl', label=self._label,
                                          type='get', payload=dict(size=size))
        self._asked += size

    def cb_default_on_shutdown(self, tun):
        """ Action to do on the endpoint when the connection is shutdown.

        Default implementation does nothing.
        """
        pass

    #
    # Public methods:
    #

    @property
    def socket(self):
        # Local half of the internally created socketpair; None when an
        # explicit endpoint was provided to the constructor.
        return self._socket

    @property
    def endpoint(self):
        """ Return the tunnel endpoint.
        """
        return self._endpoint

    def start(self):
        """ Start the reader on the endpoint.

        Announces 'ready' to the peer and grants it an initial credit
        window.
        """
        if not self._is_shutdown.is_set():
            self._connection.rpc.send_special('protoctl', label=self._label, type='ready')
            self._send_get(TunnelProtocol.DEFAULT_GET_SIZE)
            self._is_started = True
            self._endpoint_reader.start()

    def shutdown(self):
        """ Shutdown the tunnel.

        Idempotent: guarded by the internal shutdown event.
        """
        if not self._is_shutdown.is_set():
            self._is_shutdown.set()
            # Stop watchers:
            self._endpoint_reader.stop()
            self._endpoint_writer.stop()
            # Send the end of stream message to the peer:
            try:
                self._connection.rpc.send_special('protoctl', label=self._label, type='eos')
            except RpcConnectionError:
                pass  # Ignore errors when connection is down.
            # Execute the callback:
            if self._cb_on_shutdown is not None:
                self._cb_on_shutdown(self)
            if self._close_endpoint_on_shutdown:
                self.endpoint.close()
            super(TunnelProtocol, self).shutdown()

    def wait_shutdown(self, timeout=None, sig_timeout=1):
        """ Wait until tunnel has been shutdown.

        Return True if shutdown has been triggered, or False if timeout
        occurs.
        """
        if timeout is None:
            # Workaround to avoid signals to be masked by Python while
            # waiting the event: wake up every sig_timeout seconds.
            while True:
                if self._is_shutdown.wait(sig_timeout):
                    return True
        else:
            return self._is_shutdown.wait(timeout)

    def end_of_message(self):
        """ Handle inbound data from the :class:`RpcConnection` peer and
        place it on the incoming buffer.

        # NOTE(review): self._incoming_buf is presumably filled by the
        # base Protocol's feed() -- confirm against the Protocol class.
        """
        self._from_tun_to_endpoint_buf += self._incoming_buf
        self._endpoint_writer.start()

    def handle_control(self, control_type, payload):
        """ React to a ``protoctl`` control message from the peer:
        'get' grants sending credit, 'ready' signals the remote side is
        up, 'eos' shuts the tunnel down.
        """
        if not self._is_shutdown.is_set():
            if control_type == 'get':
                size = payload.get('size', TunnelProtocol.DEFAULT_GET_SIZE)
                self._ok_to_send += size
                if self._is_started:
                    self._endpoint_reader.start()
            elif control_type == 'ready':
                self._send_get(TunnelProtocol.GET_SIZE)
                if self._is_started:
                    self._endpoint_reader.start()
            elif control_type == 'eos':
                self.logger.debug('Received EOS event')
                self.shutdown()
sjrpc-master/sjrpc/core/protocols/vpn.py 0000664 0000000 0000000 00000005617 13437517267 0020772 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import
import os
import struct
import fcntl
from sjrpc.core.protocols import TunnelProtocol
__all__ = ['VpnProtocol']
class _FileSocket(object):
""" A fake Socket interface for files.
"""
def __init__(self, file_):
self.file = file_
def recv(self, size):
self.file.read(size)
def send(self, data):
self.file.write(data)
return len(data)
def fileno(self):
return self.file.fileno()
def setblocking(self, blocking):
if not blocking:
# Disable blocking mode on devzero:
current_flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFL)
fcntl.fcntl(self.fileno(), fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
else:
raise NotImplementedError('Not implemented')
class VpnProtocol(TunnelProtocol):
    """ A VPN protocol which spawn a network interface and tunnel all its
    traffic through the sjRpc tunnel.

    :param inherited: All options (except endpoint) are inherited from
        :class:`TunnelProtocol`.
    :param tun_prefix: the prefix name of the spawned interface
    :param tun_mode: which can be VpnProtocol.IFF_TUN or VpnProtocol.IFF_TAP

    The user starting the sjRpc process must have the CAP_NET_ADMIN
    capability; by default, only root has this capability.

    .. warning::
       This protocol is a proof of concept and should not be used in
       production software.
    """

    # Taken from linux/if_tun.h:
    TUNSETIFF = 0x400454ca
    IFF_TUN = 0x0001
    IFF_TAP = 0x0002

    # Default values:
    DEFAULT_TUN_PREFIX = 'tun'
    DEFAULT_TUN_MODE = IFF_TAP

    def __init__(self, *args, **kwargs):
        """ Open /dev/net/tun, create the tun/tap interface and hand it to
        :class:`TunnelProtocol` as the tunnel endpoint (wrapped in a
        :class:`_FileSocket`).
        """
        tun_prefix = kwargs.pop('tun_prefix', VpnProtocol.DEFAULT_TUN_PREFIX)
        tun_mode = kwargs.pop('tun_mode', VpnProtocol.DEFAULT_TUN_MODE)
        ftun = open('/dev/net/tun', 'r+b')
        # Create the tunnel and get its final name:
        # ('%d' lets the kernel pick the interface number; NOTE(review):
        # Python-2 str semantics assumed for the struct.pack argument.)
        create_tun_arg = struct.pack('16sH', tun_prefix + '%d', tun_mode)
        tun = fcntl.ioctl(ftun, VpnProtocol.TUNSETIFF, create_tun_arg)
        # The kernel writes the final interface name back in the first
        # 16 bytes of the ioctl argument:
        self.tun_name = tun[:16].rstrip('\0')
        kwargs['endpoint'] = _FileSocket(ftun)
        super(VpnProtocol, self).__init__(*args, **kwargs)
sjrpc-master/sjrpc/core/rpcconnection.py 0000664 0000000 0000000 00000043015 13437517267 0021001 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
""" This module contains the RpcConnection class, more informations about this
class are located in it docstring.
"""
from __future__ import absolute_import
import ssl
import errno
import struct
import socket
import logging
import threading
from sjrpc.core.protocols import Protocol, RpcProtocol, TunnelProtocol
from sjrpc.core.exceptions import (RpcConnectionError, NoFreeLabelError,
FallbackModeEnabledError, SocketError,
RpcError)
import pyev
class RpcConnection(object):
    """ This class manages a single peer connection.

    You can wrap an existing socket with the default constructor::

        >>> conn = RpcConnection(mysocket)

    Or create a new socket automatically with the from_addr constructor::

        >>> conn = RpcConnection.from_addr(host, port)

    If you prefer an SSL connection, use the from_addr_ssl constructor::

        >>> conn = RpcConnection.from_addr_ssl(host, port)

    By default, an :class:`RpcProtocol` is created on label 0; you can
    access this rpc through the `conn.rpc` shortcut::

        >>> conn.rpc.call('ping')

    Also, the connection object exposes the :meth:`call` and
    :meth:`async_call` methods of the default rpc, so you can use them
    directly on the connection::

        >>> conn.call('ping')  # Equivalent to the example before

    :param sock: the socket object of this newly created
        :class:`RpcConnection`
    :param loop: pyev loop to use (a new one is created when ``None``)
    :param enable_tcp_keepalive: enable SO_KEEPALIVE on the socket
    :param fallback_timeout: maximum time to wait for the "capabilities"
        message before sending anything to the peer; 0 to wait
        indefinitely, -1 to disable the fallback mode
    :param args,kwargs: arguments passed to the default rpc protocol
        automatically registered on label 0
    """

    # Errno values that just mean "retry later" on a non-blocking socket:
    NONBLOCKING_ERRORS = (errno.EAGAIN, errno.EWOULDBLOCK)
    NONBLOCKING_SSL_ERRORS = (ssl.SSL_ERROR_WANT_READ, )
    # Frame header: (label, payload size); the fallback header has no
    # label (all frames go to the label-0 rpc):
    MESSAGE_HEADER = '!HL'
    MESSAGE_HEADER_FALLBACK = '!L'
    MAX_LABEL = 2 ** 16
    DEFAULT_RECV_SIZE = 1024 * 64  # 64kB
    # Methods of the label-0 rpc re-exported on the connection object:
    SHORTCUTS_MAINRPC = ('call', 'async_call')

    def __init__(self, sock, loop=None, enable_tcp_keepalive=False,
                 fallback_timeout=1.0, *args, **kwargs):
        # Sock of this connection:
        self._sock = sock
        sock.setblocking(False)
        # Initialization requires fallback mode disabled:
        self.fallback = False
        # Get the pyev loop:
        if loop is None:
            self.loop = pyev.Loop()
        else:
            self.loop = loop
        # Activate TCP keepalive on the connection:
        if enable_tcp_keepalive:
            self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        # Watcher list:
        self._watchers = set()
        # Socket inbound/outbound buffers:
        self._inbound_buffer = ''
        self._outbound_buffer = ''
        # _remains counts bytes still needed before the next dispatch;
        # it starts at the size of one frame header:
        if fallback_timeout == -1:
            self._remains = struct.calcsize(RpcConnection.MESSAGE_HEADER)
        else:
            self._remains = struct.calcsize(RpcConnection.MESSAGE_HEADER_FALLBACK)
        # Protocol currently receiving a payload (None between frames):
        self._proto_receiving = None
        # Initialize main read/write watchers:
        self._sock_reader = self.create_watcher(pyev.Io,
                                                fd=self._sock,
                                                events=pyev.EV_READ,
                                                callback=self._reader)
        self._sock_reader.start()
        self._sock_writer = self.create_watcher(pyev.Io,
                                                fd=self._sock,
                                                events=pyev.EV_WRITE,
                                                callback=self._writer)
        # OpenSSL requires that the SAME string object must be used on the
        # next send retry when the entire buffer can't be written to the
        # socket. This attribute stores this string, or None if the
        # previous send has been a success:
        self._writer_last_try_buf = None
        # Is the RpcConnection connected to its peer:
        self._connected = True
        # "Need to send" loop signal (thread-safe wakeup of the loop):
        self._need_to_send = self.create_watcher(pyev.Async,
                                                 callback=self._cb_need_to_send)
        self._need_to_send.start()
        # Setup logging facility:
        self.logger = logging.getLogger('sjrpc.%s' % self.getpeername())
        # Protocols registered on this connection:
        self._protocols = {}
        self.register_protocol(0, RpcProtocol, *args, **kwargs)
        # Create shortcuts to the main rpc (protocol 0) for convenience:
        for name in RpcConnection.SHORTCUTS_MAINRPC:
            setattr(self, name, getattr(self.get_protocol(0), name))
        self._event_fallback = threading.Event()
        # By default, enter in fallback mode: no label, all frames are
        # redirected on Rpc0:
        if fallback_timeout != -1:
            self.fallback = True
            self.create_watcher(pyev.Timer, after=fallback_timeout, repeat=0,
                                callback=self._cb_set_event_fallback).start()
            # Set the event fallback just to send the capability message:
            self._event_fallback.set()
            # Send our capabilities to the peer:
            # NOTE(review): _remote_capabilities is only initialized on
            # this branch -- confirm it is not read when
            # fallback_timeout == -1.
            self._remote_capabilities = None
            self._send_capabilities()
            # And clear it after:
            self._event_fallback.clear()

    @classmethod
    def from_addr(cls, addr, port, conn_timeout=30.0, *args, **kwargs):
        """ Construct the instance of :class:`RpcConnection` without
        providing the :class:`socket` object.

        The socket is automatically created and passed to the standard
        constructor before returning the new instance.

        :param addr: the target ip address
        :param port: the target port
        :param conn_timeout: the connection operation timeout
        :param args,kwargs: extra arguments passed to the constructor
            (see constructor docstring)
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(conn_timeout)
        sock.connect((addr, port))
        sock.setblocking(True)
        return cls(sock, *args, **kwargs)

    @classmethod
    def from_addr_ssl(cls, addr, port, cert=None,
                      conn_timeout=30, *args, **kwargs):
        """ Construct an :class:`RpcConnection` instance like
        :meth:`from_addr`, but enable ssl on the socket.

        :param cert: ssl client certificate, or None for ssl without a
            certificate
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(conn_timeout)
        sock.connect((addr, port))
        sock.setblocking(True)
        req = ssl.CERT_NONE if cert is None else ssl.CERT_REQUIRED
        sock = ssl.wrap_socket(sock, certfile=cert, cert_reqs=req,
                               ssl_version=ssl.PROTOCOL_TLSv1)
        return cls(sock, *args, **kwargs)

    def __repr__(self):
        # NOTE(review): this looks truncated -- the original repr text was
        # probably stripped by markup extraction (angle brackets eaten).
        return ''

    def __hash__(self):
        # Hash by the underlying socket so connections can live in sets:
        return self._sock.__hash__()

    def __nonzero__(self):
        # Python-2 truth protocol: a connection is truthy while connected
        # (__bool__ would be the Python-3 equivalent).
        return self._connected

    def _send_capabilities(self):
        """ Send capabilities to the peer; only works in fallback mode for
        compatibility with old sjRpc.

        Sends a special message through the Rpc0 with these fields:
          - special: 'capabilities'
          - capabilities: {'version': VERSION, 'capabilities': [...]}
        """
        from sjrpc import __version__
        cap = {'version': __version__, 'capabilities': ['rpc', 'tunnel']}
        rpc0 = self.get_protocol(0)
        rpc0.send_special('capabilities', capabilities=cap)

    def _reader(self, watcher, revents):
        """ Read the socket and feed the inbound buffer.  Launch the
        dispatch when enough data is buffered.
        """
        # Read all possible data from the socket:
        try:
            buf = self._sock.recv(RpcConnection.DEFAULT_RECV_SIZE)
        except socket.error as err:
            if (isinstance(err, socket.error) and err.errno
                    in RpcConnection.NONBLOCKING_ERRORS):
                return
            elif (isinstance(err, ssl.SSLError) and err.errno
                    in RpcConnection.NONBLOCKING_SSL_ERRORS):
                return
            else:
                # If any fatal error is triggered, the connection is
                # shutdown:
                self.shutdown()
                self.logger.debug('Unexpected socket error: %s', err)
                return
        # For ssl sockets, we need to fetch buffered ssl-side data:
        if isinstance(self._sock, ssl.SSLSocket):
            pending = self._sock.pending()
            if pending:
                buf += self._sock.recv(pending)
        # Empty data on a non-blocking socket means that the connection
        # has been closed:
        if not buf:
            self.shutdown()
            # NOTE(review): execution continues past shutdown here with an
            # empty buf -- confirm this is intentional.
        self._remains -= len(buf)
        self._inbound_buffer += buf
        # Process and dispatch all inbound data:
        while self._remains <= 0:
            self._dispatch()

    def _dispatch(self):
        """ Read the inbound buffer, parse and dispatch messages.

        Alternates between two states: header parsing (when
        _proto_receiving is None) and payload feeding.
        """
        if self._proto_receiving is None:
            # Header state: parse the next frame header.
            if self.fallback:
                size = struct.calcsize(RpcConnection.MESSAGE_HEADER_FALLBACK)
                buf = self._inbound_buffer[:size]
                self._inbound_buffer = self._inbound_buffer[size:]
                pl_size = struct.unpack(RpcConnection.MESSAGE_HEADER_FALLBACK, buf)[0]
                # Fallback frames carry no label; everything goes to Rpc0:
                label = 0
            else:
                size = struct.calcsize(RpcConnection.MESSAGE_HEADER)
                buf = self._inbound_buffer[:size]
                self._inbound_buffer = self._inbound_buffer[size:]
                label, pl_size = struct.unpack(RpcConnection.MESSAGE_HEADER, buf)
            # Get the registered protocol for the specified label:
            self._proto_receiving = self._protocols.get(label)
            # If the frame's label is not bound to a protocol, we create a
            # dummy protocol to consume the payload:
            if self._proto_receiving is None:
                self._proto_receiving = Protocol(self, -1)
            self._proto_receiving.start_message(pl_size)
            self._remains += pl_size
        else:
            # Payload state: feed the tail of the current message.
            # (_remains is <= 0 here, so this is the part of the buffer
            # that belongs to the current message.)
            size = len(self._inbound_buffer) + self._remains
            buf = self._inbound_buffer[:size]
            self._inbound_buffer = self._inbound_buffer[size:]
            self._proto_receiving.feed(buf)
            if self._remains <= 0:
                # Message complete: notify the protocol and rearm for the
                # next frame header.
                self._proto_receiving.end_of_message()
                if self.fallback:
                    self._remains += struct.calcsize(RpcConnection.MESSAGE_HEADER_FALLBACK)
                else:
                    self._remains += struct.calcsize(RpcConnection.MESSAGE_HEADER)
                self._proto_receiving = None

    def _writer(self, watcher, revent):
        """ Write pending outbound data on the socket.
        """
        if self._outbound_buffer:
            try:
                if self.fallback:
                    sent = self._sock.send(self._outbound_buffer[:4096])
                else:
                    # Reuse the exact same string object on SSL retries
                    # (OpenSSL requirement, see __init__):
                    if self._writer_last_try_buf is None:
                        self._writer_last_try_buf = self._outbound_buffer[:1024 * 64]
                    sent = self._sock.send(self._writer_last_try_buf)
            except socket.error as err:
                if (isinstance(err, socket.error) and err.errno in RpcConnection.NONBLOCKING_ERRORS):
                    return
                elif (isinstance(err, ssl.SSLError) and err.errno in RpcConnection.NONBLOCKING_SSL_ERRORS):
                    return
                errmsg = 'Fatal error while sending through socket: %s' % err
                self.logger.error(errmsg)
                self.shutdown()
                return
            else:
                self._writer_last_try_buf = None
                self._outbound_buffer = self._outbound_buffer[sent:]
        # Nothing left to send: stop the write watcher until send() rearms
        # it through the async signal:
        if not self._outbound_buffer:
            watcher.stop()

    def _cb_need_to_send(self, watcher, revents):
        # Async watcher callback: (re)start the socket write watcher.
        self._sock_writer.start()

    def _cb_set_event_fallback(self, watcher, revents):
        # Timer callback: stop waiting for the peer capabilities message.
        self._event_fallback.set()

    #
    # Public API
    #

    @property
    def rpc(self):
        # Shortcut to the default rpc protocol registered on label 0.
        return self.get_protocol(0)

    def run(self):
        """ Main loop execution.
        """
        self.loop.start()

    def create_watcher(self, watcher_class, **kwargs):
        """ Create a new pyev watcher, keep a reference on it (so shutdown
        can stop it) and return it.
        """
        kwargs['loop'] = self.loop
        watcher = watcher_class(**kwargs)
        self._watchers.add(watcher)
        return watcher

    def send(self, label, payload):
        """ Low level method to send a message through the socket,
        generally used by protocols.

        Blocks until the fallback handshake is settled, then frames the
        payload and wakes the writer.
        """
        self._event_fallback.wait()
        if not self._connected:
            raise RpcConnectionError('Not connected to the peer')
        size = len(payload)
        if self.fallback:
            header = struct.pack(RpcConnection.MESSAGE_HEADER_FALLBACK, size)
        else:
            header = struct.pack(RpcConnection.MESSAGE_HEADER, label, size)
        self._outbound_buffer += header + payload
        self._need_to_send.send()

    def set_capabilities(self, capabilities):
        """ Set capabilities of the remote host (and disable fallback
        mode).

        Should be called by Rpc0 when the peer sends its capabilities
        message.
        """
        self._remote_capabilities = capabilities
        self.fallback = False
        self._event_fallback.set()

    def register_protocol(self, label, protocol_class, *args, **kwargs):
        """ Register a new protocol for the specified label.

        :param label: the label to bind, or None to pick the first free
            label automatically
        :param protocol_class: the protocol class to instantiate
        :return: the newly created protocol instance
        """
        if self.fallback:
            raise FallbackModeEnabledError('Fallback mode is not compatible '
                                           'with protocols')
        if label is None:
            # Find the first unused label:
            for label in xrange(0, RpcConnection.MAX_LABEL):
                if label not in self._protocols:
                    break
            else:
                raise NoFreeLabelError('No more label number are availables')
        if label in self._protocols:
            raise KeyError('A protocol is already registered for this label')
        elif not isinstance(label, int):
            raise ValueError('Label must be an integer')
        self._protocols[label] = protocol_class(self, label, *args, **kwargs)
        return self._protocols[label]

    def unregister_protocol(self, label):
        """ Unregister the specified protocol label for this connection.

        Label 0 (the default rpc) can never be unregistered.
        """
        if self.fallback:
            raise FallbackModeEnabledError('Fallback mode is not compatible '
                                           'with protocols')
        if label in self._protocols and label != 0:
            del self._protocols[label]
        else:
            raise KeyError('No protocol registered for this label')

    def create_rpc(self, label=None, *args, **kwargs):
        """ Shortcut which can be used to create rpc protocols.
        """
        return self.register_protocol(label, RpcProtocol, *args, **kwargs)

    def create_tunnel(self, label=None, *args, **kwargs):
        """ Shortcut which can be used to create tunnel protocols.
        """
        return self.register_protocol(label, TunnelProtocol, *args, **kwargs)

    def get_protocol(self, label):
        """ Get the protocol registered for the specified label.

        :raise KeyError: when no protocol is registered on *label*
        """
        proto = self._protocols.get(label)
        if proto is None:
            raise KeyError('No protocol registered for this label')
        return proto

    def shutdown(self):
        """ Shutdown this connection.

        Safe to call multiple times; only the first call has an effect.
        """
        # Ignore repeated calls to shutdown:
        if not self._connected:
            return
        # Unset connected state:
        self._connected = False
        self.logger.info('Connection shutdown.')
        # Shutdown each registered watcher:
        for watcher in self._watchers:
            watcher.stop()
        # Shutdown each registered protocol:
        for proto in self._protocols.values():
            try:
                proto.shutdown()
            except RpcError:
                # FIXME use errno on RpcError
                pass
        # Close the connection socket:
        try:
            try:
                self._sock.shutdown(socket.SHUT_RDWR)
            except socket.error as e:
                # ENOTCONN just means the peer already closed:
                if e.errno != errno.ENOTCONN:
                    raise
            self._sock.close()
        except socket.error as err:
            #self.logger.warn('Error while socket close: %s', err)
            pass

    def get_fd(self):
        """ Get the file descriptor of the socket managed by this
        connection.

        :return: the file descriptor number of the socket, or None when
            the socket is already closed
        """
        try:
            return self._sock.fileno()
        except socket.error:
            return None

    def getpeername(self):
        """ Get the peer name.

        :return: string representing the peer name as "host:port"
        """
        return '%s:%s' % self._sock.getpeername()
sjrpc-master/sjrpc/server/ 0000775 0000000 0000000 00000000000 13437517267 0016136 5 ustar 00root root 0000000 0000000 sjrpc-master/sjrpc/server/__init__.py 0000664 0000000 0000000 00000001503 13437517267 0020246 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see .
from __future__ import absolute_import
from sjrpc.server.simple import (RpcServer, SSLRpcServer)
__all__ = ('RpcServer', 'SSLRpcServer')
sjrpc-master/sjrpc/server/simple.py 0000664 0000000 0000000 00000011303 13437517267 0017777 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
from __future__ import absolute_import
import gc
import ssl
import time
import errno
import socket
import select
import weakref
import logging
import threading
from sjrpc.core import RpcConnection
import pyev
class RpcServer(object):
""" Base class for all RpcServer classes.
"""
DEFAULT_TCP_BACKLOG = 30
NONBLOCKING_ERRORS = (errno.EAGAIN, errno.EWOULDBLOCK)
def __init__(self, sock, loop=None, conn_args=(), conn_kw={}):
self._clients = set()
self.logger = logging.getLogger('sjrpc')
if loop is None:
self.loop = pyev.Loop()
else:
self.loop = loop
self._conn_args = conn_args
self._conn_kw = conn_kw
self._sock = sock
self._sock_watcher = self.loop.io(self._sock, pyev.EV_READ, self._handle)
self._sock_watcher.start()
@classmethod
def from_addr(cls, addr, port, tcp_backlog=None, *args, **kwargs):
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((addr, port))
sock.setblocking(False)
if tcp_backlog is None:
tcp_backlog = RpcServer.DEFAULT_TCP_BACKLOG
sock.listen(tcp_backlog)
return cls(sock, *args, **kwargs)
def _wrap(self, sock):
""" Wrap the socket into the :class:`RpcConnection` and return it.
"""
return RpcConnection(sock, self.loop, *self._conn_args, **self._conn_kw)
def _handle(self, watcher, revents):
# Collect offline connections:
self._collect_connection()
while True:
try:
sock, address = self._sock.accept()
except socket.error as err:
if err.errno in self.NONBLOCKING_ERRORS:
return
if err.errno in (errno.ECONNABORTED, errno.EPROTO, errno.EINTR):
pass
else:
self.logger.warn('Error while accepting client: %s',
err.strerror)
return
self.logger.info('New incoming connection from %s:%s', *address)
conn = self._wrap(sock)
self.register(conn)
def _collect_connection(self):
""" Collect the offlines connections.
"""
for conn in self._clients.copy():
if not conn:
self._clients.remove(conn)
#
# Public methods:
#
def register(self, conn):
""" Register a new connection on this server.
:param conn: the connection to register.
"""
self._clients.add(conn)
def unregister(self, conn, shutdown=False):
""" Unregister the specified client from this server.
If shutdown is specified, client is shutdown before to be unregistered.
:param conn: the connection to unregister
:param shutdown: shutdown or not the connection before to register
"""
if conn in self._clients:
if shutdown:
conn.shutdown()
def run(self):
""" Run the :class:`RpcServer`.
"""
self.loop.start()
def shutdown(self):
""" Shutdown the :class:`RpcServer` instance.
"""
self.logger.info('Shutdown requested')
self._sock_watcher.stop()
for client in self._clients.copy():
self.unregister(client, shutdown=True)
self.loop.stop(pyev.EVBREAK_ALL)
class SSLRpcServer(RpcServer):
    """ SSL flavoured :class:`RpcServer`.

    :param certfile: path to the server certificate file
    :param keyfile: path to the server private key file
    :param inherited: every other option is inherited from
        :class:`RpcServer`
    """

    def __init__(self, sock, certfile, keyfile, loop=None, *args, **kwargs):
        super(SSLRpcServer, self).__init__(sock, loop, *args, **kwargs)
        # Remember the credentials used to wrap each accepted socket:
        self._certfile = certfile
        self._keyfile = keyfile

    def _wrap(self, sock):
        """ Wrap the accepted socket with SSL, then hand it to the base
        class for the rpc wrapping.
        """
        ssl_sock = ssl.wrap_socket(sock, server_side=True,
                                   keyfile=self._keyfile,
                                   certfile=self._certfile,
                                   ssl_version=ssl.PROTOCOL_TLSv1,
                                   do_handshake_on_connect=True)
        return super(SSLRpcServer, self)._wrap(ssl_sock)
sjrpc-master/sjrpc/utils/ 0000775 0000000 0000000 00000000000 13437517267 0015770 5 ustar 00root root 0000000 0000000 sjrpc-master/sjrpc/utils/__init__.py 0000664 0000000 0000000 00000001605 13437517267 0020103 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see .
from sjrpc.utils.proxies import *
from sjrpc.utils.handlers import *
__all__ = ('ConnectionProxy', 'RpcHandler', 'threadless', 'pure',
'pass_connection', 'pass_rpc', 'pass_deferred_response')
sjrpc-master/sjrpc/utils/handlers.py 0000664 0000000 0000000 00000004403 13437517267 0020143 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see <http://www.gnu.org/licenses/>.
class RpcHandler(object):
    """ Basic RPC functions handler.

    Derive this class in your handler and define some methods. All the
    defined methods (including private ones) are available to your peer.
    """

    def __getitem__(self, name):
        # Expose every attribute through the mapping protocol, translating
        # a failed attribute lookup into the KeyError callers expect.
        try:
            return getattr(self, name)
        except AttributeError:
            raise KeyError(name)
#
# Decorators:
#
def threadless(func):
    """ Function handler decorator -- run the handler inline: no new thread
    is spawned when the function is called.
    """
    # The dispatcher inspects this flag to decide whether to thread the call.
    setattr(func, '__threaded__', False)
    return func
def pure(func):
    """ Function handler decorator -- mark the function as pure: the caller
    will not pass the :class:`RpcConnection` object as first call parameter.

    .. deprecated:: 14
        This decorator is useless since the default behavior have change. You
        can use :func:`pass_connection` decorator to do the opposite.
        This function is kept for compatibility only, and will be removed latter.
    """
    # Pure is now the default behavior, so there is nothing to mark.
    return func
def pass_connection(func):
    """ Function handler decorator -- the caller will prepend, as first
    parameter, the connection which invoked this function.
    """
    # Flag read by the dispatcher when building the call arguments.
    setattr(func, '__pass_connection__', True)
    return func
def pass_rpc(func):
    """ Function handler decorator -- the caller will pass, as first
    parameter (or right after the connection), the rpc protocol which
    invoked this function.
    """
    # Flag read by the dispatcher when building the call arguments.
    setattr(func, '__pass_rpc__', True)
    return func
def pass_deferred_response(func):
    """ Function handler decorator -- the caller will pass, as first
    argument, a deferred response object.
    """
    # Flag read by the dispatcher when building the call arguments.
    setattr(func, '__pass_deferred_response__', True)
    return func
sjrpc-master/sjrpc/utils/proxies.py 0000664 0000000 0000000 00000003732 13437517267 0020040 0 ustar 00root root 0000000 0000000 # This file is part of CloudControl.
#
# CloudControl is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# CloudControl is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with CloudControl. If not, see .
from __future__ import absolute_import
from sjrpc.core.exceptions import RpcError
class ConnectionProxy(object):
    """ Create a new call proxy for the connection passed in arguments.

    Usage example:

    >>> proxy = ConnectionProxy(conn)
    >>> ret = proxy.existing_function()

    If an exception is raised by the remote function, the proxy tries to get
    the exception class in *__builtins__* to re-raise it. If the exception
    class is not found in *__builtins__*, the exception raised is the
    original :exc:`RpcError`:

    >>> proxy.unknown_function()
    [Traceback]
    NameError: remote name 'unknown_function' is not defined

    >>> proxy['myfunc']() # You can also use this syntax
    ...
    """

    def __init__(self, connection):
        # Connection whose ``call`` method performs the remote invocation.
        self.connection = connection

    @staticmethod
    def _resolve_builtin(name):
        """ Return the builtin named *name*, or None when it does not exist.

        ``__builtins__`` is the builtins module inside ``__main__`` but a
        plain dict inside imported modules, so both shapes are handled.
        The previous implementation used ``getattr`` without a default,
        which raised AttributeError/TypeError for unknown names and made
        the documented fallback to the original RpcError unreachable.
        """
        if isinstance(__builtins__, dict):
            return __builtins__.get(name)
        return getattr(__builtins__, name, None)

    def __getattr__(self, name):
        # Only called for attributes not found on the instance, so real
        # attributes (connection, _resolve_builtin, ...) are unaffected.
        def func(*args, **kwargs):
            try:
                returned = self.connection.call(name, *args, **kwargs)
            except RpcError as err:
                # Re-raise as the matching builtin exception when possible,
                # otherwise propagate the original RpcError unchanged.
                expt = self._resolve_builtin(err.exception)
                if expt is not None:
                    raise expt(err.message)
                else:
                    raise err
            else:
                return returned
        return func

    def __getitem__(self, name):
        # Subscription syntax is an alias for attribute access.
        return self.__getattr__(name)