| 105 | return cls(scanners=PREDEFINED_USE_CASES[usecase]) |
| 106 | |
| 107 | def scan( |
| 108 | self, |
| 109 | input: Message, |
| 110 | trace: Trace | None = None, |
| 111 | ) -> ScanResult: |
| 112 | scanners = self.scanners.get(input.role, []) |
| 113 | reasons = [] |
| 114 | decisions = {} |
| 115 | last_reason = "" |
| 116 | for scanner_type in scanners: |
| 117 | scanner_instance = create_scanner(scanner_type) |
| 118 | LOG.debug( |
| 119 | f"[LlamaFirewall] Scanning with {scanner_instance.name}, for the input {str(input.content)[:20]}" |
| 120 | ) |
| 121 | scanner_result = asyncio.run(scanner_instance.scan(input, trace)) |
| 122 | reasons.append( |
| 123 | f"{scanner_type}: {scanner_result.reason.strip()} - score: {scanner_result.score}" |
| 124 | ) |
| 125 | last_reason = scanner_result.reason.strip() |
| 126 | |
| 127 | # Record the highest score for each found ScanDecision |
| 128 | decisions[scanner_result.decision] = max( |
| 129 | scanner_result.score, |
| 130 | decisions.get(scanner_result.decision, scanner_result.score), |
| 131 | ) |
| 132 | |
| 133 | if len(scanners) == 1: |
| 134 | return ScanResult( |
| 135 | decision=list(decisions.keys())[0], |
| 136 | reason=last_reason, |
| 137 | score=list(decisions.values())[0], |
| 138 | status=ScanStatus.SUCCESS, |
| 139 | ) |
| 140 | |
| 141 | formatted_reasons = "; ".join(set(reasons)) |
| 142 | # Select BLOCK as the final decision if present in the list, |
| 143 | # otherwise select the decision with the highest score |
| 144 | final_decision = ( |
| 145 | ScanDecision.BLOCK |
| 146 | if ScanDecision.BLOCK in decisions.keys() |
| 147 | else ( |
| 148 | max(decisions, key=decisions.get) if decisions else ScanDecision.ALLOW |
| 149 | ) |
| 150 | ) |
| 151 | final_score = ( |
| 152 | decisions[ScanDecision.BLOCK] |
| 153 | if ScanDecision.BLOCK in decisions.keys() |
| 154 | else max(list(decisions.values()) + [0.0]) |
| 155 | ) |
| 156 | |
| 157 | return ScanResult( |
| 158 | decision=final_decision, |
| 159 | reason=formatted_reasons if formatted_reasons else "default", |
| 160 | score=final_score, |
| 161 | status=ScanStatus.SUCCESS, |
| 162 | ) |
| 163 | |
| 164 | async def scan_async( |