Newer
Older
from sjrpc.core import RpcConnection
from sjrpc.utils import ConnectionProxy, RpcHandler
from ccnode.tags import Tag, get_tags
logger = logging.getLogger(__name__)
class RPCHandler(object):
"""Handles rpc connection to the remote cc-server."""
def __init__(self, loop):
self.loop = loop
self.watcher = loop.loop.io(self.cb)
self.timer = loop.loop.timer(0, 5, self.timeout)
# configure socket
self.sock = None
self.sock.setblocking(0)
def cb(self, watcher, revents):
if revents & pyev.EV_WRITE:
self.loop.connection = RpcConnection(
sock=ssl.wrap_socket(self.sock, ssl_version=ssl.PROTOCOL_TLSv1),
loop=self.loop.loop,
)
self.loop.proxy = ConnectionProxy(self.loop.connection)
def timeout(self, watcher, revents):
self.sock.close()
def run(self):
self.watcher.start()
self.timeout.start()
self.connection = RpcConnection.from_addr_ssl(
addr=self.loop.config.server_host,
port=self.loop.config.server_port,
handler=self.loop.handlers,
)
self.proxy = ConnectionProxy(self.connection)
#self.loop.
class AuthHandler(object):
"""Handles rpc authentication to the remote cc-server."""
def __init__(self, loop):
self.loop = loop
self.watcher = loop.loop.timer(.5, 5, self.cb)
self.auth_id = None
def cb(self, watcher, revents):
logger.debug('Callback auth')
# check is fallback mode on sjrpc is set otherwise our call would block
# the loop
if not self.loop.rpc_con._event_fallback.is_set():
logger.debug('Will try authentication again latter')
return
if self.auth_id is not None:
logger.error('Authentication is taking longer than expected')
return
# try to authenticate
self.auth_id = self.loop.rpc_con.rpc.async_call_cb(
self.cb_auth,
'authentify',
self.loop.config.server_user,
self.loop.config.server_passwd,
def cb_auth(self, call_id, response=None, error=None):
assert call_id == self.auth_id
if error is not None:
# we got an error
logger.error('Error while authenticating with cc-server: %s("%s")',
error['exception'], error.get('message', ''))
# set handler according to which role was returned by the cc-server
from ccnode.host import Handler as HostHandler
from ccnode.hypervisor import Handler as HypervisorHandler
self.loop.role = HypervisorHandler(
hypervisor_name=self.loop.config.server_user,
logger.error('Failed authentication, role returned: %s', response)
self.loop.role = None
self.auth_id = None
return
if self.loop.role is not None:
self.watcher.stop()
logger.info('Successfully authenticated with role %s', response)
self.auth_id = None
def start(self):
self.watcher.start()
class MainLoop(object):
def __init__(self, config_path):
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
self.config_path = config_path
# set signal watchers
self.signals = {
signal.SIGINT: self.stop,
signal.SIGTERM: self.stop,
signal.SIGUSR1: self.reload,
}
# turn into real watchers
self.signals = dict((
signal,
self.loop.signal(signal, cb),
) for signal, cb in self.signals.iteritems())
# load config variables
self.config = NodeConfigParser(self.config_path)
# configure logging
logging.config.fileConfig(self.config_path)
# rpc connection
self.rpc_con = None
self.auth = AuthHandler(self)
# role
self.role = None
__main__=dict((t.name, t)
for t in DEFAULT_TAGS), # db for main objects
# other sub-objects db can go here (for example VMs)
)
# handlers
self.rpc_handler = dict(
get_tags=partial(get_tags, self.tag_db['__main__']),
sub_tags=self.sub_tags,
)
# RPC handlers definitions
def sub_tags(self, sub_id, tags=None, noresolve_tags=None):
if sub_id == '__main__':
# FIXME should we raise ?
logger.debug('Invalid request for sub object')
return
sub_db = self.tag_db.get(sub_id)
if sub_db is None:
# FIXME should we also raise here ?
logger.debug('Failed to find sub_id %s', sub_id)
return
return get_tags(sub_db, tags, noresolve_tags)
# End RPC handlers definitions
def reset_tag(self, tag):
"""
:param tag: :class:`Tag` to add/replace
"""
# TODO tag register
tag.start(self.loop)
self.tag_db['__main__'][tag.name] = tag
def remove_tag(self, tag_name):
# TODO tag unregister
self.tag_db['__main__'].pop(tag_name).stop()
def reset_sub_tag(self, sub_id, tag):
# TODO tag register
tag.start(self.loop)
self.tag_db[sub_id][tag.name] = tag
def remove_sub_tag(self, sub_id, tag_name):
# TODO tag unregister
self.tag_db[sub_id].pop(tag_name).stop()
def remove_sub_object(self, sub_id):
for tag in self.tag_db.pop(sub_id, {}).itervalues():
tag.stop()
def register_plugin(self, plugin):
# keep track of registered plugins
if plugin in self.registered_plugins:
raise PluginError('Plugin was already registered')
self.registered_plugins.add(plugin)
# register tags
for name, sub_db in plugin.tag_db.iteritems():
self.tag_db[name].update(sub_db)
# register handler
self.rpc_handler.update(plugin.rpc_handler)
def unregister_plugin(self, plugin):
try:
self.registered_plugins.remove(plugin)
except KeyError:
raise PluginError('Plugin was not registered, cannot remove')
# remove tags
for db_name, tag_db in plugin.tag_db:
for tag_name in tag_db:
del self.tag_db[db_name][tag_name]
if not self.tag_db[db_name]: # if there's no more tag in the db
del self.tag_db[db_name]
# remove handlers
for handler_name in plugin.rpc_handler:
del self.rpc_handler[handler_name]
plugin.stop()
def rpc_connect(self):
# TODO async and error handling
self.rpc_con = RpcConnection.from_addr_ssl(
addr=self.config.server_host,
port=self.config.server_port,
loop=self.loop,
)
self.proxy = ConnectionProxy(self.rpc_con)
def start(self):
for signal in self.signals.itervalues():
signal.start()
logger.debug('About to connect')
self.rpc_connect()
self.auth.start()
logger.debug('About to start ev_loop')
self.loop.start()
def stop(self, watcher=None, revents=None):
logger.info('Exiting node...')
if self.rpc_con is not None:
self.rpc_con.shutdown()
# close all plugins
for plugin in self.registered_plugins:
plugin.stop()
self.registered_plugins = set()
self.role = None
self.loop.stop()
def reload(self, watcher=None, revents=None):
logger.info(u'Reloading logging configuration...')
logging.config.fileConfig(self.config_path)