追加一条审计记录到 data/audit_log.json(原子操作)。
(task_id, agent, action, old_val=None, new_val=None, reason="")
# Maximum number of audit log entries retained in the file.
# NOTE(review): this is a size cap, not a retention policy. Once the cap is
# reached the oldest records are dropped, so a burst of events can push
# earlier evidence out of the file. If these records are needed for
# investigations, also ship them to an append-only store.
MAX_AUDIT_LOG = 5000


def _append_audit(task_id, agent, action, old_val=None, new_val=None, reason=""):
    """Append one audit record to the JSON audit log (best effort).

    The record is written through ``atomic_json_update(AUDIT_FILE, ...)``. The
    original docstring names the target as data/audit_log.json and describes
    the update as atomic; that guarantee lives in the helper, which is not
    shown here.

    Args:
        task_id: Task identifier; a falsy value is stored as "".
        agent: Acting agent name; a falsy value is stored as "".
        action: Action label, stored as given.
        old_val: Previous value, stored under "from" as given.
        new_val: New value, stored under "to" as given.
        reason: Free-text reason, stored as given.

    Returns:
        None. Any exception raised while writing is caught and logged as a
        warning, so the caller is not told when a record was lost.
    """
    # Values are stored exactly as passed in; nothing is redacted here, so
    # callers should not hand secrets to old_val / new_val / reason.
    record = {
        "ts": now_iso(),
        "task": task_id or "",
        "agent": agent or "",
        "action": action,
        "from": old_val,
        "to": new_val,
        "reason": reason,
    }

    def _push_and_trim(logs):
        # The helper may pass None; start from an empty list in that case.
        history = [] if logs is None else logs
        history.append(record)
        # Keep only the newest MAX_AUDIT_LOG records; older ones are discarded.
        if len(history) > MAX_AUDIT_LOG:
            return history[-MAX_AUDIT_LOG:]
        return history

    try:
        atomic_json_update(AUDIT_FILE, _push_and_trim, [])
    except Exception as e:
        # NOTE(review): the audit trail fails open. The audited action proceeds
        # even when its record could not be written, and the only trace is
        # this warning. Alert on this message if gaps in the trail matter.
        log.warning(f"审计日志写入失败: {e}")
| 159 | |
| 160 | |
| 161 | # ── 越权检测(Agent 权限策略)── |
no test coverage detected