Skip to content
Snippets Groups Projects
Commit 88d91693 authored by Antoine Millet's avatar Antoine Millet
Browse files

Refactored client and associated rpc handlers

Created a class for each client role
Moved handled in same module of the client role class
Fixed handlers to be compatible with new object db.
parent 6866920b
No related branches found
No related tags found
No related merge requests found
......@@ -6,20 +6,39 @@ from __future__ import absolute_import
import logging
from fnmatch import fnmatch as glob
from functools import partial
from sjrpc.server import SSLRpcServer
from sjrpc.utils import RpcHandler, pass_connection
from ccserver.handlers import WelcomeHandler
from ccserver.conf import CCConf
from ccserver.client import CCClient
from ccserver.exceptions import AlreadyRegistered, NotConnectedAccountError
from ccserver.exceptions import AlreadyRegistered, NotConnectedAccountError, AuthenticationError, BadRoleError
from ccserver.jobs import JobsManager
from ccserver.clients import Client
from cloudcontrol.common.tql.db.object import TqlObject
from cloudcontrol.common.tql.db.tag import StaticTag, CallbackTag
#from cloudcontrol.common.tql.db.object import TqlObject
from ccserver.db import SObject
from cloudcontrol.common.tql.db.tag import StaticTag
from cloudcontrol.common.tql.db.db import TqlDatabase
# Import all enabled roles
import ccserver.clients.cli
import ccserver.clients.host
import ccserver.clients.hv
import ccserver.clients.bootstrap
import ccserver.clients.spv
class WelcomeHandler(RpcHandler):
""" Default handler used on client connections of the server.
"""
def __init__(self, server):
self._server = server
@pass_connection
def authentify(self, conn, login, password):
return self._server.authenticate(conn, login, password)
class CCServer(object):
'''
......@@ -32,7 +51,7 @@ class CCServer(object):
:param port: the port to bind
'''
LISTEN_BACKLOG = 5
LISTEN_BACKLOG = 50
# These tags are reserved and cannot be setted by an user:
RESERVED_TAGS = ('id', 'a', 'r', 'close', 'con', 'ip', 'p')
......@@ -40,9 +59,8 @@ class CCServer(object):
def __init__(self, conf_dir, maxcon, maxidle, certfile=None, keyfile=None,
address='0.0.0.0', port=1984):
# Dict containing all connected accounts, the key is the login of
# the account and the value the :class:`RpcConnection` of the peer:
self._connected = {}
self._clients = {} # Clients connected to the server
# The interface object to the configuration directory:
self.conf = CCConf(conf_dir)
# Some settings:
......@@ -80,9 +98,12 @@ class CCServer(object):
for login in to_register:
conf = self.conf.show(login)
obj = TqlObject(login)
obj.register(StaticTag('r', conf['role']))
obj.register(StaticTag('a', login))
obj = SObject(login)
obj.register(StaticTag('r', conf['role']), override=True)
obj.register(StaticTag('a', login), override=True)
# Register static tags:
for tag, value in self.conf.show(login)['tags'].iteritems():
obj.register(StaticTag(tag, value), override=True)
self.db.register(obj)
for login in to_unregister:
......@@ -96,10 +117,66 @@ class CCServer(object):
:param role: role to filter
'''
for login, client in self._connected.items():
for login, client in self._clients.items():
if role is None or client.role == role:
yield client
def authenticate(self, conn, login, password):
""" Authenticate a client against provided login and password.
If the authentication is a success, register the client on the server
and return the client role, else, raise an exception.
"""
logmsg = 'Authentication error from %s: '
with self.conf:
try:
role = self.conf.authentify(login, password)
except CCConf.UnknownAccount:
raise AuthenticationError('Unknown login')
else:
if 'close' in self.conf.show(login)['tags']:
logging.warning(logmsg + 'account closed (%s)', conn.getpeername(), login)
raise AuthenticationError('Account is closed')
if role is None:
logging.warning(logmsg + 'bad login/password (%s)', conn.getpeername(), login)
raise AuthenticationError('Bad login/password')
else:
if role not in Client.roles:
logging.warning(logmsg + 'bad role in account config (%s)', conn.getpeername(), login)
raise BadRoleError('%r is not a legal role' % role)
create_object = False
# If authentication is a success, try to register the client:
if role == 'bootstrap':
# Set a bootstrap id for the object:
login = '%s.%s' % (login, conn.get_fd())
# Real role of the node will be host:
role = 'host'
create_object = True
# Try to register the client:
for _ in xrange(5):
try:
self.register(login, role, conn, create_object)
except AlreadyRegistered:
if role == 'cli':
try:
self.kill(login)
except NotConnectedAccountError:
pass
else:
break
else:
logging.warning(logmsg + 'already connected (%s)', conn.getpeername(), login)
raise AuthenticationError('Already connected')
logging.info('Authentication success from %s with login %s', conn.getpeername(), login)
return role
def register(self, login, role, connection, create_object=False):
'''
Register a new connected account on the server.
......@@ -110,94 +187,26 @@ class CCServer(object):
:param create_object: create the object on objectdb
'''
if login in self._connected:
raise AlreadyRegistered('A client is already connected with this '
'account.')
else:
self._connected[login] = CCClient(login, role, self, connection)
# Create the object on objectdb if required:
if create_object:
obj = TqlObject(login)
obj.register(StaticTag('r', role))
self.db.register(obj)
tql_object = SObject(login)
tql_object.register(StaticTag('r', role))
self.db.register(tql_object)
else:
obj = self.db.get(login)
assert obj is not None
# Define server defined tags for the new node:
obj.register(CallbackTag('con', self._connected[login].get_uptime, ttl=0))
obj.register(CallbackTag('ip', self._connected[login].get_ip))
def unregister(self, connection):
'''
Unregister an already connected account.
:param connection: the connection of the client to unregister
'''
#XXX: not functional since new db!!!
client = self.search_client_by_connection(connection)
tql_object = self.db.get(login)
assert tql_object is not None
# Unregister objects from database if it have no account attached:
obj = self.db.get(client.login)
if obj is not None and 'a' not in obj:
self.db.unregister(obj)
# Register the client:
if login in self._clients:
raise AlreadyRegistered('A client is already connected with this account.')
else:
# Unregister tags of connected account:
obj.unregister()
if client.login in self._connected:
del self._connected[client.login]
#self.objects.unregister_children(client.login)
def sub_register(self, parent, name, role):
'''
Register a new node supervised by a parent.
:param parent: the parent login of the subnode
:param login: the name of the subnode
:param role: the role of the subnode
'''
client = self.get_connection(parent)
child = '%s.%s' % (parent, name)
client.register_child(child)
# Register the children in the tags database:
obj = TqlObject(child)
obj.register(StaticTag('r', role))
obj.register(StaticTag('p', client))
self.db.register(obj)
def sub_unregister(self, parent, name):
'''
Unregister a node supervised by a parent.
:param parent: the parent of the subnode
:param login: the name of the subnode
'''
client = self.get_connection(parent)
child = '%s.%s' % (parent, name)
client.unregister_child(child)
# Unregister the children from the tags database:
self.objects.unregister(child)
def search_client_by_connection(self, connection):
'''
Search a connected client by it connection. If no client is found,
return None.
:param connection: the connection of the client to search
:return: the found client or None
'''
client = Client.from_role(role, login, self, connection, tql_object)
self._clients[login] = client
for client in self._connected.values():
if client.connection is connection:
return client
else:
return None
def unregister(self, client):
""" Unregister a client.
"""
del self._clients[client.login]
def run(self):
'''
......@@ -214,15 +223,13 @@ class CCServer(object):
logging.debug('Running rpc mainloop')
self.rpc.run()
def get_connection(self, login):
'''
Get the connection of a connecter account login.
def get_client(self, login):
""" Get a connected client by its login.
:param login: login of the connection to get
:return: :class:`RpcConnection` instance of the peer connection
'''
return self._connected[login]
:return: the client instance
"""
return self._clients[login]
def kill(self, login):
'''
......@@ -233,7 +240,7 @@ class CCServer(object):
if account doesn't exists).
'''
client = self._connected.get(login)
client = self._clients.get(login)
if client is None:
raise NotConnectedAccountError('The account %s is not '
'connected' % login)
......@@ -263,63 +270,6 @@ class CCServer(object):
return right['target'] == 'allow'
return False
def tags_register(self, login, name, ttl=None, value=None):
'''
Register a new tag for a client.
:param login: login of the client
:param name: name of the tag to register
:param ttl: TTL of the tag if applicable (None = no TTL, the tag will
never expire)
:param value: value of the tag
'''
obj = self.db.get(login)
client = self._connected.get(login)
callback = partial(client.get_remote_tags, name)
tag = CallbackTag(name, callback, ttl=ttl)
obj.register(tag)
def tags_unregister(self, login, name):
'''
Unregister a tag for the client.
:param login: login of the client
:param name: name of the tag to unregister
'''
obj = self.db.get(login)
obj.unregister(name)
def tags_drop(self, login, name):
'''
Drop the cached value of a tag for the client.
:param login: login of the client
:param name: name of the tag to drop
'''
obj = self.db.get(login)
tag = obj.get(name)
if tag is not None:
tag.invalidate()
def tags_update(self, login, name, value=None, ttl=None):
'''
Update a tag.
:param login: login of the client
:param name: name of the tag to update
:param value: new value of the tag
:param ttl: new ttl of the tag
'''
obj = self.db.get(login)
tag = obj.get(name)
if tag is not None:
if value is not None:
tag.set_value(value)
if ttl is not None:
tag.ttl = ttl
def list(self, query, show=None):
self._update_accounts()
return self.db.query(query, show)
""" Connected client management package.
This package store classes representing each client's role and the associated
sjRPC handler.
"""
import logging
from functools import partial
from datetime import datetime
from cloudcontrol.common.tql.db.tag import CallbackTag
from ccserver.handlers import CCHandler, listed
from ccserver.exceptions import RightError
class RegisteredCCHandler(CCHandler):
""" Basic handler for all registered clients.
"""
def __getitem__(self, name):
self.client.top()
return super(RegisteredCCHandler, self).__getitem__(name)
def on_disconnect(self, conn):
logging.info('Client %s disconnected', self.client.login)
self.client.shutdown()
def check(self, method, tql=None):
""" Check if the client have access to this method.
"""
allow = self.client.server.check(self.client.login, method, tql)
if not allow:
raise RightError('You are not allowed to do this action.')
#
# Tags registration handler functions:
#
@listed
def tags_register(self, name, ttl=None, value=None):
""" Register a new tag on the calling node.
:param name: name of the tag to register
:param ttl: ttl of the tag (or None if not applicable)
:param value: value to fill the tag (optionnal)
"""
self.client.tags_register(name, ttl, value)
@listed
def tags_unregister(self, name):
""" Unregister a tag on the calling node.
:param name: name of the tag to unregister
"""
self.client.tags_unregister(name)
@listed
def tags_drop(self, name):
""" Drop the tag value of the specified tag on the calling node.
:param name: name of the tag to drop
"""
self.client.tags_drop(name)
@listed
def tags_update(self, name, value, ttl=None):
""" Update the value of the specified tag on the calling node.
:param name: name of the tag to update
:param value: new tag value
:param ttl: new ttl value
"""
self.client.tags_update(name, value, ttl)
class Client(object):
""" Base class for all types cc-server clients.
:param login: login of the client
:param server: server instance
:param connection: rpc connection to the client
"""
ROLE = None
RPC_HANDLER = RegisteredCCHandler
roles = {}
def __init__(self, login, server, connection, tql_object):
self._login = login
self._server = server
self._connection = connection
self._tql_object = tql_object
self._handler = self.RPC_HANDLER(self)
self._last_action = datetime.now()
self._connection_date = datetime.now()
# Set the role's handler for the client:
self._connection.rpc.set_handler(self._handler)
# Remote tags registered:
self._remote_tags = set()
# Register the server defined client tags:
self._tql_object.register(CallbackTag('con', lambda: self.uptime, ttl=0))
self._tql_object.register(CallbackTag('idle', lambda: self.idle, ttl=0))
self._tql_object.register(CallbackTag('ip', lambda: self.ip))
@classmethod
def register_client_class(cls, class_):
""" Register a new client class.
"""
cls.roles[class_.ROLE] = class_
@classmethod
def from_role(cls, role, login, server, connection, tql_object):
return cls.roles[role](login, server, connection, tql_object)
#
# Properties
#
@property
def login(self):
""" Return the login of this client.
"""
return self._login
@property
def server(self):
""" Return the cc-server binded to this client.
"""
return self._server
@property
def conn(self):
""" Return the sjrpc connection to the client.
"""
return self._connection
@property
def uptime(self):
""" Get the uptime of the client connection in seconds.
:return: uptime of the client
"""
dt = datetime.now() - self._connection_date
return dt.seconds + dt.days * 86400
@property
def idle(self):
""" Get the idle time of the client connection in seconds.
:return: idle of the client
"""
dt = datetime.now() - self._last_action
return dt.seconds + dt.days * 86400
@property
def ip(self):
""" Get client remote ip address.
"""
peer = self.conn.getpeername()
return ':'.join(peer.split(':')[:-1])
def get_tags(self, tags):
""" Get tags on the remote node.
:param tags: tags is the list of tags to fetch
"""
return self._connection.call('get_tags', tags)
def shutdown(self):
""" Shutdown the connection to the client.
"""
# Unregister all remote tags:
for tag in self._remote_tags.copy():
self.tags_unregister(tag)
# Unrefister all server defined client tags:
self._tql_object.unregister('con')
self._tql_object.unregister('idle')
self._tql_object.unregister('ip')
self._server.rpc.unregister(self.conn, shutdown=True)
self._server.unregister(self)
def top(self):
""" Reset the "last action" date to now.
"""
self._last_action = datetime.now()
def get_remote_tags(self, tag):
return self.conn.call('get_tags', (tag,))[tag]
def tags_register(self, name, ttl=None, value=None):
""" Register a new remote tag for the client.
:param name: name of the tag to register
:param ttl: TTL of the tag if applicable (None = no TTL, the tag will
never expire)
:param value: value of the tag
"""
callback = partial(self.get_remote_tags, name)
tag = CallbackTag(name, callback, ttl=ttl)
self._tql_object.register(tag)
self._remote_tags.add(name)
def tags_unregister(self, name):
"""
Unregister a remote tag for the client.
:param name: name of the tag to unregister
"""
self._tql_object.unregister(name)
self._remote_tags.discard(name)
def tags_drop(self, name):
""" Drop the cached value of a remote tag for the client.
:param name: name of the tag to drop
"""
tag = self._tql_object.get(name)
if tag is not None:
tag.invalidate()
def tags_update(self, name, value=None, ttl=None):
""" Update a remote tag.
:param name: name of the tag to update
:param value: new value of the tag
:param ttl: new ttl of the tag
"""
tag = self._tql_object.get(name)
if tag is not None:
if value is not None:
tag.set_value(value)
if ttl is not None:
tag.ttl = ttl
from ccserver.clients import Client
class BootstrapClient(Client):
""" A bootstrap client connected to the cc-server.
"""
ROLE = 'bootstrap'
Client.register_client_class(BootstrapClient)
This diff is collapsed.
from ccserver.clients import Client
class HostClient(Client):
""" A host client connected to the cc-server.
"""
ROLE = 'host'
Client.register_client_class(HostClient)
from ccserver.handlers import listed
from ccserver.clients import Client, RegisteredCCHandler
from cloudcontrol.common.tql.db.object import TqlObject
from cloudcontrol.common.tql.db.tag import StaticTag, CallbackTag
from functools import partial
class HypervisorHandler(RegisteredCCHandler):
""" Handler binded to an hv client.
"""
@listed
def register(self, obj_id, role):
'''
Register an object managed by the calling node.
.. note:
the obj_id argument passed to this handler is the object id of the
registered object (not the fully qualified id, the server will
preprend the id by "node_id." itself).
:param obj_id: the id of the object to register
:param role: the role of the object to register
'''
self.client.register(obj_id, role)
@listed
def unregister(self, obj_id):
'''
Unregister an object managed by the calling node.
.. note:
the obj_id argument passed to this handler is the object id of the
unregistered object (not the fully qualified id, the server will
preprend the id by "node_id." itself).
:param obj_id: the id of the object to unregister
'''
self.client.unregister(obj_id)
@listed
def sub_tags_register(self, obj_id, name, ttl=None, value=None):
""" Register a new remote tag for a child of the client.
:param obj_id: child name
:param name: name of the tag to register
:param ttl: TTL of the tag if applicable (None = no TTL, the tag will
never expire)
:param value: value of the tag
"""
self.client.sub_tags_register(obj_id, name, ttl, value)
@listed
def sub_tags_unregister(self, obj_id, name):
"""
Unregister a remote tag for a child of the client.
:param obj_id: child name
:param name: name of the tag to unregister
"""
self.client.sub_tags_unregister(obj_id, name)
@listed
def sub_tags_drop(self, obj_id, name):
""" Drop the cached value of a remote tag for a child of the client.
:param obj_id: child name
:param name: name of the tag to drop
"""
self.client.sub_tags_drop(obj_id, name)
@listed
def sub_tags_update(self, obj_id, name, value=None, ttl=None):
""" Update a remote tag for a child of the client.
:param obj_id: child name
:param name: name of the tag to update
:param value: new value of the tag
:param ttl: new ttl of the tag
"""
self.client.sub_tags_update(obj_id, name, value, ttl)
class HvClient(Client):
""" A hv client connected to the cc-server.
"""
ROLE = 'hv'
RPC_HANDLER = HypervisorHandler
def __init__(self, *args, **kwargs):
super(HvClient, self).__init__(*args, **kwargs)
self._children = {}
#
# Children specific methods:
#
def shutdown(self):
# Unregister all children:
for child in self._children.copy():
self.unregister(child)
super(HvClient, self).shutdown()
def get_child_remote_tags(self, obj_id, tag):
return self.conn.call('sub_tags', obj_id, (tag,))[tag]
def register(self, obj_id, role):
""" Register a new child.
"""
child = '%s.%s' % (self.login, obj_id)
# Register the children in the tags database:
obj = TqlObject(child)
obj.register(StaticTag('r', role))
obj.register(StaticTag('p', self.login))
self._server.db.register(obj)
self._children[obj_id] = obj
def unregister(self, obj_id):
""" Unregister a child.
"""
child = '%s.%s' % (self.login, obj_id)
del self._children[obj_id]
# Unregister the children from the tags database:
self._server.db.unregister(child)
def vm_action(self, action, vms):
return self.conn.call(action, vms)
def sub_tags_register(self, obj_id, name, ttl=None, value=None):
""" Register a new remote tag for a child of the client.
:param obj_id: child name
:param name: name of the tag to register
:param ttl: TTL of the tag if applicable (None = no TTL, the tag will
never expire)
:param value: value of the tag
"""
callback = partial(self.get_child_remote_tags, obj_id, name)
tag = CallbackTag(name, callback, ttl=ttl)
self._children[obj_id].register(tag)
def sub_tags_unregister(self, obj_id, name):
"""
Unregister a remote tag for a child of the client.
:param obj_id: child name
:param name: name of the tag to unregister
"""
self._children[obj_id].unregister(name)
def sub_tags_drop(self, obj_id, name):
""" Drop the cached value of a remote tag for a child of the client.
:param obj_id: child name
:param name: name of the tag to drop
"""
tag = self._children[obj_id].get(name)
if tag is not None:
tag.invalidate()
def sub_tags_update(self, obj_id, name, value=None, ttl=None):
""" Update a remote tag for a child of the client.
:param obj_id: child name
:param name: name of the tag to update
:param value: new value of the tag
:param ttl: new ttl of the tag
"""
tag = self._children[obj_id].get(name)
if tag is not None:
if value is not None:
tag.set_value(value)
if ttl is not None:
tag.ttl = ttl
Client.register_client_class(HvClient)
import logging
from ccserver.clients import Client, RegisteredCCHandler
from ccserver.handlers import listed
from ccserver.utils import OrderedSet
class SpvHandler(RegisteredCCHandler):
""" Handler binded to 'spv' role.
"""
@listed
def list(self, query):
""" List all objects registered on this instance.
:param query: the query to select objects to show
"""
logging.debug('Executed list function with query %s', query)
objects = self.server.list(query)
return {'objects': objects}
class SpvClient(Client):
""" A spv client connected to the cc-server.
"""
ROLE = 'spv'
RPC_HANDLER = SpvHandler
Client.register_client_class(SpvClient)
""" This module contains some cc-server specific helpers for the CloudControl
tags database provided in cc-commons.
"""
from cloudcontrol.common.tql.db.tag import BaseTag
from cloudcontrol.common.tql.db.object import TqlObject
class SObject(TqlObject):
""" A TQL object with features specific to cc-server.
"""
def register(self, tag, override=False):
""" Register a tag on this object (or override).
"""
if override:
self._tags[tag.name] = OverrideTag(tag.name, tag, self._tags.get(tag.name))
else:
return super(SObject, self).register(tag)
def unregister(self, name, override=False):
""" Unregister a tag on this object (or remove override).
"""
if override:
if isinstance(self._tags.get(name), OverrideTag):
if self._tags[name].replaced is not None:
self._tags[name] = self._tags[name].replaced
else:
super(SObject, self).unregister(name)
else:
if isinstance(self._tags.get(name), OverrideTag):
self._tags[name].replaced = None
else:
super(SObject, self).unregister(name)
def is_overriding(self, name):
""" Return True if a tag is overriding another one for the name.
If the tag is not found, False is returned.
"""
return isinstance(self._tags.get(name), OverrideTag)
class OverrideTag(BaseTag):
""" A tag to override another one.
"""
def __init__(self, name, override, replaced=None):
super(OverrideTag, self).__init__(name)
self.override = override
self.replaced = replaced
def get_value(self):
return self.override.get_value()
def set_value(self, value):
return self.override.set_value(value)
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment