| 9 | |
| 10 | |
class RuleSet:
    """Filter facts through an ordered collection of allow/deny rules.

    A fact is allowed by default. Every rule whose ``trait`` equals the
    fact's ``trait`` is checked in order, and each rule that matches
    overwrites the verdict, so the last matching rule decides the outcome.
    """

    def __init__(self, rules):
        """Store the rules to evaluate.

        Args:
            rules: Collection of rule objects; the methods of this class read
                their ``trait`` and ``action`` attributes. ``None`` is
                treated like an empty collection.
        """
        self.rules = rules

    async def is_fact_allowed(self, fact):
        """Return whether *fact* passes the rules that apply to its trait.

        Args:
            fact: Object with a ``trait`` attribute. It is also handed to the
                ``_is_ip_rule_match`` / ``_is_regex_rule_match`` helpers.
                NOTE(review): those helpers are not defined in this part of
                the class - confirm which fact attributes they read.

        Returns:
            ``True`` when no applicable rule matches; otherwise the judgement
            of the last applicable rule that matched.
        """
        allowed = True
        for rule in await self._applicable_rules(fact):
            # ``or`` short-circuits, so the regex check only runs when the
            # IP check did not match.
            if (await self._is_ip_rule_match(rule, fact)
                    or await self._is_regex_rule_match(rule, fact)):
                allowed = await self._rule_judgement(rule.action)
        return allowed

    async def _applicable_rules(self, fact):
        """Return the rules whose ``trait`` equals ``fact.trait``, in order."""
        # ``or ()`` keeps a rule set built with ``rules=None`` iterable.
        return [rule for rule in self.rules or () if rule.trait == fact.trait]

    async def apply_rules(self, facts):
        """Filter *facts* down to the ones the rules allow.

        Args:
            facts: Iterable of fact objects.

        Returns:
            A one-element list wrapping the result: a new list of the allowed
            facts when rules exist, or the original *facts* object untouched
            when there are no rules.
        """
        if not await self._has_rules():
            return [facts]
        return [[fact for fact in facts if await self.is_fact_allowed(fact)]]

    async def _has_rules(self):
        """Return ``True`` when at least one rule is configured."""
        # bool() yields a real boolean and tolerates ``rules=None``, which
        # len() would reject with a TypeError.
        return bool(self.rules)

    @staticmethod
    async def _rule_judgement(action):
        """Translate a rule action into a verdict.

        Returns:
            ``False`` when ``action.value`` equals ``RuleAction.DENY.value``,
            ``True`` for every other action.
        """
        return action.value != RuleAction.DENY.value

    @staticmethod
    async def _is_ip_network(value):
        """Return ``True`` if *value* is accepted by ``ipaddress.IPv4Network``."""
        try:
            ipaddress.IPv4Network(value)
        except ValueError:
            # AddressValueError and NetmaskValueError both subclass ValueError.
            return False
        return True

    @staticmethod
    async def _is_ip_address(value):
        """Return ``True`` if *value* is accepted by ``ipaddress.IPv4Address``."""
        try:
            ipaddress.IPv4Address(value)
        except ValueError:
            # AddressValueError subclasses ValueError.
            return False
        return True
| 68 |
no outgoing calls
no test coverage detected