MCPcopy
hub / github.com/apache/caldera / RuleSet

Class RuleSet

app/utility/rule_set.py:11–101  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

9
10
11class RuleSet:
12 def __init__(self, rules):
13 self.rules = rules
14
15 async def is_fact_allowed(self, fact):
16 allowed = True
17 for rule in await self._applicable_rules(fact):
18 if await self._is_ip_rule_match(rule, fact):
19 allowed = await self._rule_judgement(rule.action)
20 continue
21 if await self._is_regex_rule_match(rule, fact):
22 allowed = await self._rule_judgement(rule.action)
23 return allowed
24
25 async def _applicable_rules(self, fact):
26 applicable_rules = []
27 for rule in self.rules:
28 if rule.trait == fact.trait:
29 applicable_rules.append(rule)
30 return applicable_rules
31
32 async def apply_rules(self, facts):
33 if await self._has_rules():
34 valid_facts = []
35 for fact in facts:
36 if await self.is_fact_allowed(fact):
37 valid_facts.append(fact)
38 return [valid_facts]
39 else:
40 return [facts]
41
42 async def _has_rules(self):
43 return len(self.rules)
44
45 @staticmethod
46 async def _rule_judgement(action):
47 if action.value == RuleAction.DENY.value:
48 return False
49 return True
50
51 @staticmethod
52 async def _is_ip_network(value):
53 try:
54 ipaddress.IPv4Network(value)
55 return True
56 except (ValueError, ipaddress.AddressValueError):
57 pass
58 return False
59
60 @staticmethod
61 async def _is_ip_address(value):
62 try:
63 ipaddress.IPv4Address(value)
64 return True
65 except (ValueError, ipaddress.AddressValueError):
66 pass
67 return False
68

Callers 2

add_test_variantsMethod · 0.90
TestIPRuleClass · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected