MCPcopy
hub / github.com/meta-llama/PurpleLlama / scan

Method scan

LlamaFirewall/src/llamafirewall/llamafirewall.py:107–162  ·  view source on GitHub ↗
(
        self,
        input: Message,
        trace: Trace | None = None,
    )

Source from the content-addressed store, hash-verified

105 return cls(scanners=PREDEFINED_USE_CASES[usecase])
106
def scan(
    self,
    input: Message,
    trace: Trace | None = None,
) -> ScanResult:
    """Run every scanner configured for the message's role and merge the results.

    Scanners are looked up in ``self.scanners`` by ``input.role``. Each one is
    created with ``create_scanner`` and driven to completion with
    ``asyncio.run``, so this method cannot be called from a thread that
    already has a running event loop (``asyncio.run`` raises ``RuntimeError``
    in that case).

    Args:
        input: The message to scan; its ``role`` selects the scanners.
        trace: Optional trace, passed through unchanged to every scanner.

    Returns:
        A ``ScanResult`` whose ``status`` is ``ScanStatus.SUCCESS``:

        * Exactly one scanner configured: that scanner's decision, its
          stripped reason and its score.
        * Otherwise: ``ScanDecision.BLOCK`` with the highest BLOCK score if
          any scanner returned BLOCK; else the decision carrying the highest
          score. When no scanner ran, the result is ``ScanDecision.ALLOW``
          with score ``0.0`` and reason ``"default"``. The reason is the
          distinct per-scanner reasons joined with ``"; "`` in the order the
          scanners ran.
    """
    scanners = self.scanners.get(input.role, [])
    reasons = []
    # Highest score seen so far for each ScanDecision a scanner returned.
    decisions = {}
    last_reason = ""
    for scanner_type in scanners:
        scanner_instance = create_scanner(scanner_type)
        LOG.debug(
            f"[LlamaFirewall] Scanning with {scanner_instance.name}, for the input {str(input.content)[:20]}"
        )
        scanner_result = asyncio.run(scanner_instance.scan(input, trace))
        last_reason = scanner_result.reason.strip()
        reasons.append(
            f"{scanner_type}: {last_reason} - score: {scanner_result.score}"
        )

        # Record the highest score for each found ScanDecision
        decisions[scanner_result.decision] = max(
            scanner_result.score,
            decisions.get(scanner_result.decision, scanner_result.score),
        )

    if len(scanners) == 1:
        # A single scanner's verdict is returned as-is, with its bare reason
        # rather than the "<scanner>: <reason> - score: <score>" form.
        only_decision, only_score = next(iter(decisions.items()))
        return ScanResult(
            decision=only_decision,
            reason=last_reason,
            score=only_score,
            status=ScanStatus.SUCCESS,
        )

    # De-duplicate while keeping scanner order. A set() here made the order of
    # the joined reasons vary between interpreter runs (string hashing is
    # randomized), so the returned reason text was not reproducible.
    formatted_reasons = "; ".join(dict.fromkeys(reasons))

    # Select BLOCK as the final decision if present,
    # otherwise select the decision with the highest score
    if ScanDecision.BLOCK in decisions:
        final_decision = ScanDecision.BLOCK
        final_score = decisions[ScanDecision.BLOCK]
    else:
        final_decision = (
            max(decisions, key=decisions.get) if decisions else ScanDecision.ALLOW
        )
        # The trailing 0.0 is both the floor and the value used when no
        # scanner ran.
        final_score = max([*decisions.values(), 0.0])

    return ScanResult(
        decision=final_decision,
        reason=formatted_reasons if formatted_reasons else "default",
        score=final_score,
        status=ScanStatus.SUCCESS,
    )
163
164 async def scan_async(

Callers 15

scan_replayMethod · 0.95
mainFunction · 0.95
invokeMethod · 0.95
run_code_shield_checkFunction · 0.95
scan_asyncMethod · 0.45
test_pii_detectionMethod · 0.45
test_benign_inputMethod · 0.45

Calls 3

ScanResultClass · 0.90
create_scannerFunction · 0.85
runMethod · 0.45