Newer
Older
from sjrpc.core import RpcConnection
from sjrpc.utils import ConnectionProxy, RpcHandler
from ccnode.tags import Tag, get_tags
logger = logging.getLogger(__name__)
"""Base handler for :class:`Node` objects. It contains only a ``version``
tag, which returns the current ``cc-node`` version.
See `sjRpc documentation <http://google.fr>`_ for more information.
"""
self.tags = dict((t.name, t) for t in DEFAULT_TAGS)
def get_tags(self, tags=None, noresolve_tags=None):
"""Method used from the ``cc-server`` to get tags.
:param iterable tags: list of tags to return
:param iterable noresolve_tags: list of tags to not return
"""
def set_tag(self, tag):
    """Store *tag* in ``self.tags`` under its ``name``.

    A tag already stored under the same name is replaced.

    :param tag: object exposing a ``name`` attribute
    """
    key = tag.name
    self.tags[key] = tag
def remove_tag(self, tag_name):
    """Drop the tag stored under *tag_name* from ``self.tags``.

    Unknown names are silently ignored.

    :param tag_name: name (key) of the tag to remove
    """
    try:
        del self.tags[tag_name]
    except KeyError:
        # nothing registered under that name: nothing to do
        pass
class AuthHandler(object):
"""Handles rpc authentication to the remote cc-server."""
def __init__(self, loop):
    """Bind the handler to *loop* and prepare its timer watcher.

    :param loop: object exposing the event loop as ``loop.loop``; it is
        kept as ``self.loop``
    """
    # id of the authentication call currently in flight, ``None`` when idle
    self.auth_id = None
    self.loop = loop
    # timer watcher calling ``self.cb``; it is only created here, not
    # started.
    # NOTE(review): presumably first fires after 0.5s, then every 5s
    # (pyev ``timer(after, repeat, callback)``) -- confirm
    self.watcher = loop.loop.timer(0.5, 5, self.cb)
def cb(self, watcher, revents):
logger.debug('Callback auth')
# check if fallback mode is set on sjrpc, otherwise our call would block
# the loop
if not self.loop.rpc_con._event_fallback.is_set():
logger.debug('Will try authentication again latter')
return
if self.auth_id is not None:
logger.error('Authentication is taking longer than expected')
return
# try to authenticate
self.auth_id = self.loop.rpc_con.rpc.async_call_cb(
self.cb_auth,
'authentify',
self.loop.config.server_user,
self.loop.config.server_passwd,
def cb_auth(self, call_id, response=None, error=None):
assert call_id == self.auth_id
if error is not None:
# we got an error
logger.error('Error while authenticating with cc-server: %s("%s")',
error['exception'], error.get('message', ''))
# try to reconnect in 5 seconds
return
# set handler according to which role was returned by the cc-server
from ccnode.host import Handler as HostHandler
self.loop.rpc_con.rpc.set_handler(HostHandler())
self.loop.role = u'host'
elif response == u'hv':
from ccnode.hypervisor import Handler as HypervisorHandler
self.loop.rpc_con.rpc.set_handler(HypervisorHandler(
proxy=self.loop.proxy,
hypervisor_name=self.loop.config.server_user))
self.loop.role = u'hv'
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
logger.info('Failed authentication, role returned: %s', response)
self.loop.role = None
self.auth_id = None
return
if self.loop.role is not None:
self.watcher.stop()
logger.info('Successfully authenticated with role %s', response)
self.auth_id = None
def start(self):
    """Start the timer watcher created in ``__init__``."""
    timer = self.watcher
    timer.start()
class MainLoop(object):
    """Main loop of the node.

    Owns the ``pyev`` event loop, the signal watchers, the parsed
    configuration, the rpc connection and the :class:`AuthHandler`.
    """

    def __init__(self, config_path):
        """Create the event loop and everything attached to it.

        :param config_path: path of the file handed both to
            :class:`NodeConfigParser` and to ``logging.config.fileConfig``
        """
        self.loop = pyev.default_loop()
        self.config_path = config_path

        # callbacks to run for each handled signal
        callbacks = {
            signal.SIGINT: self.stop,
            signal.SIGTERM: self.stop,
            signal.SIGUSR1: self.reload,
        }
        # turn them into real (not yet started) watchers, keyed by signal
        watchers = {}
        for signum, callback in callbacks.iteritems():
            watchers[signum] = self.loop.signal(signum, callback)
        self.signals = watchers

        # load config variables
        self.config = NodeConfigParser(config_path)
        # configure logging
        logging.config.fileConfig(config_path)

        # rpc connection, opened later by rpc_connect()
        self.rpc_con = None
        self.auth = AuthHandler(self)
        # role, ``None`` for now
        self.role = None

    def rpc_connect(self):
        """Open the SSL rpc connection described by the configuration.

        The connection is stored as ``self.rpc_con`` and a
        :class:`ConnectionProxy` wrapping it as ``self.proxy``.
        """
        # TODO async and error handling
        connection = RpcConnection.from_addr_ssl(
            addr=self.config.server_host,
            port=self.config.server_port,
            handler=DefaultHandler(),
            loop=self.loop,
        )
        self.rpc_con = connection
        self.proxy = ConnectionProxy(connection)

    def start(self):
        """Start the signal watchers, connect, start authentication and
        finally start the event loop.
        """
        for sig_watcher in self.signals.itervalues():
            sig_watcher.start()

        logger.debug('About to connect')
        self.rpc_connect()
        self.auth.start()

        logger.debug('About to start ev_loop')
        self.loop.start()

    def stop(self, watcher=None, revents=None):
        """Shut down the rpc connection (if any) and stop the event loop.

        Registered in ``__init__`` as the SIGINT/SIGTERM callback; both
        arguments are ignored.
        """
        logger.info('Exiting node...')
        connection = self.rpc_con
        if connection is not None:
            connection.shutdown()
        self.loop.stop()

    def reload(self, watcher=None, revents=None):
        """Re-read the logging configuration from ``self.config_path``.

        Registered in ``__init__`` as the SIGUSR1 callback; both arguments
        are ignored.
        """
        logger.info(u'Reloading logging configuration...')
        logging.config.fileConfig(self.config_path)