Loading cloudcontrol/server/rights.py 0 → 100644 +126 −0 Original line number Diff line number Diff line """ Rights manager. """ import os import json from threading import Lock from fnmatch import fnmatch class Rule(object): """ Represent a right rule. """ ACCEPT = 1 DENY = 2 __slots__ = ('_match', '_method', '_tql', '_action') def __init__(self, match, method, tql=None, action=ACCEPT): self._match = match self._method = method self._tql = tql self._action = action def __repr__(self): return '<Rule match=%s method=%s tql=%s action=%s>' % (self.match, self.method, self.tql, self.action) @classmethod def from_dict(cls, rule_dict): """ Construct a Rule object from a dict. The dict must contain the following keys: match, method, tql, action """ action = rule_dict['action'] if action == 'accept': action = Rule.ACCEPT elif action == 'deny': action = Rule.DENY else: raise ValueError('Bad action value, must be accept or deny') return cls(rule_dict['match'], rule_dict['method'], rule_dict['tql'], action) def to_dict(self): return {'match': self.match, 'method': self.method, 'tql': self.tql, 'action': {Rule.ACCEPT: 'accept', Rule.DENY: 'deny'}[self.action]} @property def match(self): return self._match @property def method(self): return self._method @property def tql(self): return self._tql @property def action(self): return self._action class RightManager(object): """ Right manager handle persistent storage of rights rules. """ INITIAL_RULES = [Rule('id', '*', 'id', Rule.ACCEPT)] def __init__(self, logger, rules_filename): self.logger = logger self._rules_filename = rules_filename self._rules = RightManager.INITIAL_RULES self._lock = Lock() # Create default ruleset if no file exists: if not os.path.exists(self._rules_filename): self.save() self.logger.info('Created default ruleset') else: self.reload() def export(self): """ Export the current ruleset to list of dict. """ return [r.to_dict() for r in self._rules] def load(self, ruleset, save=True): """ Load a ruleset. """ self.logger.info('Loaded %d rules', len(ruleset)) with self._lock: rules_tmp = [] for rule in ruleset: rules_tmp.append(Rule.from_dict(rule)) self._rules = rules_tmp if save: self.save() def reload(self): """ Reload ruleset from the disk. """ with open(self._rules_filename, 'r') as frules: rules = json.load(frules) self.load(rules['ruleset'], save=False) def save(self): """ Save the current loaded ruleset to disk. """ with open(self._rules_filename, 'w') as frules: json.dump({'ruleset': [r.to_dict() for r in self._rules]}, frules) def iter_rules_method(self, method): """ Iterate over rules matching the specified method. """ for rule in self._rules: if fnmatch(method, rule.method): yield rule Loading
cloudcontrol/server/rights.py 0 → 100644 +126 −0 Original line number Diff line number Diff line """ Rights manager. """ import os import json from threading import Lock from fnmatch import fnmatch class Rule(object): """ Represent a right rule. """ ACCEPT = 1 DENY = 2 __slots__ = ('_match', '_method', '_tql', '_action') def __init__(self, match, method, tql=None, action=ACCEPT): self._match = match self._method = method self._tql = tql self._action = action def __repr__(self): return '<Rule match=%s method=%s tql=%s action=%s>' % (self.match, self.method, self.tql, self.action) @classmethod def from_dict(cls, rule_dict): """ Construct a Rule object from a dict. The dict must contain the following keys: match, method, tql, action """ action = rule_dict['action'] if action == 'accept': action = Rule.ACCEPT elif action == 'deny': action = Rule.DENY else: raise ValueError('Bad action value, must be accept or deny') return cls(rule_dict['match'], rule_dict['method'], rule_dict['tql'], action) def to_dict(self): return {'match': self.match, 'method': self.method, 'tql': self.tql, 'action': {Rule.ACCEPT: 'accept', Rule.DENY: 'deny'}[self.action]} @property def match(self): return self._match @property def method(self): return self._method @property def tql(self): return self._tql @property def action(self): return self._action class RightManager(object): """ Right manager handle persistent storage of rights rules. """ INITIAL_RULES = [Rule('id', '*', 'id', Rule.ACCEPT)] def __init__(self, logger, rules_filename): self.logger = logger self._rules_filename = rules_filename self._rules = RightManager.INITIAL_RULES self._lock = Lock() # Create default ruleset if no file exists: if not os.path.exists(self._rules_filename): self.save() self.logger.info('Created default ruleset') else: self.reload() def export(self): """ Export the current ruleset to list of dict. """ return [r.to_dict() for r in self._rules] def load(self, ruleset, save=True): """ Load a ruleset. """ self.logger.info('Loaded %d rules', len(ruleset)) with self._lock: rules_tmp = [] for rule in ruleset: rules_tmp.append(Rule.from_dict(rule)) self._rules = rules_tmp if save: self.save() def reload(self): """ Reload ruleset from the disk. """ with open(self._rules_filename, 'r') as frules: rules = json.load(frules) self.load(rules['ruleset'], save=False) def save(self): """ Save the current loaded ruleset to disk. """ with open(self._rules_filename, 'w') as frules: json.dump({'ruleset': [r.to_dict() for r in self._rules]}, frules) def iter_rules_method(self, method): """ Iterate over rules matching the specified method. """ for rule in self._rules: if fnmatch(method, rule.method): yield rule