检查 Agent 是否有权执行该命令。未知 Agent 不拦截(向前兼容)。
(agent_id, cmd)
| 174 | } |
| 175 | |
| 176 | def _check_permission(agent_id, cmd): |
| 177 | """检查 Agent 是否有权执行该命令。未知 Agent 不拦截(向前兼容)。""" |
| 178 | if not agent_id: |
| 179 | return # 无法推断 Agent 身份时不拦截 |
| 180 | policy = AGENT_POLICY.get(agent_id) |
| 181 | if policy is None: |
| 182 | return # 未注册的 Agent 不拦截 |
| 183 | if cmd not in policy["commands"]: |
| 184 | _append_audit(None, agent_id, "permission_denied", cmd, None, f"{agent_id} 越权执行 {cmd}") |
| 185 | log.warning(f"⛔ {agent_id} 无权执行 {cmd}(允许: {policy['commands']})") |
| 186 | print(f"[看板] 越权拒绝: {agent_id} 不可执行 {cmd}", flush=True) |
| 187 | sys.exit(1) |
| 188 | |
| 189 | |
| 190 | def find_task(tasks, task_id): |
no test coverage detected