pax_global_header 0000666 0000000 0000000 00000000064 12040012356 0014503 g ustar 00root root 0000000 0000000 52 comment=ee1705e29558b47bc61a187897c514bcb080cf68
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/ 0000775 0000000 0000000 00000000000 12040012356 0017503 5 ustar 00root root 0000000 0000000 sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/.gitignore 0000664 0000000 0000000 00000000051 12040012356 0021467 0 ustar 00root root 0000000 0000000 *.pyc
doc/_build/*
*.swp
*.log
test_*.py
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/CHANGELOG 0000664 0000000 0000000 00000000000 12040012356 0020703 0 ustar 00root root 0000000 0000000 sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/COPYING 0000664 0000000 0000000 00000016743 12040012356 0020551 0 ustar 00root root 0000000 0000000 GNU LESSER GENERAL PUBLIC LICENSE
Version 3, 29 June 2007
Copyright (C) 2007 Free Software Foundation, Inc.
Everyone is permitted to copy and distribute verbatim copies
of this license document, but changing it is not allowed.
This version of the GNU Lesser General Public License incorporates
the terms and conditions of version 3 of the GNU General Public
License, supplemented by the additional permissions listed below.
0. Additional Definitions.
As used herein, "this License" refers to version 3 of the GNU Lesser
General Public License, and the "GNU GPL" refers to version 3 of the GNU
General Public License.
"The Library" refers to a covered work governed by this License,
other than an Application or a Combined Work as defined below.
An "Application" is any work that makes use of an interface provided
by the Library, but which is not otherwise based on the Library.
Defining a subclass of a class defined by the Library is deemed a mode
of using an interface provided by the Library.
A "Combined Work" is a work produced by combining or linking an
Application with the Library. The particular version of the Library
with which the Combined Work was made is also called the "Linked
Version".
The "Minimal Corresponding Source" for a Combined Work means the
Corresponding Source for the Combined Work, excluding any source code
for portions of the Combined Work that, considered in isolation, are
based on the Application, and not on the Linked Version.
The "Corresponding Application Code" for a Combined Work means the
object code and/or source code for the Application, including any data
and utility programs needed for reproducing the Combined Work from the
Application, but excluding the System Libraries of the Combined Work.
1. Exception to Section 3 of the GNU GPL.
You may convey a covered work under sections 3 and 4 of this License
without being bound by section 3 of the GNU GPL.
2. Conveying Modified Versions.
If you modify a copy of the Library, and, in your modifications, a
facility refers to a function or data to be supplied by an Application
that uses the facility (other than as an argument passed when the
facility is invoked), then you may convey a copy of the modified
version:
a) under this License, provided that you make a good faith effort to
ensure that, in the event an Application does not supply the
function or data, the facility still operates, and performs
whatever part of its purpose remains meaningful, or
b) under the GNU GPL, with none of the additional permissions of
this License applicable to that copy.
3. Object Code Incorporating Material from Library Header Files.
The object code form of an Application may incorporate material from
a header file that is part of the Library. You may convey such object
code under terms of your choice, provided that, if the incorporated
material is not limited to numerical parameters, data structure
layouts and accessors, or small macros, inline functions and templates
(ten or fewer lines in length), you do both of the following:
a) Give prominent notice with each copy of the object code that the
Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the object code with a copy of the GNU GPL and this license
document.
4. Combined Works.
You may convey a Combined Work under terms of your choice that,
taken together, effectively do not restrict modification of the
portions of the Library contained in the Combined Work and reverse
engineering for debugging such modifications, if you also do each of
the following:
a) Give prominent notice with each copy of the Combined Work that
the Library is used in it and that the Library and its use are
covered by this License.
b) Accompany the Combined Work with a copy of the GNU GPL and this license
document.
c) For a Combined Work that displays copyright notices during
execution, include the copyright notice for the Library among
these notices, as well as a reference directing the user to the
copies of the GNU GPL and this license document.
d) Do one of the following:
0) Convey the Minimal Corresponding Source under the terms of this
License, and the Corresponding Application Code in a form
suitable for, and under terms that permit, the user to
recombine or relink the Application with a modified version of
the Linked Version to produce a modified Combined Work, in the
manner specified by section 6 of the GNU GPL for conveying
Corresponding Source.
1) Use a suitable shared library mechanism for linking with the
Library. A suitable mechanism is one that (a) uses at run time
a copy of the Library already present on the user's computer
system, and (b) will operate properly with a modified version
of the Library that is interface-compatible with the Linked
Version.
e) Provide Installation Information, but only if you would otherwise
be required to provide such information under section 6 of the
GNU GPL, and only to the extent that such information is
necessary to install and execute a modified version of the
Combined Work produced by recombining or relinking the
Application with a modified version of the Linked Version. (If
you use option 4d0, the Installation Information must accompany
the Minimal Corresponding Source and Corresponding Application
Code. If you use option 4d1, you must provide the Installation
Information in the manner specified by section 6 of the GNU GPL
for conveying Corresponding Source.)
5. Combined Libraries.
You may place library facilities that are a work based on the
Library side by side in a single library together with other library
facilities that are not Applications and are not covered by this
License, and convey such a combined library under terms of your
choice, if you do both of the following:
a) Accompany the combined library with a copy of the same work based
on the Library, uncombined with any other library facilities,
conveyed under the terms of this License.
b) Give prominent notice with the combined library that part of it
is a work based on the Library, and explaining where to find the
accompanying uncombined form of the same work.
6. Revised Versions of the GNU Lesser General Public License.
The Free Software Foundation may publish revised and/or new versions
of the GNU Lesser General Public License from time to time. Such new
versions will be similar in spirit to the present version, but may
differ in detail to address new problems or concerns.
Each version is given a distinguishing version number. If the
Library as you received it specifies that a certain numbered version
of the GNU Lesser General Public License "or any later version"
applies to it, you have the option of following the terms and
conditions either of that published version or of any later version
published by the Free Software Foundation. If the Library as you
received it does not specify a version number of the GNU Lesser
General Public License, you may choose any version of the GNU Lesser
General Public License ever published by the Free Software Foundation.
If the Library as you received it specifies that a proxy can decide
whether future versions of the GNU Lesser General Public License shall
apply, that proxy's public statement of acceptance of any version is
permanent authorization for you to choose that version for the
Library.
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/README 0000664 0000000 0000000 00000000000 12040012356 0020351 0 ustar 00root root 0000000 0000000 sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/debian/ 0000775 0000000 0000000 00000000000 12040012356 0020725 5 ustar 00root root 0000000 0000000 sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/debian/changelog 0000664 0000000 0000000 00000010636 12040012356 0022605 0 ustar 00root root 0000000 0000000 sjrpc (18~dev) unstable; urgency=low
* Development release
-- Antoine Millet Mon, 18 Jun 2012 14:34:23 +0200
sjrpc (17) unstable; urgency=low
* Changed license from GPLv2 to LGPLv3.
* Fixed bug in socket reader function.
-- Antoine Millet Mon, 21 Nov 2011 11:17:27 +0100
sjrpc (16) unstable; urgency=low
* Now process only one new connection per loop iteration, added tcp_backlog
option. (Fixes a bug with a large amount of connections)
* Some minor improvement in exception management.
-- Antoine Millet Mon, 10 Oct 2011 10:36:50 +0200
sjrpc (15) unstable; urgency=low
* Fixed bug with libev and Popen (SIGCHLD race condition bug).
-- Antoine Millet Thu, 06 Oct 2011 15:04:16 +0200
sjrpc (14) unstable; urgency=low
* New major update, API breaks, please read the doc.
* New pluggable protocols (rpc, tunneling and vpn for now).
* Still compatible with older versions.
* Use libev and pyev as event loop.
* Updated and complete documentation.
-- Antoine Millet Wed, 05 Oct 2011 18:46:04 +0200
sjrpc (13) unstable; urgency=low
* Fixed a bug with dropped connections and async requests.
-- Antoine Millet Mon, 02 May 2011 10:59:17 +0200
sjrpc (12) unstable; urgency=low
* [bug#4062] Added usage of SO_KEEPALIVE flag on sjrpc sockets.
-- Antoine Millet Mon, 21 Mar 2011 14:49:56 +0100
sjrpc (11) unstable; urgency=low
* [bug#3961] New argument appeared to set a global call timeout.
* Fixed bug with ssl error handling.
-- Antoine Millet Wed, 16 Feb 2011 18:38:12 +0100
sjrpc (10) unstable; urgency=low
* Logging of sjrpc now uses only "debug" verbosity.
* Calls now raise an exception if RpcConnection is disconnected from its peer.
* Fixed logging calls with format string.
* Code cleanup with the help of pylint.
-- Antoine Millet Tue, 08 Feb 2011 13:24:56 +0100
sjrpc (9) unstable; urgency=low
* Fixed the uuid bug
* Fixed a lot of bugs (disconnection, HUP, shutdown)
* Added feature to embed data on asynchronous calls
* Added timeout feature on (remote) call method
-- Antoine Millet Wed, 02 Feb 2011 11:43:07 +0100
sjrpc (8) unstable; urgency=low
* Added a shutdown_client method on SimpleRpcServer manager to disconnect a
client before calling the disconnection callback.
* Added getpeername method on RpcConnection class.
* Removed exception name in RpcError.__str__ result.
-- Antoine Millet Mon, 10 Jan 2011 18:33:36 +0100
sjrpc (7) unstable; urgency=low
* Added retrieval of a proxified method by getitem.
* Fixed bug with polling object.
-- Antoine Millet Thu, 30 Dec 2010 11:46:38 +0100
sjrpc (6-1) unstable; urgency=low
* Added connection timeout
-- Antoine Millet Mon, 27 Dec 2010 12:08:52 +0100
sjrpc (5-1) unstable; urgency=low
* Fixed bugs
-- Antoine Millet Wed, 22 Dec 2010 16:59:14 +0100
sjrpc (4-1) unstable; urgency=low
* Added a new constructor "from_sock" to SimpleRpcClient class.
* Added a new constructor "from_addr" to SimpleRpcClient class.
* /!\ API MODIFICATION /!\ The SimpleRpcClient constructor now takes
an RpcConnection object instead of directly a socket object.
* New method start on ConnectionManager allows running the main loop in a new thread.
* Threaded caller now executes the thread as a daemon thread.
* Added a new constructor "from_addr" to RpcConnection class.
-- Antoine Millet Mon, 20 Dec 2010 14:04:06 +0100
sjrpc (3-1) unstable; urgency=low
* Fixed a bug that prevented sending a large amount of
data in a single message.
-- Antoine Millet Thu, 16 Dec 2010 12:47:05 +0100
sjrpc (2-1) unstable; urgency=low
* Added is_running method on ConnectionManager
* Added get_handler method on ConnectionManager
* Compatibility with SSL sockets (added SimpleSslRpcServer
to handle ssl clients)
-- Antoine Millet Mon, 13 Dec 2010 18:56:12 +0100
sjrpc (1-1) unstable; urgency=low
* Initial release.
-- Antoine Millet Mon, 06 Dec 2010 14:42:27 +0100
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/debian/compat 0000664 0000000 0000000 00000000002 12040012356 0022123 0 ustar 00root root 0000000 0000000 7
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/debian/control 0000664 0000000 0000000 00000000773 12040012356 0022337 0 ustar 00root root 0000000 0000000 Source: sjrpc
Section: python
Priority: optional
Maintainer: Antoine Millet <antoine.millet@smartjog.com>
Build-Depends: debhelper (>= 7), python-central (>= 0.6), cdbs (>= 0.4.50), python-setuptools, python
XS-Python-Version: >= 2.6
Standards-Version: 3.9.1
Package: python-sjrpc
Architecture: all
Depends: ${misc:Depends}, ${python:Depends}, python-pyev
XB-Python-Version: ${python:Versions}
Description: SmartJog RPC
This package provides a Python module implementing a bidirectional
RPC using JSON.
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/debian/copyright 0000664 0000000 0000000 00000000064 12040012356 0022660 0 ustar 00root root 0000000 0000000 Files: *
Copyright: © 2010 Smartjog
License: LGPL3
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/debian/rules 0000775 0000000 0000000 00000000600 12040012356 0022001 0 ustar 00root root 0000000 0000000 #!/usr/bin/make -f
# -*- makefile -*-
DEB_PYTHON_SYSTEM=pycentral
# Debhelper must be included before python-distutils to use
# dh_python / dh_pycentral / dh_pysupport
include /usr/share/cdbs/1/rules/debhelper.mk
include /usr/share/cdbs/1/class/python-distutils.mk
PYTHON_PACKAGES := python-sjrpc
$(patsubst %,binary-install/%,$(PYTHON_PACKAGES))::
dh_pycentral -p$(cdbs_curpkg)
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/doc/ 0000775 0000000 0000000 00000000000 12040012356 0020250 5 ustar 00root root 0000000 0000000 sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/doc/Makefile 0000664 0000000 0000000 00000006064 12040012356 0021716 0 ustar 00root root 0000000 0000000 # Makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = _build
# Internal variables.
PAPEROPT_a4 = -D latex_paper_size=a4
PAPEROPT_letter = -D latex_paper_size=letter
ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
.PHONY: help clean html dirhtml pickle json htmlhelp qthelp latex changes linkcheck doctest
# Print the list of documentation targets this Makefile can build.
help:
	@echo "Please use \`make <target>' where <target> is one of"
	@echo " html to make standalone HTML files"
	@echo " dirhtml to make HTML files named index.html in directories"
	@echo " pickle to make pickle files"
	@echo " json to make JSON files"
	@echo " htmlhelp to make HTML files and a HTML help project"
	@echo " qthelp to make HTML files and a qthelp project"
	@echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
	@echo " changes to make an overview of all changed/added/deprecated items"
	@echo " linkcheck to check all external links for integrity"
	@echo " doctest to run all doctests embedded in the documentation (if enabled)"
clean:
-rm -rf $(BUILDDIR)/*
html:
$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
dirhtml:
$(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml."
pickle:
$(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle
@echo
@echo "Build finished; now you can process the pickle files."
json:
$(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json
@echo
@echo "Build finished; now you can process the JSON files."
htmlhelp:
$(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp
@echo
@echo "Build finished; now you can run HTML Help Workshop with the" \
".hhp project file in $(BUILDDIR)/htmlhelp."
qthelp:
$(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp
@echo
@echo "Build finished; now you can run "qcollectiongenerator" with the" \
".qhcp project file in $(BUILDDIR)/qthelp, like this:"
@echo "# qcollectiongenerator $(BUILDDIR)/qthelp/sjRpc.qhcp"
@echo "To view the help file:"
@echo "# assistant -collectionFile $(BUILDDIR)/qthelp/sjRpc.qhc"
latex:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo
@echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex."
@echo "Run \`make all-pdf' or \`make all-ps' in that directory to" \
"run these through (pdf)latex."
changes:
$(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes
@echo
@echo "The overview file is in $(BUILDDIR)/changes."
linkcheck:
$(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck
@echo
@echo "Link check complete; look for any errors in the above output " \
"or in $(BUILDDIR)/linkcheck/output.txt."
doctest:
$(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest
@echo "Testing of doctests in the sources finished, look at the " \
"results in $(BUILDDIR)/doctest/output.txt."
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/doc/api/ 0000775 0000000 0000000 00000000000 12040012356 0021021 5 ustar 00root root 0000000 0000000 sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/doc/api/core.protocols.rst 0000664 0000000 0000000 00000000137 12040012356 0024527 0 ustar 00root root 0000000 0000000 Protocols
---------
.. automodule:: sjrpc.core.protocols
:members:
:inherited-members:
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/doc/api/core.rst 0000664 0000000 0000000 00000000337 12040012356 0022506 0 ustar 00root root 0000000 0000000 Core library
------------
.. toctree::
:maxdepth: 2
:hidden:
core.protocols
.. seealso::
API documentation on the sub-package :doc:`core.protocols`.
.. automodule:: sjrpc.core
:members:
:inherited-members:
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/doc/api/index.rst 0000664 0000000 0000000 00000000165 12040012356 0022664 0 ustar 00root root 0000000 0000000 sjRpc API
=========
.. automodule:: sjrpc
Sub-packages:
.. toctree::
:maxdepth: 3
core
server
utils
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/doc/api/server.rst 0000664 0000000 0000000 00000000153 12040012356 0023060 0 ustar 00root root 0000000 0000000
Server side library
-------------------
.. automodule:: sjrpc.server
:members:
:inherited-members:
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/doc/api/utils.rst 0000664 0000000 0000000 00000000116 12040012356 0022711 0 ustar 00root root 0000000 0000000
Utils
-----
.. automodule:: sjrpc.utils
:members:
:inherited-members:
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/doc/conf.py 0000664 0000000 0000000 00000014706 12040012356 0021557 0 ustar 00root root 0000000 0000000 # -*- coding: utf-8 -*-
#
# sjRpc documentation build configuration file, created by
# sphinx-quickstart on Mon Dec 20 15:00:55 2010.
#
# This file is execfile()d with the current directory set to its containing dir.
#
# Note that not all possible configuration values are present in this
# autogenerated file.
#
# All configuration values have a default; values that are commented out
# serve to show the default.
import sys, os
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
# The parent directory is put *first* on the path so that the sjrpc package
# from this source tree is the one imported below (for __version__) and by
# autodoc, even when another copy of sjrpc is installed on the system.
sys.path.insert(0, os.path.abspath('../'))
# -- General configuration -----------------------------------------------------
# Add any Sphinx extension module names here, as strings. They can be extensions
# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.intersphinx', 'sphinx.ext.todo',
'sphinx.ext.pngmath', 'sphinx.ext.ifconfig', 'sphinx.ext.autosummary']
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix of source filenames.
source_suffix = '.rst'
# The encoding of source files.
#source_encoding = 'utf-8'
# The master toctree document.
master_doc = 'index'
# General information about the project.
project = u'sjRpc'
copyright = u'2010, Smartjog'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
from sjrpc import __version__
version = __version__
# The full version, including alpha/beta/rc tags.
release = version
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#language = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
#today = ''
# Else, today_fmt is used as the format for a strftime call.
#today_fmt = '%B %d, %Y'
# List of documents that shouldn't be included in the build.
#unused_docs = []
# List of directories, relative to source directory, that shouldn't be searched
# for source files.
exclude_trees = ['_build']
# The reST default role (used for this markup: `text`) to use for all documents.
#default_role = None
# If true, '()' will be appended to :func: etc. cross-reference text.
#add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
#add_module_names = True
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
#show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# A list of ignored prefixes for module index sorting.
#modindex_common_prefix = []
# -- Options for HTML output ---------------------------------------------------
# The theme to use for HTML and HTML Help pages. Major themes that come with
# Sphinx are currently 'default' and 'sphinxdoc'.
html_theme = 'nature'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#html_theme_options = {}
# Add any paths that contain custom themes here, relative to this directory.
#html_theme_path = []
# The name for this set of Sphinx documents. If None, it defaults to
# "<project> v<release> documentation".
#html_title = None
# A shorter title for the navigation bar. Default is the same as html_title.
#html_short_title = None
# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
#html_logo = None
# The name of an image file (within the static path) to use as favicon of the
# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
# pixels large.
#html_favicon = None
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
# using the given strftime format.
#html_last_updated_fmt = '%b %d, %Y'
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
#html_use_smartypants = True
# Custom sidebar templates, maps document names to template names.
#html_sidebars = {}
# Additional templates that should be rendered to pages, maps page names to
# template names.
#html_additional_pages = {}
# If false, no module index is generated.
#html_use_modindex = True
# If false, no index is generated.
#html_use_index = True
# If true, the index is split into individual pages for each letter.
#html_split_index = False
# If true, links to the reST sources are added to the pages.
#html_show_sourcelink = True
# If true, an OpenSearch description file will be output, and all pages will
# contain a tag referring to it. The value of this option must be the
# base URL from which the finished HTML is served.
#html_use_opensearch = ''
# If nonempty, this is the file name suffix for HTML files (e.g. ".xhtml").
#html_file_suffix = ''
# Output file base name for HTML help builder.
htmlhelp_basename = 'sjRpcdoc'
# -- Options for LaTeX output --------------------------------------------------
# The paper size ('letter' or 'a4').
#latex_paper_size = 'letter'
# The font size ('10pt', '11pt' or '12pt').
#latex_font_size = '10pt'
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author, documentclass [howto/manual]).
latex_documents = [
('index', 'sjRpc.tex', u'sjRpc Documentation',
u'Smartjog', 'manual'),
]
# The name of an image file (relative to this directory) to place at the top of
# the title page.
#latex_logo = None
# For "manual" documents, if this is true, then toplevel headings are parts,
# not chapters.
#latex_use_parts = False
# Additional stuff for the LaTeX preamble.
#latex_preamble = ''
# Documents to append as an appendix to all manuals.
#latex_appendices = []
# If false, no module index is generated.
#latex_use_modindex = True
# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {'http://docs.python.org/': None}
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/doc/examples.rst 0000664 0000000 0000000 00000000345 12040012356 0022622 0 ustar 00root root 0000000 0000000 Examples
========
Client example
--------------
.. literalinclude:: examples/client.py
:language: python
:linenos:
Server example
--------------
.. literalinclude:: examples/server.py
:language: python
:linenos:
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/doc/examples/ 0000775 0000000 0000000 00000000000 12040012356 0022066 5 ustar 00root root 0000000 0000000 sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/doc/examples/client.py 0000664 0000000 0000000 00000001555 12040012356 0023724 0 ustar 00root root 0000000 0000000 #!/usr/bin/env python
'''
Simple sjRpc client example.
'''
from __future__ import absolute_import
import sys
import random
import threading
from sjrpc.core import RpcConnection
from sjrpc.utils import RpcHandler
class MyClientHandler(RpcHandler):
    '''
    Handler holding the functions this example client makes callable by its
    peer.
    '''

    def client_random(self, min=0, max=100):
        '''
        Return a random integer between *min* and *max* (both included).

        A message is printed locally each time the function is called.
        '''
        # `min` and `max` shadow builtins, but they are the names visible to
        # callers, so they are kept as they are.
        print('Local call to client_random')
        value = random.randint(min, max)
        return value
# Get arguments from the command line:
if len(sys.argv) < 3:
    # Name the two expected positional arguments in the usage message.
    print('Usage: %s <address> <port>' % sys.argv[0])
    sys.exit(2)

address = sys.argv[1]
port = int(sys.argv[2])

# Create the rpc connection:
conn = RpcConnection.from_addr(address, port, handler=MyClientHandler())

# Run the connection mainloop in another thread:
t = threading.Thread(target=conn.run)
t.daemon = True
t.start()

# Call the remote `proxy` function, asking it to call back `client_random`,
# print the value it returns, then shut the connection down.
print('Random = %s' % (conn.call('proxy', 'client_random'), ))
conn.shutdown()
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/doc/examples/server.py 0000664 0000000 0000000 00000001762 12040012356 0023754 0 ustar 00root root 0000000 0000000 #!/usr/bin/env python
'''
Simple sjRpc server.
'''
from __future__ import absolute_import
import sys
import random
from sjrpc.server import RpcServer
from sjrpc.utils import RpcHandler, pass_connection
class MyHandler(RpcHandler):
    '''
    Handler holding the functions exported by the example server.
    '''

    def random(self, min=0, max=100):
        '''
        Return a random integer between *min* and *max* (both included).
        '''
        # Inside the method, `random` resolves to the module imported at the
        # top of the file, not to this method.
        value = random.randint(min, max)
        return value

    @pass_connection
    def proxy(self, conn, method, *args, **kwargs):
        '''
        Example of bidirectional RPC. When the peer calls this method, the
        server forwards the call of the specified method to the peer, and
        returns the value returned by the peer.
        '''
        # NOTE(review): `conn` is presumably supplied by the `pass_connection`
        # decorator rather than by the remote caller -- confirm in sjrpc.utils.
        result = conn.call(method, *args, **kwargs)
        return result
# Get arguments from the command line:
if len(sys.argv) < 3:
    # Name the two expected positional arguments in the usage message.
    print('Usage: %s <address> <port>' % sys.argv[0])
    sys.exit(2)

address = sys.argv[1]
port = int(sys.argv[2])

# Create the server instance; `conn_kw` carries the handler passed to each
# client connection:
rpcserver = RpcServer.from_addr(address, port, conn_kw=dict(handler=MyHandler()))
rpcserver.loop.debug = True
rpcserver.run()
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/doc/fundamentals.rst 0000664 0000000 0000000 00000012314 12040012356 0023464 0 ustar 00root root 0000000 0000000 Fundamentals
============
sjRpc is an RPC (Remote Procedure Call) library written in Python. It was
originally created for the needs of the CloudControl project but can be reused
in other projects.
Features
--------
* **Bidirectional:** remote functions can be called by both sides of the
connection, so a client can connect to a server and export functions to it.
* **Fully event based:** sjRpc takes advantage of asynchronous I/O, allowing a
server to handle thousands of client connections with a reasonable memory
usage.
* **Multiplexed:** sjRpc can run many "protocols" on the same connection,
read more about protocols in the `Multiplexing & protocols`_ section.
* **Fallback mode:** for compatibility with older sjRpc versions.
Basic usage
-----------
Server side, create the handler
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The handler is a simple object with a dict interface (getitem) responsible for
the association between the remote-function name and the locally executed
callable. The :class:`sjrpc.utils.RpcHandler` class helps you to define
a handler with a simple class extending this one::
>>> class MyHandler(RpcHandler):
... def random(self, min=0, max=100):
... return random.randint(min, max)
...
>>> handler = MyHandler()
But if you want to use a standard dictionary, this is exactly the same::
>>> handler = {'random': lambda min, max: random.randint(min, max)}
Server side, create the :class:`~sjrpc.server.RpcServer` instance
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The last thing to do on the server side is to launch the server itself::
>>> from sjrpc.server import RpcServer
>>> serv = RpcServer.from_addr('127.0.0.1', 1337, conn_kw=dict(handler=handler))
>>> serv.run()
.. note::
`conn_args` and `conn_kw` are the arguments which are automatically passed
to each client :class:`~sjrpc.core.RpcConnection` instantiation. In this
example, we just pass a default handler.
Client side, just connect !
^^^^^^^^^^^^^^^^^^^^^^^^^^^
For a basic client usage, the only thing you have to do is to create the
:class:`~sjrpc.core.RpcConnection` instance to the server and start the
:class:`~sjrpc.core.RpcConnection` main-loop in another thread to keep
control of your terminal::
>>> conn = RpcConnection.from_addr('127.0.0.1', 1337)
>>> threading.Thread(target=conn.run).start()
>>> print conn.call('random')
42
You can also use a proxy to simplify remote calls::
>>> from sjrpc.utils import ConnectionProxy
>>> proxy = ConnectionProxy(conn)
>>> print proxy.random(min=42, max=1000)
587
Proxy will also restore built-in exceptions embedded in
:class:`~sjrpc.core.RpcError`.
Multiplexing & protocols
------------------------
The sjRpc protocol uses channels to multiplex many protocols on the
same connection. Each channel is bound to a protocol handler on each side, and
each channel has a label which identifies it on the wire. Currently, the sjRpc
protocol looks like this::
+------------+------------------------+---------------------------------+
| Label (2) | Payload size (4) | Payload (variable) |
+------------+------------------------+---------------------------------+
For the moment, two types of protocols are implemented:
* **Rpc:** protocol which makes remote function calls easy.
* **Tunnel:** protocol which allows tunneling a socket through the sjRpc
connection.
To register new protocols, you can use the :meth:`register_protocol` method on
:class:`RpcConnection` instances, like this::
>>> from sjrpc.core.protocols import RpcProtocol, TunnelProtocol
>>> my_tunnel = myconn.register_protocol(1, TunnelProtocol)
do the same on other side, then
>>> my_tunnel.send('ehlo !!')
>>> answer = my_tunnel.recv()
You can also use shortcuts::
>>> my_tunnel = myconn.create_tunnel(label=12)
or
>>> my_tunnel = myconn.create_tunnel() # Bind to the first free label
The same shortcut is available for rpc with :meth:`create_rpc`.
Default rpc, aka Rpc0
---------------------
Rpc0 is an RpcProtocol bound by default to the "0" label. You can't
unregister this protocol, and you can't disable this feature. Rpc0 is used
internally by sjRpc to share special messages, and for compatibility with
older sjRpc versions (see `Fallback mode`_ below).
However, you can use this rpc like any other, with your own rpc handler.
Fallback mode
-------------
The fallback mode makes *sjrpc >= 14* compatible with older versions where
channels and protocols don't exist. Old and new protocols are very similar,
the new one just adds a label field on each frame which allows multiplexing, but
the rpc protocol itself has not changed, and still uses JSON messaging.
The fallback mode is enabled by default when a connection is established (you
can disable this behavior with the `disable_fallback` parameter of the
:class:`RpcConnection` class constructor) and is automatically disabled when
a special rpc message "capabilities" is received.
.. note::
When the fallback mode is enabled, you can't use any protocol other than the
default rpc. All calls to :meth:`RpcConnection.register_protocol`,
:meth:`RpcConnection.unregister_protocol`, :meth:`RpcConnection.create_rpc`
and :meth:`RpcConnection.create_tunnel`, will fail with a
:exc:`FallbackModeEnabledError`.
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/doc/index.rst 0000664 0000000 0000000 00000000364 12040012356 0022114 0 ustar 00root root 0000000 0000000 Welcome to sjRpc's documentation!
=================================
Contents:
.. toctree::
:maxdepth: 3
fundamentals
examples
api/index
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/setup.py 0000664 0000000 0000000 00000001166 12040012356 0021221 0 ustar 00root root 0000000 0000000 from setuptools import setup
import os
# Use the README shipped next to this script as the long description. The
# context manager guarantees the file handle is closed, even if reading fails.
with open(os.path.join(os.path.dirname(__file__), 'README')) as readme:
    ldesc = readme.read()

# Retrieval of version:
from sjrpc import __version__

setup(
    name='sjrpc',
    version=__version__,
    description='Smartjog RPC',
    long_description=ldesc,
    author='Antoine Millet',
    author_email='antoine.millet@smartjog.com',
    license='GPL2',
    packages=['sjrpc', 'sjrpc.server', 'sjrpc.core', 'sjrpc.core.protocols',
              'sjrpc.utils'],
    classifiers=[
        'Intended Audience :: Developers',
        'Operating System :: Unix',
        'Programming Language :: Python',
    ],
)
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/sjrpc/ 0000775 0000000 0000000 00000000000 12040012356 0020624 5 ustar 00root root 0000000 0000000 sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/sjrpc/__init__.py 0000664 0000000 0000000 00000001270 12040012356 0022735 0 ustar 00root root 0000000 0000000
'''
This module implements a Remote Procedure Call system using socket objects as
transport. The main feature of this RPC is to be bidirectional: both client and
server can serve remote procedures for their peer.
Features:
* Multiplexed: you can run tunnels or many RPC through the same socket.
* Gevent: sjRpc is compatible with gevent and uses it for the server feature.
* Bidirectional: each peer can call remote functions on the other.
The library is separated into three parts:
* **core** package contains all common classes.
* **server** package contains all the server side related stuff.
* **utils** package contains some helpers used in previous libraries.
'''
__version__ = '18~dev'
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/sjrpc/core/ 0000775 0000000 0000000 00000000000 12040012356 0021554 5 ustar 00root root 0000000 0000000 sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/sjrpc/core/__init__.py 0000664 0000000 0000000 00000001414 12040012356 0023665 0 ustar 00root root 0000000 0000000
'''
The **core** package contains all classes used by both client mode and server
mode.
This package exports the following functions/classes:
* :class:`RpcConnection` which handle the connection to the peer.
* :class:`RpcCaller`, :class:`ThreadedRpcCaller`: which are used internally
by rpc protocol to execute handlers' functions.
* :class:`RpcError` which is the exception raised by rpc to wrap remote-side
exceptions.
* :class:`AsyncWatcher` which allow to make asynchronous calls.
It also contains a sub-package containing protocols: :mod:`core.protocols`.
'''
from __future__ import absolute_import
from sjrpc.core.rpcconnection import *
from sjrpc.core.exceptions import *
from sjrpc.core.async import *
__all__ = ('RpcConnection', 'RpcError', 'AsyncWatcher')
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/sjrpc/core/async.py 0000664 0000000 0000000 00000006347 12040012356 0023255 0 ustar 00root root 0000000 0000000 import datetime
from Queue import Queue, Empty
#TODO: implement iterable interface
#TODO: implement priority responses
class AsyncWatcher(object):
    '''
    Asynchronous call watcher -- Handle asynchronous calls and responses.

    Usage example::

        >>> watcher = AsyncWatcher()
        >>> watcher.register(conn1, 'ping')
        >>> watcher.register(conn2, 'ping')
        >>> responses = watcher.wait() # Wait for all responses
    '''

    def __init__(self):
        # Queue on which the rpc pushes the responses of registered calls:
        self._response_queue = Queue()
        # Ids of the calls whose response has not been consumed yet:
        self._expected_responses = set()

    def _get_in_queue(self, *args, **kwargs):
        '''
        Get an item from the response queue and return it together with the
        time (in seconds, as a float) spent waiting for it. Accept all
        arguments accepted by :meth:`Queue.get`.

        :return: a ``(waited_seconds, item)`` tuple
        :raise Empty: propagated from :meth:`Queue.get` on timeout
        '''
        start = datetime.datetime.now()
        item = self._response_queue.get(*args, **kwargs)
        dt = datetime.datetime.now() - start
        dt = dt.seconds + dt.days * 86400 + dt.microseconds * 0.000001
        return (dt, item)

    #
    # Public API:
    #

    @property
    def remains(self):
        '''
        Remaining expected responses.
        '''
        return len(self._expected_responses)

    def register(self, rpc, method, *args, **kwargs):
        r'''
        Call specified method on specified rpc with specified arguments
        and register the call on this object.

        :param rpc: the rpc on which to make the call
        :param method: the rpc method to call
        :param \*args, \*\*kwargs: the arguments to pass to the rpc method
        '''
        msg_id = rpc.async_call(self._response_queue, method, *args, **kwargs)
        self._expected_responses.add(msg_id)

    def wait(self, timeout=None, max_wait=None):
        '''
        Wait responses for calls registered on this :class:`AsyncWatcher`
        instance and return them.

        :param timeout: timeout value or None to disable timeout (default: None)
        :param max_wait: maximum waited responses
        :return: received messages in a list

        .. note::
           You can repeat call to this method on a single
           :class:`AsyncWatcher`. For example, if you want to process the
           first arrived message, then wait others for a minute, you can do::

               >>> msgs = watcher.wait(max_wait=1)
               >>> process_speedy(msgs[0])
               >>> for msg in watcher.wait(timeout=60):
               >>>     process(msg)
        '''
        return list(self.iter(timeout, max_wait))

    def iter(self, timeout=None, max_wait=None):
        '''
        Work like :meth:`AsyncWatcher.wait` but return an iterable, instead of
        a list object.

        :param timeout: overall time budget in seconds shared by all the
            responses, or None to wait without limit
        :param max_wait: stop after this many responses have been yielded

        .. note::
           Responses are yielded by the iterable when they are received, so you
           can start the processing of response before receiving all.
        '''
        while self.remains:
            if timeout is not None and timeout < 0:
                # The budget is exhausted: Queue.get() rejects negative
                # timeouts with a ValueError, so fall back to a zero timeout
                # which still drains responses that are already available.
                timeout = 0
            try:
                dt, response = self._get_in_queue(timeout=timeout)
            except Empty:
                break
            if timeout is not None:
                timeout -= dt
            self._expected_responses.remove(response['id'])
            yield response
            # Check for max_wait:
            if max_wait is not None:
                max_wait -= 1
                if not max_wait:
                    break
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/sjrpc/core/exceptions.py 0000664 0000000 0000000 00000001552 12040012356 0024312 0 ustar 00root root 0000000 0000000
'''
Contains sjRpc exceptions.
'''
class RpcError(Exception):
    '''
    Exception raised by caller when an error occurs while execution of remote
    procedure call.

    :param exception: name of the exception raised on the remote side
    :param message: human readable message describing the error
    '''

    def __init__(self, exception, message):
        # Forward the arguments to Exception so that ``args`` is populated:
        # this keeps repr(), copy and pickle working for this exception.
        super(RpcError, self).__init__(exception, message)
        self.exception = exception
        self.message = message

    def __str__(self):
        return '%s' % self.message
class RpcConnectionError(Exception):
    '''
    Base class for RpcConnection errors.

    Catching this class also catches its subclasses defined in this module
    (:class:`SocketError`, :class:`NoFreeLabelError` and
    :class:`FallbackModeEnabledError`).
    '''
class SocketError(RpcConnectionError):
    '''
    Exception used internally to signal a socket fault.
    '''
class NoFreeLabelError(RpcConnectionError):
    '''
    Exception raised when no more free labels are available for protocol
    allocation on a connection.
    '''
class FallbackModeEnabledError(RpcConnectionError):
    '''
    Exception raised when a feature which is not compatible with the fallback
    mode is used while that mode is enabled.
    '''
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/sjrpc/core/protocols/ 0000775 0000000 0000000 00000000000 12040012356 0023600 5 ustar 00root root 0000000 0000000 sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/sjrpc/core/protocols/__init__.py 0000664 0000000 0000000 00000005251 12040012356 0025714 0 ustar 00root root 0000000 0000000
'''
Protocols can be bound to a specific label of a :class:`RpcConnection` (see
`Multiplexing & protocols`_ for more information).
Following protocols are provided with the standard distribution of sjRpc, but you
can create your own if you need to:
- :class:`RpcProtocol`: the standard rpc protocol
- :class:`TunnelProtocol`: a protocol which allow to tunnel a socket traffic
through the sjRpc connection
- :class:`VpnProtocol` (experimental): like :class:`TunnelProtocol` but work
with a network interface instead of a socket.
'''
from __future__ import absolute_import
import logging
class Protocol(object):
    '''
    Base class for all protocols.

    :param connection: the connection on which this protocol is registered
    :param label: the label bound to this protocol on the connection
    :param logger: logger to use; when None, a logger named
        ``<connection logger name>.protos.<label>`` is created
    '''

    def __init__(self, connection, label, logger=None):
        self._connection = connection
        self._label = label
        # Make sure the buffer exists even if feed() is called before
        # start_message():
        self._incoming_buf = ''
        if logger is None:
            logger_name = '%s.protos.%s' % (connection.logger.name, label)
            self.logger = logging.getLogger(logger_name)
        else:
            self.logger = logger

    @property
    def connection(self):
        '''
        The :class:`~sjrpc.core.RpcConnection` instance which handle
        this protocol.
        '''
        return self._connection

    @property
    def label(self):
        '''
        The label bound to this protocol in the
        :class:`~sjrpc.core.RpcConnection` instance.
        '''
        return self._label

    def send(self, payload):
        '''
        Send a message through the sjRpc connection, on this protocol's label.
        '''
        self._connection.send(self._label, payload)

    def start_message(self, payload_size):
        '''
        Start a new incoming message receipt. By default, this method creates
        a new empty buffer on the ``self._incoming_buf`` variable.

        :param payload_size: announced size of the message (unused by the
            default implementation)
        '''
        self._incoming_buf = ''

    def feed(self, data):
        '''
        Handle a chunk of data received from the tunnel. By default, this
        method appends this chunk to the end of the incoming buffer created by
        the :meth:`start_message` method.
        '''
        self._incoming_buf += data

    def end_of_message(self):
        '''
        Signal the end of the currently received message. With default
        :meth:`start_message` and :meth:`feed` methods, it's a good place to
        implement the processing of the incoming message.
        '''
        pass

    def handle_control(self, control_type=None, payload=None):
        '''
        Handle a control message received from the Rpc0.

        The rpc dispatches control messages as ``(type, payload)``; both
        arguments are optional so that older single-argument calls keep
        working. The default implementation ignores the message.

        :param control_type: the type of the control message
        :param payload: the payload attached to the control message
        '''
        pass

    def shutdown(self):
        '''
        Hook called when the protocol is shut down; does nothing by default.
        '''
        pass
from sjrpc.core.protocols.rpc import RpcProtocol
from sjrpc.core.protocols.tunnel import TunnelProtocol
from sjrpc.core.protocols.vpn import VpnProtocol
__all__ = ['Protocol', 'RpcProtocol', 'TunnelProtocol', 'VpnProtocol']
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/sjrpc/core/protocols/rpc.py 0000664 0000000 0000000 00000036503 12040012356 0024745 0 ustar 00root root 0000000 0000000 from __future__ import absolute_import
import json
from uuid import uuid4
from threading import Event, Thread
from sjrpc.core.exceptions import RpcError, RpcConnectionError
from sjrpc.core.protocols import Protocol
__all__ = ['RpcProtocol']
# Log format used when a reply cannot be delivered to the peer; the single
# ``%s`` placeholder receives the RpcError raised while sending the reply.
ERRMSG_RPCERR = ('Unable to send reply to the peer: %s (this error is usualy '
                 'raised when connection is lost while handler function '
                 'execution)')
class RpcCaller(object):
'''
A caller execute a callable (function, method, class which implement the
:meth:`__call__` method...) in a particular context (threaded or
"timeouted" for example), and return the result (or the exception) to the
peer through it :class:`RpcConnection` object.
'''
def __init__(self, request, protocol, func):
self._request = request
self._protocol = protocol
self._func = func
def run(self):
'''
Run the callable and return the result (or the exception) to the peer.
'''
msg_id = self._request['id']
args = self._request['args']
kwargs = self._request['kwargs']
if getattr(self._func, '__pass_rpc__', False):
args.insert(0, self._protocol)
if getattr(self._func, '__pass_connection__', False):
args.insert(0, self._protocol.connection)
try:
returned = self._func(*args, **kwargs)
except Exception as err:
try:
self._protocol.error(msg_id, message=str(err),
error=err.__class__.__name__)
except RpcError as err:
self._protocol.connection.logger.error(ERRMSG_RPCERR, err)
else:
try:
self._protocol.response(msg_id, returned=returned)
except RpcError as err:
self._protocol.connection.logger.error(ERRMSG_RPCERR, err)
def start(self):
'''
Start execution of the callable, the most of time, it just call
:meth:`run` method.
'''
self.run()
class ThreadedRpcCaller(RpcCaller):
    '''
    Caller variant which executes :meth:`run` in its own daemon thread
    instead of the calling thread.
    '''

    def __init__(self, *args, **kwargs):
        super(ThreadedRpcCaller, self).__init__(*args, **kwargs)
        # Prepare (but do not start) the worker thread; it is named after the
        # id of the processed request.
        worker = Thread(target=self.run,
                        name='Processing of call: %s' % self._request['id'])
        worker.daemon = True
        self._thread = worker

    def start(self):
        '''
        Launch the worker thread which runs the callable.
        '''
        self._thread.start()
class RpcProtocol(Protocol):
    '''
    The standard protocol used to do RPC request/responses.

    :param connection: the connection serving this :class:`RpcProtocol`
    :param label: the label of this :class:`RpcProtocol` instance
    :param handler: command handler to bind by default
    :param on_disconnect: callback called when client disconnect, could be the
        name of an RPC handler or a callable
    :param timeout: global command timeout
    '''

    # Message templates; their key sets are also used by _dispatch() to
    # recognize the type of an inbound message.
    REQUEST_MESSAGE = {'id': None, 'method': None, 'args': [], 'kwargs': {}}
    RESPONSE_MESSAGE = {'id': None, 'return': None, 'error': None}
    SPECIAL_MESSAGE = {'special': None}

    def __init__(self, connection, label, handler=None, on_disconnect=None,
                 timeout=30, *args, **kwargs):
        super(RpcProtocol, self).__init__(connection, label, *args, **kwargs)
        self._handler = handler
        self._on_disconnect = on_disconnect
        self._call_timeout = timeout
        # Store all calls sent to the peer. Key is the id of the call and value
        # a dict describing the call (event, queue or callback to notify).
        self._calls = {}

    def _dispatch(self, message):
        '''
        Dispatch a received message according to its type.

        :param message: the received message to dispatch

        .. note::
           When dispatching a call, the responsibility of the response is
           delegated to the caller, except for the case where the method
           isn't found on the handler.
        '''
        self.logger.debug('Received: %s', message)
        if set(RpcProtocol.REQUEST_MESSAGE) <= set(message):
            self._handle_request(message)
        elif set(RpcProtocol.RESPONSE_MESSAGE) <= set(message):
            self._handle_response(message)
        elif set(RpcProtocol.SPECIAL_MESSAGE) <= set(message):
            self._handle_special(message)
        else:
            self.logger.debug('Malformed message received: %s', message)

    def _handle_request(self, message):
        '''
        Handle an inbound request message.

        The handler function is executed through a :class:`ThreadedRpcCaller`
        unless its ``__threaded__`` attribute is false. A ``NameError`` error
        is replied when no handler is bound or the method is unknown.
        '''
        if self._handler is not None:
            try:
                func = self._handler[message['method']]
            except KeyError:
                pass
            else:
                if getattr(func, '__threaded__', True):
                    ThreadedRpcCaller(message, self, func).start()
                else:
                    RpcCaller(message, self, func).start()
                return
        self.error(message['id'], 'NameError',
                   "remote name '%s' is not defined" % message['method'])

    def _handle_response(self, message):
        '''
        Handle an inbound response message.

        Responses whose id matches no pending call are silently ignored.
        '''
        # Remove the call from the pending list first, so that it's forgotten
        # even if notifying its owner fails:
        call = self._calls.pop(message['id'], None)
        if call is None:
            return
        if message['error'] is None:
            call['return'] = message['return']
        else:
            call['error'] = message['error']
        if 'event' in call:
            # Release the call if it's synchronous:
            call['event'].set()
        elif 'async_cb' in call:
            call['async_cb'](call_id=message['id'],
                             response=call.get('return'),
                             error=call.get('error'))
        else:
            # Else, it's an asynchronous call, we need to push the answer
            # on the queue (without the queue itself):
            queue = call.pop('queue')
            queue.put(call)

    def _handle_special(self, message):
        '''
        Handle special message.

        * ``capabilities``: forwarded to the connection (only accepted on the
          rpc bound to label 0).
        * ``protoctl``: forwarded to the :meth:`handle_control` method of the
          protocol registered on the message's label.
        '''
        if message['special'] == 'capabilities':
            if self._label == 0:
                self._connection.set_capabilities(message.get('capabilities'))
            else:
                self.logger.warning('Capabilities message received by non-zero'
                                    ' rpc.')
        elif message['special'] == 'protoctl':
            label = message.get('label')
            if label is None:
                self.logger.warning('Protoctl message received without label.')
                return
            try:
                proto = self._connection.get_protocol(label)
            except KeyError:
                self.logger.warning('Protoctl message received for unknown label')
            else:
                try:
                    proto.handle_control(message.get('type'),
                                         message.get('payload'))
                except Exception as err:
                    self.logger.error('Protoctl handler failed for proto %s: '
                                      '%s', label, err)

    def _send(self, message):
        '''
        Low level method to encode a message in json and hand it to the
        connection for sending on this protocol's label.

        :raise RpcError: when the connection refuses the message

        .. warning::
           Message must be a jsonisable structure.
        '''
        self.logger.debug('Sending: %s', message)
        json_msg = json.dumps(message)
        try:
            self._connection.send(self._label, payload=json_msg)
        except RpcConnectionError as err:
            raise RpcError('RpcConnectionError', str(err))

    def _build_request(self, method_name, args, kwargs):
        '''
        Build a request message with a freshly generated id.
        '''
        msg = RpcProtocol.REQUEST_MESSAGE.copy()
        msg['method'] = method_name
        msg['args'] = args
        msg['kwargs'] = kwargs
        msg['id'] = str(uuid4())
        return msg

    def _register_and_send(self, call, method_name, args, kwargs):
        '''
        Register ``call`` in the pending call list, then send the request.

        The registration is done *before* sending so that a response arriving
        immediately always finds its call, and it is undone if the request
        can't be sent.

        :return: the generated id for the request
        '''
        msg = self._build_request(method_name, args, kwargs)
        msg_id = msg['id']
        call['id'] = msg_id
        self._calls[msg_id] = call
        try:
            self._send(msg)
        except Exception:
            self._calls.pop(msg_id, None)
            raise
        return msg_id

    def _send_call(self, method_name, *args, **kwargs):
        r'''
        Create the message for the call and send it to the peer.

        :param method_name: the name of the method to call on the peer
        :param \*args: arguments to pass to the remote method
        :param \*\*kwargs: keyword arguments to pass to the remote method
        :return: the generated id for the request
        :rtype: :class:`str` object
        '''
        msg = self._build_request(method_name, args, kwargs)
        self._send(msg)
        return msg['id']

    def _send_response(self, msg_id, returned=None, error=None):
        '''
        Low level method to send a response message to the peer.

        :param msg_id: the id of the replied message
        :param returned: returned data
        :type returned: returned data or None if errors have been raised
        :param error: raised errors
        :type error: raised error or None if no error have been raised
        '''
        msg = RpcProtocol.RESPONSE_MESSAGE.copy()
        msg['id'] = msg_id
        msg['return'] = returned
        msg['error'] = error
        self._send(msg)

    #
    # Public methods:
    #

    def end_of_message(self):
        '''
        When the message is fully received, decode the json and dispatch it.
        '''
        msg = json.loads(self._incoming_buf)
        self._dispatch(msg)

    def shutdown(self):
        '''
        Handle the shutdown process of this protocol instance:

        * Release all waiting calls with a "Connection reset by peer" error.
        * Execute the on_disconnect callback.
        '''
        # Release all waiting calls from this rpc. Iterate over a snapshot,
        # since handling a response removes the call from the dict:
        for cid in list(self._calls):
            err = {'exception': 'RpcError',
                   'message': 'Connection reset by peer'}
            self._handle_response({'id': cid, 'return': None, 'error': err})

        # Execute on_disconnect callback:
        if self._on_disconnect is None:
            return
        callback = None
        if callable(self._on_disconnect):
            callback = self._on_disconnect
        elif self._handler is None:
            self.logger.warning('Shutdown callback specified but no handler '
                                'binded on rpc, ignoring')
        else:
            # on_disconnect is the name of a function of the handler:
            try:
                callback = self._handler[self._on_disconnect]
            except KeyError:
                self.logger.warning('Shutdown callback not found in current '
                                    'rpc attached handler, ignoring')
        if callback is not None:
            try:
                callback(self._connection)
            except Exception as err:
                self.logger.debug('Error while execution of shutdown '
                                  'callback: %s', err)

    def get_handler(self):
        '''
        Return the handler bound to this rpc.

        :return: bound handler
        '''
        return self._handler

    def set_handler(self, handler):
        '''
        Define a new handler for this rpc.

        :param handler: the new handler to define.
        '''
        self._handler = handler

    def send_special(self, special, **kwargs):
        r'''
        Send a "special" message to the peer.

        :param special: type of the special message
        :param \*\*kwargs: fields of the special message
        '''
        msg = {'special': special}
        msg.update(kwargs)
        self._send(msg)

    def response(self, msg_id, returned):
        '''
        Send a "return" response to the peer.

        :param msg_id: the id of the replied message
        :param returned: the value returned by the function

        .. warning::
           In case of raised error, use the :meth:`error` method instead of
           this one.
        '''
        self._send_response(msg_id, returned=returned)

    def error(self, msg_id, error, message, traceback=None):
        '''
        Send an error response to the peer.

        :param msg_id: the id of the replied message
        :param error: the name of the raised exception
        :param message: human readable error for the exception
        :param traceback: accepted but not transmitted to the peer
        '''
        err = {'exception': error, 'message': message}
        self._send_response(msg_id, error=err)

    def call(self, method_name, *args, **kwargs):
        r'''
        Make a new remote call on the peer.

        :param method_name: the method to call on the peer
        :param \*args: the arguments for the call
        :param \*\*kwargs: the keyword arguments for the call
        :return: the data returned by the peer for the call
        :raise RpcError: when the peer replies with an error, when the request
            can't be sent, or (with ``TimeoutError`` as exception name) when
            the peer didn't respond in time

        .. note::
           This function will block until the peer response is received. You
           can also pass a ``_timeout`` keyword argument to specify the number
           of seconds to wait (default: the ``timeout`` given to the
           constructor).
        '''
        timeout = kwargs.pop('_timeout', self._call_timeout)
        # Register the call with the event to raise, then send it to the peer:
        call = {'return': None, 'error': None, 'event': Event()}
        msg_id = self._register_and_send(call, method_name, args, kwargs)
        # Wait for the response:
        call['event'].wait(timeout)
        # Check if timeout occurred:
        if not call['event'].is_set():
            # Forget the call, a late response will simply be ignored:
            self._calls.pop(msg_id, None)
            raise RpcError('TimeoutError', 'remote method timeout')
        # Check if error occurred while execution:
        if call['error'] is not None:
            raise RpcError(call['error']['exception'],
                           call['error']['message'])
        return call['return']

    def async_call(self, queue, method_name, *args, **kwargs):
        r'''
        Make a new asynchronous call on the peer.

        :param queue: the queue where to push the response when received
        :param method_name: the method to call on the peer
        :param _data: local data to give back on the response
        :param \*args: the arguments for the call
        :param \*\*kwargs: the keyword arguments for the call
        :return: the message id of the call
        '''
        # Extract _data from argument:
        data = kwargs.pop('_data', None)
        # Register the call and send it, but don't wait for the response:
        call = {'async': True, 'data': data, 'queue': queue}
        return self._register_and_send(call, method_name, args, kwargs)

    def async_call_cb(self, callback, method_name, *args, **kwargs):
        r'''Make an asynchronous call on the peer and callback when call is done.

        :param callable callback: callback must have the following named
            arguments:

            * **call_id** - unique identifier for the async call made
            * **response** - response as returned by the remote function
            * **error** (*dict*) - error indicator in case remote call threw
              an exception
        :param str method_name: the method to call on the peer
        :param \*args: the arguments for the call
        :param \*\*kwargs: the keyword arguments for the call
        :return: the message id of the call
        '''
        call = {'async_cb': callback}
        return self._register_and_send(call, method_name, args, kwargs)
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/sjrpc/core/protocols/tunnel.py 0000664 0000000 0000000 00000013252 12040012356 0025462 0 ustar 00root root 0000000 0000000 from __future__ import absolute_import
import socket
from sjrpc.core.protocols import Protocol
import pyev
__all__ = ['TunnelProtocol']
class TunnelProtocol(Protocol):
    '''
    Protocol which tunnels the traffic of an endpoint socket through the
    sjRpc connection.

    Accepted keyword arguments (others are passed to :class:`Protocol`):

    :param endpoint: socket-like object to tunnel; when None, a socket pair
        is created, one side is used as endpoint and the other is exposed
        through the :attr:`socket` property
    :param autostart: start reading the endpoint as soon as the peer allows
        to send data (default: True), else wait a call to :meth:`start`
    :param on_close: callback called with the tunnel as argument when it is
        closed (default: close the endpoint)
    '''

    GET_SIZE = 1024 * 1024 # 1MB
    DEFAULT_GET_SIZE = GET_SIZE

    def __init__(self, *args, **kwargs):
        endpoint = kwargs.pop('endpoint', None)
        autostart = kwargs.pop('autostart', True)
        self._cb_on_close = kwargs.pop('on_close', self.cb_default_on_close)
        super(TunnelProtocol, self).__init__(*args, **kwargs)

        if endpoint is None:
            self._endpoint, self._socket = socket.socketpair()
        else:
            self._endpoint = endpoint
            self._socket = None

        self._is_closed = False
        self._is_started = autostart
        self._from_tun_to_endpoint_buf = ''
        self._asked = 0 # Data asked to the peer
        self._ok_to_send = 0 # Data I can send to the peer

        # Set the endpoint as non-blocking socket:
        self._endpoint.setblocking(False)

        # Create watcher to handle data coming from the endpoint:
        props = dict(fd=self._endpoint, events=pyev.EV_READ,
                     callback=self._handle_from_endpoint)
        self._endpoint_reader = self._connection.create_watcher(pyev.Io, **props)

        # Create watcher to handle data going to the endpoint:
        props = dict(fd=self._endpoint, events=pyev.EV_WRITE,
                     callback=self._handle_from_tunnel)
        self._endpoint_writer = self._connection.create_watcher(pyev.Io, **props)

        # Tell the peer that this side of the tunnel is ready:
        self._connection.rpc.send_special('protoctl', label=self._label,
                                          type='ready')

    def _handle_from_endpoint(self, watcher, revents):
        '''
        Handle data coming from the endpoint socket and push it through the
        sjRpc tunnel.
        '''
        # Abort if peer don't want more data. The watcher must really be
        # stopped here, else it keeps firing while the endpoint is readable:
        if self._ok_to_send <= 0:
            watcher.stop()
            return

        try:
            read = self._endpoint.recv(self._ok_to_send)
        except (IOError, socket.error) as err:
            if err.errno in self.connection.NONBLOCKING_ERRORS:
                return
            else:
                self.close()
                return

        # Empty read means the connection has been closed on other side:
        if not read:
            self.close()
            return

        self._ok_to_send -= len(read)
        self.send(read)
        if not self._ok_to_send:
            watcher.stop()

    def _handle_from_tunnel(self, watcher, revent):
        '''
        Handle writing of the data already received from the tunnel and stored
        into the incoming buffer. Data are written only when the endpoint
        socket is ready to write.

        This method is also responsible of asking more data to the peer when
        the amount of data still awaited gets low.
        '''
        try:
            sent = self._endpoint.send(self._from_tun_to_endpoint_buf)
        except (IOError, socket.error) as err:
            if err.errno in self.connection.NONBLOCKING_ERRORS:
                return
            else:
                self.close()
                return

        self._from_tun_to_endpoint_buf = self._from_tun_to_endpoint_buf[sent:]
        self._asked -= sent
        if self._asked < TunnelProtocol.GET_SIZE * 2:
            self._send_get(TunnelProtocol.GET_SIZE)
        if not self._from_tun_to_endpoint_buf:
            watcher.stop()

    def _send_get(self, size):
        '''
        Allow the peer to send ``size`` more bytes through the tunnel.
        '''
        self._connection.rpc.send_special('protoctl', label=self._label,
                                          type='get', payload=dict(size=size))
        self._asked += size

    def cb_default_on_close(self, tun):
        '''
        Action to do on the endpoint when the connection is closed.
        '''
        tun.endpoint.close()

    #
    # Public methods:
    #

    @property
    def socket(self):
        '''
        The other side of the socket pair created when no endpoint was
        provided, else None.
        '''
        return self._socket

    @property
    def endpoint(self):
        '''
        Return the tunnel endpoint.
        '''
        return self._endpoint

    def start(self):
        '''
        Start the reader on the endpoint.
        '''
        if not self._is_closed:
            self._is_started = True
            self._endpoint_reader.start()

    def close(self):
        '''
        Close the tunnel and unregister it from connection.
        '''
        if not self._is_closed:
            self._is_closed = True
            # Stop watchers:
            self._endpoint_reader.stop()
            self._endpoint_writer.stop()
            # Send the end of stream message to the peer:
            self._connection.rpc.send_special('protoctl', label=self._label,
                                              type='eos')
            # Unregister the tunnel:
            self._connection.unregister_protocol(self._label)
            self._cb_on_close(self)

    def end_of_message(self):
        '''
        Handle inbound data from the :class:`RpcConnection` peer and place it
        on the incoming buffer.
        '''
        self._from_tun_to_endpoint_buf += self._incoming_buf
        self._endpoint_writer.start()

    def handle_control(self, control_type, payload):
        '''
        Handle a control message sent by the peer for this tunnel.

        :param control_type: ``'get'`` (the peer allows us to send more data),
            ``'ready'`` (the peer side of the tunnel is set up) or ``'eos'``
            (end of stream, the tunnel is closed)
        :param payload: for ``'get'``, a dict with the allowed ``size``; a
            missing payload or size falls back to :attr:`DEFAULT_GET_SIZE`
        '''
        if not self._is_closed:
            if control_type == 'get':
                # The payload is optional in the control message:
                size = (payload or {}).get('size',
                                           TunnelProtocol.DEFAULT_GET_SIZE)
                self._ok_to_send += size
                if self._is_started:
                    self._endpoint_reader.start()
            elif control_type == 'ready':
                self._send_get(TunnelProtocol.GET_SIZE)
                self._ok_to_send += TunnelProtocol.GET_SIZE
                if self._is_started:
                    self._endpoint_reader.start()
            elif control_type == 'eos':
                self.logger.debug('Received EOS event')
                self.close()
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/sjrpc/core/protocols/vpn.py 0000664 0000000 0000000 00000004333 12040012356 0024760 0 ustar 00root root 0000000 0000000 from __future__ import absolute_import
import os
import struct
import fcntl
from sjrpc.core.protocols import TunnelProtocol
__all__ = ['VpnProtocol']
class _FileSocket(object):
'''
A fake Socket interface for files.
'''
def __init__(self, file_):
self.file = file_
def recv(self, size):
self.file.read(size)
def send(self, data):
self.file.write(data)
return len(data)
def fileno(self):
return self.file.fileno()
def setblocking(self, blocking):
if not blocking:
# Disable blocking mode on devzero:
current_flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFL)
fcntl.fcntl(self.fileno(), fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
else:
raise NotImplementedError('Not implemented')
class VpnProtocol(TunnelProtocol):
    '''
    A VPN protocol which spawn a network interface and tunnel all its traffic
    through the sjRpc tunnel.

    :param inherited: All options (except endpoint) are inherited from
        :class:`TunnelProtocol`.
    :param tun_prefix: the prefix name of the spawned interface
    :param tun_mode: which can be VpnProtocol.IFF_TUN or VpnProtocol.IFF_TAP

    The user starting the sjRpc process must have the CAP_NET_ADMIN capability,
    by default, only root have this capability.

    .. warning::
       This protocol is a proof of concept and should not be used in production
       software.
    '''

    # Taken from linux/if_tun.h:
    TUNSETIFF = 0x400454ca
    IFF_TUN = 0x0001
    IFF_TAP = 0x0002

    # Default values:
    DEFAULT_TUN_PREFIX = 'tun'
    DEFAULT_TUN_MODE = IFF_TAP

    def __init__(self, *args, **kwargs):
        tun_prefix = kwargs.pop('tun_prefix', VpnProtocol.DEFAULT_TUN_PREFIX)
        tun_mode = kwargs.pop('tun_mode', VpnProtocol.DEFAULT_TUN_MODE)
        ftun = open('/dev/net/tun', 'r+b')
        try:
            # Create the tunnel and get its final name (the '%d' of the
            # requested name is part of the ioctl argument):
            create_tun_arg = struct.pack('16sH', tun_prefix + '%d', tun_mode)
            tun = fcntl.ioctl(ftun, VpnProtocol.TUNSETIFF, create_tun_arg)
            self.tun_name = tun[:16].rstrip('\0')
            kwargs['endpoint'] = _FileSocket(ftun)
            super(VpnProtocol, self).__init__(*args, **kwargs)
        except Exception:
            # Don't leak the device file descriptor when the interface or
            # the tunnel can't be set up:
            ftun.close()
            raise
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/sjrpc/core/rpcconnection.py 0000664 0000000 0000000 00000041247 12040012356 0025002 0 ustar 00root root 0000000 0000000
'''
This module contains the RpcConnection class; more information about this
class is located in its docstring.
'''
from __future__ import absolute_import
import ssl
import errno
import struct
import socket
import logging
import threading
from sjrpc.core.protocols import Protocol, RpcProtocol, TunnelProtocol
from sjrpc.core.exceptions import (RpcConnectionError, NoFreeLabelError,
FallbackModeEnabledError, SocketError)
import pyev
class RpcConnection(object):
'''
This class manage a single peer connection.
You can wrap an existing socket with the default constructor::
>>> conn = RpcConnection(mysocket)
Or create a new socket automatically with from_addr constructor::
>>> conn = RpcConnection.from_addr(host, port)
If you prefer SSL connection, you can use the from_addr_ssl constructor::
>>> conn = RpcConnection.from_addr_ssl(host, port)
By default, an :class:`RpcProtocol` is created on label 0, you can access
to this rpc through the `conn.rpc` shortcut::
>>> conn.rpc.call('ping')
Also, the connection object expose :meth:`call` and :meth:`async_call`
method from default rpc, so you can use it directly on connection::
>>> conn.call('ping') # Equivalent to the exemple before
.. seealso::
You can read the :ref:`Default rpc, aka Rpc0` section to know more about
the default rpc
:param sock: the socket object of this newly created :class:`RpcConnection`
:param fallback_timeout: set the maximum time to wait the "capabilities"
message before to send anything to the peer. 0 to wait indefinitly, -1
to disable the fallback mode.
:param \*args,\*\*kwargs: arguments to pass to the default rpc protocol
automatically registered on label 0.
'''
NONBLOCKING_ERRORS = (errno.EAGAIN, errno.EWOULDBLOCK)
NONBLOCKING_SSL_ERRORS = (ssl.SSL_ERROR_WANT_READ, )
MESSAGE_HEADER = '!HL'
MESSAGE_HEADER_FALLBACK = '!L'
MAX_LABEL = 2 ** 16
DEFAULT_RECV_SIZE = 1024 * 64 # 64kB
SHORTCUTS_MAINRPC = ('call', 'async_call')
def __init__(self, sock, loop=None, enable_tcp_keepalive=False,
             fallback_timeout=1.0, *args, **kwargs):
    '''
    Wrap ``sock`` into a new connection and register the default rpc on
    label 0.

    :param sock: the socket to wrap (switched to non-blocking mode)
    :param loop: the pyev loop to use, a new one is created when None
    :param enable_tcp_keepalive: set the SO_KEEPALIVE option on the socket
    :param fallback_timeout: delay of the fallback timer, -1 to disable the
        fallback mode
    :param args, kwargs: passed to the :class:`RpcProtocol` registered on
        label 0
    '''
    # Sock of this connection:
    self._sock = sock
    sock.setblocking(False)

    # Initialization requires fallback mode disabled:
    self.fallback = False

    # Get the pyev loop:
    if loop is None:
        self.loop = pyev.Loop()
    else:
        self.loop = loop

    # Activate TCP keepalive on the connection:
    if enable_tcp_keepalive:
        self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)

    # Watcher list:
    self._watchers = set()

    # Socket inbound/outbound buffers:
    self._inbound_buffer = ''
    self._outbound_buffer = ''

    # Number of bytes expected for the first header; its size depends on
    # whether the connection starts in fallback mode (header without label):
    if fallback_timeout == -1:
        self._remains = struct.calcsize(RpcConnection.MESSAGE_HEADER)
    else:
        self._remains = struct.calcsize(RpcConnection.MESSAGE_HEADER_FALLBACK)

    self._proto_receiving = None

    # Initialize main read/write watchers (only the reader is started here):
    self._sock_reader = self.create_watcher(pyev.Io,
                                            fd=self._sock,
                                            events=pyev.EV_READ,
                                            callback=self._reader)
    self._sock_reader.start()
    self._sock_writer = self.create_watcher(pyev.Io,
                                            fd=self._sock,
                                            events=pyev.EV_WRITE,
                                            callback=self._writer)

    # OpenSSL requires that the SAME string object must be used on the
    # next send retry when all the entire buffer can't be write on the
    # socket. This attribute store this string, or None if the previous
    # send has been a success:
    self._writer_last_try_buf = None

    # Is the RpcConnection connected to its peer:
    self._connected = True

    # "Need to send" loop signal:
    self._need_to_send = self.create_watcher(pyev.Async,
                                             callback=self._cb_need_to_send)
    self._need_to_send.start()

    # Setup logging facility:
    self.logger = logging.getLogger('sjrpc.%s' % self.getpeername())

    # Protocols registered on this connection:
    self._protocols = {}
    self.register_protocol(0, RpcProtocol, *args, **kwargs)

    # Create shortcuts to the main rpc (protocol 0) for convenience:
    for name in RpcConnection.SHORTCUTS_MAINRPC:
        setattr(self, name, getattr(self.get_protocol(0), name))

    self._event_fallback = threading.Event()

    # By default, enter in fallback mode, no label, all frames are
    # redirected on Rpc0:
    if fallback_timeout != -1:
        self.fallback = True
        self.create_watcher(pyev.Timer, after=fallback_timeout, repeat=0,
                            callback=self._cb_set_event_fallback).start()

    # Set the event fallback just to send the capability message:
    self._event_fallback.set()
    # Send our capabilities to the peer:
    self._remote_capabilities = None
    self._send_capabilities()
    # And clear it after:
    self._event_fallback.clear()
@classmethod
def from_addr(cls, addr, port, conn_timeout=30.0, *args, **kwargs):
    r'''
    Construct an instance of :class:`RpcConnection` without providing the
    :class:`socket` object. The socket is created, connected to the peer
    and passed to the standard constructor before the new instance is
    returned.

    :param addr: the target ip address
    :param port: the target port
    :param conn_timeout: the connection operation timeout
    :param \*args,\*\*kwargs: extra arguments to pass to the constructor
        (see constructor docstring)
    '''
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.settimeout(conn_timeout)
        sock.connect((addr, port))
        # The timeout only applies to the connection phase:
        sock.setblocking(True)
    except Exception:
        # Do not leak the descriptor when the connection attempt fails:
        sock.close()
        raise
    return cls(sock, *args, **kwargs)
@classmethod
def from_addr_ssl(cls, addr, port, cert=None,
                  conn_timeout=30, *args, **kwargs):
    r'''
    Construct a :class:`RpcConnection` instance like :meth:`from_addr`, but
    enable ssl on the socket.

    When a certificate is given, the socket is wrapped with
    ``ssl.CERT_REQUIRED``; otherwise it is wrapped with ``ssl.CERT_NONE``.

    :param addr: the target ip address
    :param port: the target port
    :param cert: ssl client certificate or None for ssl without certificate
    :param conn_timeout: the connection operation timeout
    :param \*args,\*\*kwargs: extra arguments to pass to the constructor
    '''
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.settimeout(conn_timeout)
        sock.connect((addr, port))
        # The timeout only applies to the connection phase:
        sock.setblocking(True)
        req = ssl.CERT_NONE if cert is None else ssl.CERT_REQUIRED
        sock = ssl.wrap_socket(sock, certfile=cert, cert_reqs=req,
                               ssl_version=ssl.PROTOCOL_TLSv1)
    except Exception:
        # Do not leak the descriptor when connecting or wrapping fails
        # (``sock`` is still the raw socket if the wrapping raised):
        sock.close()
        raise
    return cls(sock, *args, **kwargs)
def __repr__(self):
    '''
    Return a short, non-empty description of the connection (class name
    and connection state) usable in logs and debuggers.
    '''
    # getattr() keeps repr() usable on a partially initialized instance:
    if getattr(self, '_connected', False):
        state = 'connected'
    else:
        state = 'disconnected'
    return '<%s %s>' % (type(self).__name__, state)
def __hash__(self):
    '''
    Hash the connection like the socket it manages.
    '''
    managed_socket = self._sock
    return hash(managed_socket)
def __nonzero__(self):
    '''
    Truth value of the connection: the current value of the "connected"
    flag.
    '''
    is_connected = self._connected
    return is_connected
def _send_capabilities(self):
    '''
    Send our capabilities to the peer.

    A special message is sent through the protocol registered on label 0
    with these fields:

    - special: 'capabilities'
    - capabilities: {'version': <sjrpc version>,
                     'capabilities': ['rpc', 'tunnel']}
    '''
    from sjrpc import __version__ as local_version
    announced = {
        'version': local_version,
        'capabilities': ['rpc', 'tunnel'],
    }
    main_rpc = self.get_protocol(0)
    main_rpc.send_special('capabilities', capabilities=announced)
def _reader(self, watcher, revents):
    '''
    Read the socket and feed the inbound buffer. Launch the dispatch as
    long as enough data is buffered.

    :param watcher: the watcher which triggered this callback (unused)
    :param revents: the events reported with the callback (unused)
    '''
    # Read all possible data from the socket:
    try:
        buf = self._sock.recv(RpcConnection.DEFAULT_RECV_SIZE)
    except socket.error as err:
        # Nothing to read right now, wait for the next read event:
        if err.errno in RpcConnection.NONBLOCKING_ERRORS:
            return
        if (isinstance(err, ssl.SSLError)
                and err.errno in RpcConnection.NONBLOCKING_SSL_ERRORS):
            return
        # If any fatal error is triggered, the connection is shutdown:
        self.shutdown()
        self.logger.debug('Unexpected socket error: %s', err)
        return
    # For ssl socket, we need to fetch buffered ssl-side data:
    if isinstance(self._sock, ssl.SSLSocket):
        pending = self._sock.pending()
        if pending:
            buf += self._sock.recv(pending)
    # Empty data on non-blocking socket means that the connection
    # has been closed. There is nothing to account or dispatch, so stop
    # here instead of falling through on a closed connection:
    if not buf:
        self.shutdown()
        return
    self._remains -= len(buf)
    self._inbound_buffer += buf
    # Process and dispatch all inbound data:
    while self._remains <= 0:
        self._dispatch()
def _dispatch(self):
    '''
    Consume the inbound buffer: parse a frame header when no message is
    being received, otherwise feed the payload to the receiving protocol
    and close the message.
    '''
    if self._proto_receiving is not None:
        # Payload phase. ``_remains`` is zero or negative here, so ``size``
        # leaves the bytes belonging to the next frame in the buffer:
        size = len(self._inbound_buffer) + self._remains
        payload = self._inbound_buffer[:size]
        self._inbound_buffer = self._inbound_buffer[size:]
        self._proto_receiving.feed(payload)
        if self._remains <= 0:
            self._proto_receiving.end_of_message()
            # The fallback flag is read after end_of_message() so the next
            # header is sized with the mode in force at that point:
            if self.fallback:
                next_header = RpcConnection.MESSAGE_HEADER_FALLBACK
            else:
                next_header = RpcConnection.MESSAGE_HEADER
            self._remains += struct.calcsize(next_header)
            self._proto_receiving = None
        return

    # Header phase:
    in_fallback = self.fallback
    if in_fallback:
        header_format = RpcConnection.MESSAGE_HEADER_FALLBACK
    else:
        header_format = RpcConnection.MESSAGE_HEADER
    header_size = struct.calcsize(header_format)
    raw_header = self._inbound_buffer[:header_size]
    self._inbound_buffer = self._inbound_buffer[header_size:]
    fields = struct.unpack(header_format, raw_header)
    if in_fallback:
        # Fallback frames carry no label, everything goes to label 0:
        pl_size = fields[0]
        label = 0
    else:
        label, pl_size = fields
    # Get the registered protocol for the specified label:
    receiver = self._protocols.get(label)
    if receiver is None:
        # If the frame's label is not bound to a protocol, a dummy
        # protocol is created to consume the payload:
        receiver = Protocol(self, -1)
    self._proto_receiving = receiver
    self._proto_receiving.start_message(pl_size)
    self._remains += pl_size
def _writer(self, watcher, revent):
    '''
    Write buffered outbound data on the socket and stop the watcher once
    the outbound buffer is empty.

    :param watcher: the write watcher which triggered this callback
    :param revent: the events reported with the callback (unused)
    '''
    if self._outbound_buffer:
        try:
            if not self.fallback:
                # Keep the very same string object for a retry of a
                # previously failed send:
                if self._writer_last_try_buf is None:
                    self._writer_last_try_buf = self._outbound_buffer[:1024 * 64]
                sent = self._sock.send(self._writer_last_try_buf)
            else:
                sent = self._sock.send(self._outbound_buffer[:4096])
        except socket.error as err:
            retry_later = (
                (isinstance(err, socket.error)
                 and err.errno in RpcConnection.NONBLOCKING_ERRORS)
                or (isinstance(err, ssl.SSLError)
                    and err.errno in RpcConnection.NONBLOCKING_SSL_ERRORS)
            )
            if not retry_later:
                errmsg = 'Fatal error while sending through socket: %s' % err
                self.logger.error(errmsg)
                self.shutdown()
            return
        else:
            # The send succeeded: forget the retry buffer and drop what
            # has been written.
            self._writer_last_try_buf = None
            self._outbound_buffer = self._outbound_buffer[sent:]
    if not self._outbound_buffer:
        watcher.stop()
def _cb_need_to_send(self, watcher, revents):
    '''
    Callback of the "need to send" watcher: start the socket write watcher.
    '''
    write_watcher = self._sock_writer
    write_watcher.start()
def _cb_set_event_fallback(self, watcher, revents):
    '''
    Watcher callback: set the fallback event.
    '''
    fallback_event = self._event_fallback
    fallback_event.set()
#
# Public API
#
@property
def rpc(self):
    '''
    The protocol registered on label 0.
    '''
    main_rpc = self.get_protocol(0)
    return main_rpc
def run(self):
    '''
    Start the loop attached to this connection.
    '''
    event_loop = self.loop
    event_loop.start()
def create_watcher(self, watcher_class, **kwargs):
    '''
    Instantiate a watcher bound to the loop of this connection, remember it
    in the watcher list and return it.

    :param watcher_class: the watcher class to instantiate
    :param \\*\\*kwargs: keyword arguments passed to the watcher class (the
        ``loop`` argument is always set to the loop of this connection)
    '''
    kwargs.update(loop=self.loop)
    new_watcher = watcher_class(**kwargs)
    self._watchers.add(new_watcher)
    return new_watcher
def send(self, label, payload):
    '''
    Low level method queuing a message for sending through the socket,
    generally used by protocols.

    The call waits for the fallback event before doing anything.

    :param label: the label written in the frame header (not written in
        fallback mode)
    :param payload: the payload of the message
    :raise RpcConnectionError: if the connection is not connected
    '''
    self._event_fallback.wait()
    if not self._connected:
        raise RpcConnectionError('Not connected to the peer')
    payload_size = len(payload)
    if not self.fallback:
        header = struct.pack(RpcConnection.MESSAGE_HEADER, label,
                             payload_size)
    else:
        header = struct.pack(RpcConnection.MESSAGE_HEADER_FALLBACK,
                             payload_size)
    frame = header + payload
    self._outbound_buffer += frame
    # Signal the loop that there is something to write:
    self._need_to_send.send()
def set_capabilities(self, capabilities):
    '''
    Store the capabilities of the remote host, leave the fallback mode and
    set the fallback event.

    Should be called by Rpc0 when the peer sends its capabilities message.

    :param capabilities: the capabilities announced by the peer
    '''
    self._remote_capabilities = capabilities
    # The mode is switched before the event is set:
    self.fallback = False
    fallback_event = self._event_fallback
    fallback_event.set()
def register_protocol(self, label, protocol_class, *args, **kwargs):
    '''
    Instantiate ``protocol_class`` and register it for the specified label.

    :param label: the label to bind, or None to use the first free one
    :param protocol_class: the class of the protocol to instantiate, called
        with this connection, the label and the extra arguments
    :param \\*args,\\*\\*kwargs: extra arguments for the protocol class
    :return: the new protocol instance
    :raise FallbackModeEnabledError: if the fallback mode is enabled
    :raise NoFreeLabelError: if label is None and no label is free
    :raise KeyError: if the label is already bound to a protocol
    :raise ValueError: if the label is not an integer
    '''
    if self.fallback:
        raise FallbackModeEnabledError('Fallback mode is not compatible '
                                       'with protocols')
    if label is None:
        # Look for the first label which is not bound yet:
        for candidate in xrange(0, RpcConnection.MAX_LABEL):
            if candidate not in self._protocols:
                label = candidate
                break
        else:
            raise NoFreeLabelError('No more label number are availables')
    if label in self._protocols:
        raise KeyError('A protocol is already registered for this label')
    if not isinstance(label, int):
        raise ValueError('Label must be an integer')
    protocol = protocol_class(self, label, *args, **kwargs)
    self._protocols[label] = protocol
    return protocol
def unregister_protocol(self, label):
    '''
    Remove the protocol bound to the specified label on this connection.

    :param label: the label to unbind (label 0 cannot be unbound)
    :raise FallbackModeEnabledError: if the fallback mode is enabled
    :raise KeyError: if no protocol is bound to the label, or if the label
        is 0
    '''
    if self.fallback:
        raise FallbackModeEnabledError('Fallback mode is not compatible '
                                       'with protocols')
    if label not in self._protocols or label == 0:
        raise KeyError('No protocol registered for this label')
    del self._protocols[label]
def create_rpc(self, label=None, *args, **kwargs):
    '''
    Shortcut registering a new :class:`RpcProtocol` on this connection.

    :param label: the label to bind, or None to use the first free one
    :param \\*args,\\*\\*kwargs: extra arguments for the protocol
    :return: the new protocol instance
    '''
    protocol = self.register_protocol(label, RpcProtocol, *args, **kwargs)
    return protocol
def create_tunnel(self, label=None, *args, **kwargs):
    '''
    Shortcut registering a new :class:`TunnelProtocol` on this connection.

    :param label: the label to bind, or None to use the first free one
    :param \\*args,\\*\\*kwargs: extra arguments for the protocol
    :return: the new protocol instance
    '''
    protocol = self.register_protocol(label, TunnelProtocol, *args, **kwargs)
    return protocol
def get_protocol(self, label):
    '''
    Return the protocol bound to the specified label.

    :param label: the label of the wanted protocol
    :raise KeyError: if no protocol is bound to this label
    '''
    found = self._protocols.get(label)
    if found is not None:
        return found
    raise KeyError('No protocol registered for this label')
def shutdown(self):
    '''
    Shutdown this connection: stop its watchers, shutdown its protocols
    and close its socket. Repeated calls are ignored.
    '''
    # Ignore repeated calls to shutdown:
    if not self._connected:
        return
    # Unset connected state:
    self._connected = False
    self.logger.info('Connection shutdown.')
    # Shutdown each registered watcher:
    for watcher in self._watchers:
        watcher.stop()
    # Shutdown each registered protocols. A snapshot is iterated so the
    # registry may be modified while protocols are being shut down:
    for proto in list(self._protocols.values()):
        proto.shutdown()
    # Close the connection socket. close() must run even when shutdown()
    # fails (e.g. the peer is already gone), otherwise the descriptor
    # stays open:
    try:
        try:
            self._sock.shutdown(socket.SHUT_RDWR)
        finally:
            self._sock.close()
    except socket.error:
        # Best effort: the connection is being dropped anyway.
        pass
def get_fd(self):
    '''
    Get the file descriptor of the socket managed by this connection.

    :return: the file descriptor number of the socket, or None if the
        socket raises an error
    '''
    try:
        descriptor = self._sock.fileno()
    except socket.error:
        descriptor = None
    return descriptor
def getpeername(self):
    '''
    Get the peer name.

    :return: string representing the peer name: ``host:port`` when the
        socket reports its peer as a tuple, the string form of the peer
        address otherwise
    '''
    peer = self._sock.getpeername()
    if isinstance(peer, tuple):
        # Only the two first items are used, so addresses with extra
        # items (e.g. the 4-tuple of IPv6 sockets) are supported too:
        return '%s:%s' % (peer[0], peer[1])
    return str(peer)
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/sjrpc/server/ 0000775 0000000 0000000 00000000000 12040012356 0022132 5 ustar 00root root 0000000 0000000 sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/sjrpc/server/__init__.py 0000664 0000000 0000000 00000000257 12040012356 0024247 0 ustar 00root root 0000000 0000000 #!/usr/bin/env python
#coding:utf8
from __future__ import absolute_import
from sjrpc.server.simple import (RpcServer, SSLRpcServer)
# Public names exported by the sjrpc.server package:
__all__ = ('RpcServer', 'SSLRpcServer')
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/sjrpc/server/simple.py 0000664 0000000 0000000 00000007552 12040012356 0024006 0 ustar 00root root 0000000 0000000 from __future__ import absolute_import
import gc
import ssl
import time
import errno
import socket
import select
import weakref
import logging
import threading
from sjrpc.core import RpcConnection
import pyev
class RpcServer(object):
    '''
    Base class for all RpcServer classes.

    Accept the connections coming on a listening socket from a pyev loop
    and wrap each of them into a :class:`RpcConnection`.
    '''

    DEFAULT_TCP_BACKLOG = 30
    NONBLOCKING_ERRORS = (errno.EAGAIN, errno.EWOULDBLOCK)

    def __init__(self, sock, loop=None, conn_args=(), conn_kw=None):
        '''
        :param sock: the listening socket
        :param loop: the pyev loop to use, a new one is created if None
        :param conn_args: extra positional arguments passed to each
            :class:`RpcConnection`
        :param conn_kw: extra keyword arguments passed to each
            :class:`RpcConnection`
        '''
        self._clients = set()
        self.logger = logging.getLogger('sjrpc')
        if loop is None:
            self.loop = pyev.Loop()
        else:
            self.loop = loop
        self._conn_args = conn_args
        # A fresh dict per instance, never a default shared between them:
        self._conn_kw = {} if conn_kw is None else conn_kw
        self._sock = sock
        self._sock_watcher = self.loop.io(self._sock, pyev.EV_READ,
                                          self._handle)
        self._sock_watcher.start()

    @classmethod
    def from_addr(cls, addr, port, tcp_backlog=None, *args, **kwargs):
        '''
        Construct the server from an address: the listening socket is
        created, bound and passed to the standard constructor.

        :param addr: the address to bind
        :param port: the port to bind
        :param tcp_backlog: the backlog of the listening socket
            (:attr:`DEFAULT_TCP_BACKLOG` if None)
        :param \\*args,\\*\\*kwargs: extra arguments for the constructor
        '''
        sock = socket.socket()
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            sock.bind((addr, port))
            sock.setblocking(False)
            if tcp_backlog is None:
                tcp_backlog = RpcServer.DEFAULT_TCP_BACKLOG
            sock.listen(tcp_backlog)
        except Exception:
            # Do not leak the descriptor when the setup fails:
            sock.close()
            raise
        return cls(sock, *args, **kwargs)

    def _wrap(self, sock):
        '''
        Wrap the socket into the :class:`RpcConnection` and return it.
        '''
        return RpcConnection(sock, self.loop, *self._conn_args,
                             **self._conn_kw)

    def _handle(self, watcher, revents):
        '''
        Callback of the listening socket watcher: accept one client and
        register its connection.
        '''
        # Collect offline connections:
        self._collect_connection()
        try:
            sock, address = self._sock.accept()
        except socket.error as err:
            if err.errno not in self.NONBLOCKING_ERRORS:
                self.logger.warning('Error while accepting client: %s', err)
            return
        # Addresses are not always (host, port) pairs (e.g. IPv6 4-tuples):
        if isinstance(address, tuple):
            peer = '%s:%s' % (address[0], address[1])
        else:
            peer = str(address)
        self.logger.info('New incoming connection from %s', peer)
        try:
            conn = self._wrap(sock)
        except Exception as err:
            # A client which cannot be set up must not break the accept
            # callback: drop it and keep serving the others.
            # NOTE(review): presumably pyev stops a watcher whose callback
            # raises -- confirm against the pyev documentation.
            self.logger.warning('Error while setting up client %s: %s',
                                peer, err)
            sock.close()
            return
        self.register(conn)

    def _collect_connection(self):
        '''
        Collect the offline connections.
        '''
        for conn in self._clients.copy():
            if not conn:
                self._clients.remove(conn)

    #
    # Public methods:
    #

    def register(self, conn):
        '''
        Register a new connection on this server.

        :param conn: the connection to register.
        '''
        self._clients.add(conn)

    def unregister(self, conn, shutdown=False):
        '''
        Unregister the specified client from this server. If shutdown is
        specified, the client is shutdown before being unregistered.
        Unknown connections are ignored.

        :param conn: the connection to unregister
        :param shutdown: shutdown or not the connection before unregistering
        '''
        if conn in self._clients:
            if shutdown:
                conn.shutdown()
            self._clients.discard(conn)

    def run(self):
        '''
        Run the :class:`RpcServer`.
        '''
        self.loop.start()

    def shutdown(self):
        '''
        Shutdown the :class:`RpcServer` instance: stop accepting clients,
        shutdown and unregister the registered ones, then stop the loop.
        '''
        self.logger.info('Shutdown requested')
        self._sock_watcher.stop()
        for client in self._clients.copy():
            self.unregister(client, shutdown=True)
        self.loop.stop(pyev.EVBREAK_ALL)
class SSLRpcServer(RpcServer):
    '''
    SSL version of the RpcServer: each socket is wrapped with server-side
    ssl before being wrapped by the base class.
    '''

    def __init__(self, sock, certfile, keyfile, loop=None, *args, **kwargs):
        '''
        :param sock: the listening socket
        :param certfile: the certificate file passed to the ssl wrapping
        :param keyfile: the key file passed to the ssl wrapping
        :param loop: the loop passed to the base class constructor
        :param \\*args,\\*\\*kwargs: extra arguments for the base class
            constructor
        '''
        super(SSLRpcServer, self).__init__(sock, loop, *args, **kwargs)
        self._certfile = certfile
        self._keyfile = keyfile

    def _wrap(self, sock):
        '''
        Enable ssl on the socket, then wrap it like the base class does.
        '''
        ssl_options = {
            'server_side': True,
            'keyfile': self._keyfile,
            'certfile': self._certfile,
            'ssl_version': ssl.PROTOCOL_TLSv1,
            'do_handshake_on_connect': True,
        }
        secured = ssl.wrap_socket(sock, **ssl_options)
        return super(SSLRpcServer, self)._wrap(secured)
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/sjrpc/utils/ 0000775 0000000 0000000 00000000000 12040012356 0021764 5 ustar 00root root 0000000 0000000 sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/sjrpc/utils/__init__.py 0000664 0000000 0000000 00000000327 12040012356 0024077 0 ustar 00root root 0000000 0000000 #!/usr/bin/env python
#coding:utf8
from sjrpc.utils.proxies import *
from sjrpc.utils.handlers import *
# Public names exported by the sjrpc.utils package:
__all__ = ('ConnectionProxy', 'RpcHandler', 'threadless', 'pure',
'pass_connection', 'pass_rpc')
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/sjrpc/utils/handlers.py 0000664 0000000 0000000 00000002661 12040012356 0024143 0 ustar 00root root 0000000 0000000 #!/usr/bin/env python
#coding:utf8
class RpcHandler(object):
    '''
    Basic RPC functions handler.

    Derive this class in your handler and define some methods. All the
    defined methods (including privates) are available to your peer.
    '''

    def __getitem__(self, name):
        '''
        Return the attribute of the handler called ``name``.

        :raise KeyError: if the handler has no such attribute
        '''
        if not hasattr(self, name):
            raise KeyError(name)
        return getattr(self, name)
#
# Decorators:
#
def threadless(func):
    '''
    Function handler decorator -- flag the function so that no new thread
    is spawned when it is called.
    '''
    setattr(func, '__threaded__', False)
    return func
def pure(func):
    '''
    Function handler decorator -- the function is a pure function, the
    caller will not pass the :class:`RpcConnection` object as first call
    parameter.

    .. deprecated:: 14
       This decorator is useless since the default behavior has changed:
       the function is returned untouched. You can use the
       :func:`pass_connection` decorator to do the opposite.

    This function is kept for compatibility only, and will be removed
    later.
    '''
    untouched = func
    return untouched
def pass_connection(func):
    '''
    Function handler decorator -- flag the function so that the connection
    which called it is passed as first parameter.
    '''
    setattr(func, '__pass_connection__', True)
    return func
def pass_rpc(func):
    '''
    Function handler decorator -- flag the function so that the rpc
    protocol which called it is passed as first parameter (or right after
    the connection).
    '''
    setattr(func, '__pass_rpc__', True)
    return func
sjrpc-ee1705e29558b47bc61a187897c514bcb080cf68/sjrpc/utils/proxies.py 0000664 0000000 0000000 00000002504 12040012356 0024030 0 ustar 00root root 0000000 0000000 #!/usr/bin/env python
#coding:utf8
from __future__ import absolute_import
from sjrpc.core.exceptions import RpcError
class ConnectionProxy(object):
    '''
    Create a new call proxy for the connection passed in arguments.

    Usage example:

    >>> proxy = ConnectionProxy(conn)
    >>> ret = proxy.existing_function()

    If an exception is raised by the function, the proxy tries to find a
    builtin exception class with the same name to re-raise it. If no
    builtin exception class has this name, the exception raised is the
    original :exc:`RpcError`:

    >>> proxy.unknown_function()
    [Traceback]
    NameError: remote name 'unknown_function' is not defined

    >>> proxy['myfunc']() # You can also use this syntax
    ...
    '''

    def __init__(self, connection):
        '''
        :param connection: the object whose ``call`` method performs the
            remote calls
        '''
        self.connection = connection

    @staticmethod
    def _builtin_exception(name):
        '''
        Return the builtin exception class called ``name``, or None.

        The name comes from the remote peer: only subclasses of
        :exc:`Exception` are accepted, so that a peer can never make the
        proxy call another builtin (``eval``, ``exit``...) with a message
        it controls.
        '''
        try:
            import builtins
        except ImportError:
            # Python 2 name of the module:
            import __builtin__ as builtins
        try:
            candidate = getattr(builtins, name, None)
        except (TypeError, UnicodeError):
            # The name is not usable as an attribute name.
            return None
        if isinstance(candidate, type) and issubclass(candidate, Exception):
            return candidate
        return None

    def __getattr__(self, name):
        '''
        Return a function calling the remote function ``name`` through the
        connection.
        '''
        def func(*args, **kwargs):
            try:
                return self.connection.call(name, *args, **kwargs)
            except RpcError as err:
                expt = self._builtin_exception(err.exception)
                if expt is None:
                    # Re-raise the original error with its traceback:
                    raise
                raise expt(err.message)
        return func

    def __getitem__(self, name):
        '''
        Same as attribute access: ``proxy['name']`` is ``proxy.name``.
        '''
        return self.__getattr__(name)