Loading ccnode/node.py +60 −21 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ import time import signal import logging import logging.config from functools import partial import pyev from sjrpc.core import RpcConnection Loading @@ -18,31 +19,41 @@ logger = logging.getLogger(__name__) DEFAULT_TAGS = (Tag(u'version', __version__, -1),) class DefaultHandler(RpcHandler): """Base handler for :class:`Node` objects. Containing only a ``version`` tag that returns current ``cc-node`` version. See `sjRpc documentation <http://google.fr>`_ for more information. """ def __init__(self, *args, **kwargs): RpcHandler.__init__(self) class RPCHandler(object): """Handles rpc connection to the remote cc-server.""" def __init__(self, loop): self.loop = loop self.watcher = loop.loop.io(self.cb) self.timer = loop.loop.timer(0, 5, self.timeout) self.tags = dict((t.name, t) for t in DEFAULT_TAGS) self.connection = None self.proxy = None def get_tags(self, tags=None, noresolve_tags=None): """Method used from the ``cc-server`` to get tags. # configure socket self.sock = None self.sock.setblocking(0) :param iterable tags: list of tags to return :param iterable noresolve_tags: list of tags to not return """ return get_tags(self.tags, tags, noresolve_tags) def cb(self, watcher, revents): if revents & pyev.EV_WRITE: self.loop.connection = RpcConnection( sock=ssl.wrap_socket(self.sock, ssl_version=ssl.PROTOCOL_TLSv1), loop=self.loop.loop, ) self.loop.proxy = ConnectionProxy(self.loop.connection) def set_tag(self, tag): self.tags[tag.name] = tag def timeout(self, watcher, revents): self.sock.close() def remove_tag(self, tag_name): self.tags.pop(tag_name, None) def run(self): self.watcher.start() self.timeout.start() self.connection = RpcConnection.from_addr_ssl( addr=self.loop.config.server_host, port=self.loop.config.server_port, handler=self.loop.handlers, ) self.proxy = ConnectionProxy(self.connection) #self.loop. class AuthHandler(object): Loading Loading @@ -143,12 +154,40 @@ class MainLoop(object): # role self.role = None # tag database self.tag_db = dict( __main__=dict((t.name, t) for t in DEFAULT_TAGS), # db for main objects # other sub-objects db can go here (for example VMs) ) # handlers self.rpc_handler = dict( get_tags=partial(get_tags, self.tag_db['__main__']), sub_tags=self.sub_tags, ) # RPC handlers definitions def sub_tags(self, sub_id, tags=None, noresolve_tags=None): if sub_id == '__main__': # FIXME should we raise ? logger.debug('Invalid request for sub object') return sub_db = self.tag_db.get(sub_id) if sub_db is None: # FIXME should we also raise here ? logger.debug('Failed to find sub_id %s', sub_id) return return get_tags(sub_db, tags, noresolve_tags) # End RPC handlers definitions def rpc_connect(self): # TODO async and error handling self.rpc_con = RpcConnection.from_addr_ssl( addr=self.config.server_host, port=self.config.server_port, handler=DefaultHandler(), handler=self.rpc_handler, loop=self.loop, ) self.proxy = ConnectionProxy(self.rpc_con) Loading Loading
ccnode/node.py +60 −21 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ import time import signal import logging import logging.config from functools import partial import pyev from sjrpc.core import RpcConnection Loading @@ -18,31 +19,41 @@ logger = logging.getLogger(__name__) DEFAULT_TAGS = (Tag(u'version', __version__, -1),) class DefaultHandler(RpcHandler): """Base handler for :class:`Node` objects. Containing only a ``version`` tag that returns current ``cc-node`` version. See `sjRpc documentation <http://google.fr>`_ for more information. """ def __init__(self, *args, **kwargs): RpcHandler.__init__(self) class RPCHandler(object): """Handles rpc connection to the remote cc-server.""" def __init__(self, loop): self.loop = loop self.watcher = loop.loop.io(self.cb) self.timer = loop.loop.timer(0, 5, self.timeout) self.tags = dict((t.name, t) for t in DEFAULT_TAGS) self.connection = None self.proxy = None def get_tags(self, tags=None, noresolve_tags=None): """Method used from the ``cc-server`` to get tags. # configure socket self.sock = None self.sock.setblocking(0) :param iterable tags: list of tags to return :param iterable noresolve_tags: list of tags to not return """ return get_tags(self.tags, tags, noresolve_tags) def cb(self, watcher, revents): if revents & pyev.EV_WRITE: self.loop.connection = RpcConnection( sock=ssl.wrap_socket(self.sock, ssl_version=ssl.PROTOCOL_TLSv1), loop=self.loop.loop, ) self.loop.proxy = ConnectionProxy(self.loop.connection) def set_tag(self, tag): self.tags[tag.name] = tag def timeout(self, watcher, revents): self.sock.close() def remove_tag(self, tag_name): self.tags.pop(tag_name, None) def run(self): self.watcher.start() self.timeout.start() self.connection = RpcConnection.from_addr_ssl( addr=self.loop.config.server_host, port=self.loop.config.server_port, handler=self.loop.handlers, ) self.proxy = ConnectionProxy(self.connection) #self.loop. class AuthHandler(object): Loading Loading @@ -143,12 +154,40 @@ class MainLoop(object): # role self.role = None # tag database self.tag_db = dict( __main__=dict((t.name, t) for t in DEFAULT_TAGS), # db for main objects # other sub-objects db can go here (for example VMs) ) # handlers self.rpc_handler = dict( get_tags=partial(get_tags, self.tag_db['__main__']), sub_tags=self.sub_tags, ) # RPC handlers definitions def sub_tags(self, sub_id, tags=None, noresolve_tags=None): if sub_id == '__main__': # FIXME should we raise ? logger.debug('Invalid request for sub object') return sub_db = self.tag_db.get(sub_id) if sub_db is None: # FIXME should we also raise here ? logger.debug('Failed to find sub_id %s', sub_id) return return get_tags(sub_db, tags, noresolve_tags) # End RPC handlers definitions def rpc_connect(self): # TODO async and error handling self.rpc_con = RpcConnection.from_addr_ssl( addr=self.config.server_host, port=self.config.server_port, handler=DefaultHandler(), handler=self.rpc_handler, loop=self.loop, ) self.proxy = ConnectionProxy(self.rpc_con) Loading