Newer
Older
from sjrpc.core import RpcConnection
from sjrpc.utils import ConnectionProxy, RpcHandler
from ccnode.tags import Tag, get_tags
# Module-level logger shared by every class below, named after this module.
logger = logging.getLogger(__name__)
class RPCHandler(object):
    """Handles rpc connection to the remote cc-server."""

    def __init__(self, loop, sock=None):
        """Create the watchers used while establishing the connection.

        :param loop: main loop object; its ``loop`` attribute must provide
            the ``io()`` and ``timer()`` watcher factories, and ``run()``
            additionally reads its ``config`` and ``handlers`` attributes
        :param sock: optional socket used by the ``cb``/``timeout``
            callbacks; it is switched to non-blocking mode when given
        """
        self.loop = loop
        # NOTE(review): pyev documents Loop.io(fd, events, callback) while
        # only the callback is passed here -- confirm before relying on it.
        self.watcher = loop.loop.io(self.cb)
        self.timer = loop.loop.timer(0, 5, self.timeout)
        # configure socket
        # The socket used to be hard-coded to None and then immediately
        # configured, which raised AttributeError on every construction.
        self.sock = sock
        if sock is not None:
            sock.setblocking(0)

    def cb(self, watcher, revents):
        """IO watcher callback: build the rpc connection on a write event.

        When ``revents`` contains ``pyev.EV_WRITE`` the socket is wrapped
        with SSL and published as ``loop.connection`` / ``loop.proxy``.
        """
        if revents & pyev.EV_WRITE:
            # NOTE(review): the protocol is pinned to TLSv1 -- confirm the
            # server side still requires that exact version.
            self.loop.connection = RpcConnection(
                sock=ssl.wrap_socket(self.sock,
                                     ssl_version=ssl.PROTOCOL_TLSv1),
                loop=self.loop.loop,
            )
            self.loop.proxy = ConnectionProxy(self.loop.connection)

    def timeout(self, watcher, revents):
        """Timer watcher callback: close the socket, if there is one."""
        if self.sock is not None:
            self.sock.close()

    def run(self):
        """Start both watchers, then connect to the configured server."""
        self.watcher.start()
        # `self.timeout` is the callback method; the watcher to start is the
        # timer created in __init__.
        self.timer.start()
        self.connection = RpcConnection.from_addr_ssl(
            addr=self.loop.config.server_host,
            port=self.loop.config.server_port,
            handler=self.loop.handlers,
        )
        self.proxy = ConnectionProxy(self.connection)
class AuthHandler(object):
    """Handles rpc authentication to the remote cc-server."""

    def __init__(self, loop):
        """Create the periodic timer that drives authentication attempts.

        :param loop: main loop object; its ``loop`` attribute must provide
            ``timer()``, and the callbacks read its ``rpc_con``, ``config``
            and ``proxy`` attributes and write its ``role`` attribute
        """
        self.loop = loop
        self.watcher = loop.loop.timer(.5, 5, self.cb)
        # id of the authentication call currently in flight, None otherwise
        self.auth_id = None

    def cb(self, watcher, revents):
        """Timer callback: send an ``authentify`` call if none is pending."""
        logger.debug('Callback auth')
        # check that fallback mode is set on sjrpc, otherwise our call would
        # block the loop
        if not self.loop.rpc_con._event_fallback.is_set():
            logger.debug('Will try authentication again latter')
            return
        if self.auth_id is not None:
            logger.error('Authentication is taking longer than expected')
            return
        # try to authenticate
        self.auth_id = self.loop.rpc_con.rpc.async_call_cb(
            self.cb_auth,
            'authentify',
            self.loop.config.server_user,
            self.loop.config.server_passwd,
        )

    def cb_auth(self, call_id, response=None, error=None):
        """Handle the answer to the ``authentify`` call.

        :param call_id: id of the answered call, must equal ``auth_id``
        :param response: role returned by the server (``u'host'`` and
            ``u'hv'`` are recognised)
        :param error: mapping with an ``exception`` key and an optional
            ``message`` key when the call failed
        """
        assert call_id == self.auth_id
        if error is not None:
            # we got an error
            logger.error('Error while authenticating with cc-server: %s("%s")',
                         error['exception'], error.get('message', ''))
            # try to reconnect in 5 seconds: forget the failed call so that
            # the next timer tick is allowed to send a new request
            self.auth_id = None
            return
        # set handler according to which role was returned by the cc-server
        if response == u'host':
            from ccnode.host import Handler as HostHandler
            self.loop.rpc_con.rpc.set_handler(HostHandler())
            self.loop.role = u'host'
        elif response == u'hv':
            from ccnode.hypervisor import Handler as HypervisorHandler
            self.loop.rpc_con.rpc.set_handler(HypervisorHandler(
                proxy=self.loop.proxy,
                hypervisor_name=self.loop.config.server_user))
            self.loop.role = u'hv'
        else:
            logger.info('Failed authentication, role returned: %s', response)
            self.loop.role = None
            self.auth_id = None
            return
        if self.loop.role is not None:
            # authenticated: no need to keep polling
            self.watcher.stop()
            logger.info('Successfully authenticated with role %s', response)
        self.auth_id = None

    def start(self):
        """Start the authentication timer."""
        self.watcher.start()
class MainLoop(object):
    """Owns the event loop, configuration, rpc connection and tag database."""

    def __init__(self, config_path):
        """Set up signal watchers, configuration, logging and tag database.

        :param config_path: path of the file given both to
            ``NodeConfigParser`` and to ``logging.config.fileConfig``
        """
        self.loop = pyev.default_loop()
        self.config_path = config_path
        # set signal watchers
        callbacks = {
            signal.SIGINT: self.stop,
            signal.SIGTERM: self.stop,
            signal.SIGUSR1: self.reload,
        }
        # turn into real watchers (items() works on both Python 2 and 3)
        self.signals = dict(
            (signum, self.loop.signal(signum, callback))
            for signum, callback in callbacks.items()
        )
        # load config variables
        self.config = NodeConfigParser(self.config_path)
        # configure logging
        logging.config.fileConfig(self.config_path)
        # rpc connection; both are set for real by rpc_connect()
        self.rpc_con = None
        self.proxy = None
        self.auth = AuthHandler(self)
        # role
        self.role = None
        # tag database
        self.tag_db = dict(
            __main__=dict((t.name, t)
                          for t in DEFAULT_TAGS),  # db for main objects
            # other sub-objects db can go here (for example VMs)
        )
        # handlers
        self.rpc_handler = dict(
            get_tags=partial(get_tags, self.tag_db['__main__']),
            sub_tags=self.sub_tags,
        )

    # RPC handlers definitions
    def sub_tags(self, sub_id, tags=None, noresolve_tags=None):
        """Return the tags of the sub-object database named ``sub_id``.

        :param sub_id: key of a sub-object database in ``tag_db``
        :param tags: forwarded unchanged to ``get_tags``
        :param noresolve_tags: forwarded unchanged to ``get_tags``
        :returns: the result of ``get_tags``, or ``None`` when ``sub_id`` is
            ``'__main__'`` or is not a known database
        """
        if sub_id == '__main__':
            # FIXME should we raise ?
            logger.debug('Invalid request for sub object')
            return
        sub_db = self.tag_db.get(sub_id)
        if sub_db is None:
            # FIXME should we also raise here ?
            logger.debug('Failed to find sub_id %s', sub_id)
            return
        return get_tags(sub_db, tags, noresolve_tags)
    # End RPC handlers definitions

    def rpc_connect(self):
        """Open the SSL rpc connection to the configured server."""
        # TODO async and error handling
        self.rpc_con = RpcConnection.from_addr_ssl(
            addr=self.config.server_host,
            port=self.config.server_port,
            loop=self.loop,
        )
        self.proxy = ConnectionProxy(self.rpc_con)

    def start(self):
        """Start signal watchers, connect, authenticate and run the loop."""
        # values() works on both Python 2 and 3; the loop variable no longer
        # shadows the `signal` module
        for sig_watcher in self.signals.values():
            sig_watcher.start()
        logger.debug('About to connect')
        self.rpc_connect()
        self.auth.start()
        logger.debug('About to start ev_loop')
        self.loop.start()

    def stop(self, watcher=None, revents=None):
        """Shut the rpc connection down (if any) and stop the event loop.

        Also used as a signal watcher callback, hence the optional
        ``watcher`` and ``revents`` parameters.
        """
        logger.info('Exiting node...')
        if self.rpc_con is not None:
            self.rpc_con.shutdown()
        self.loop.stop()

    def reload(self, watcher=None, revents=None):
        """Re-read the logging configuration from ``config_path``.

        Also used as a signal watcher callback, hence the optional
        ``watcher`` and ``revents`` parameters.
        """
        logger.info(u'Reloading logging configuration...')
        logging.config.fileConfig(self.config_path)