MCPcopy
hub / github.com/cft0808/edict / main

Function main

scripts/refresh_live_data.py:21–118  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

19
20
21def main():
22 # 使用 officials_stats.json(与 sync_officials_stats.py 统一)
23 officials_data = read_json(DATA / 'officials_stats.json', {})
24 officials = officials_data.get('officials', []) if isinstance(officials_data, dict) else officials_data
25 # 任务源优先:tasks_source.json(可对接外部系统同步写入)
26 tasks = atomic_json_read(DATA / 'tasks_source.json', [])
27 if not tasks:
28 tasks = read_json(DATA / 'tasks.json', [])
29
30 sync_status = read_json(DATA / 'sync_status.json', {})
31
32 org_map = {}
33 for o in officials:
34 label = o.get('label', o.get('name', ''))
35 if label:
36 org_map[label] = label
37
38 now_ts = datetime.datetime.now(datetime.timezone.utc)
39 for t in tasks:
40 t['org'] = t.get('org') or org_map.get(t.get('official', ''), '')
41 t['outputMeta'] = output_meta(t.get('output', ''))
42
43 # 心跳时效检测:对 Doing/Assigned 状态的任务标注活跃度
44 if t.get('state') in ('Doing', 'Assigned', 'Review'):
45 updated_raw = t.get('updatedAt') or t.get('sourceMeta', {}).get('updatedAt')
46 age_sec = None
47 if updated_raw:
48 try:
49 if isinstance(updated_raw, (int, float)):
50 updated_dt = datetime.datetime.fromtimestamp(updated_raw / 1000, tz=datetime.timezone.utc)
51 else:
52 updated_dt = datetime.datetime.fromisoformat(str(updated_raw).replace('Z', '+00:00'))
53 age_sec = (now_ts - updated_dt).total_seconds()
54 except Exception:
55 pass
56 if age_sec is None:
57 t['heartbeat'] = {'status': 'unknown', 'label': '⚪ 未知', 'ageSec': None}
58 elif age_sec < 300:
59 t['heartbeat'] = {'status': 'active', 'label': f'🟢 活跃 {int(age_sec//60)}分钟前', 'ageSec': int(age_sec)}
60 elif age_sec < 900:
61 t['heartbeat'] = {'status': 'warn', 'label': f'🟡 可能停滞 {int(age_sec//60)}分钟前', 'ageSec': int(age_sec)}
62 else:
63 t['heartbeat'] = {'status': 'stalled', 'label': f'🔴 已停滞 {int(age_sec//60)}分钟', 'ageSec': int(age_sec)}
64 else:
65 t['heartbeat'] = None
66
67 today_str = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d')
68 def _is_today_done(t):
69 if t.get('state') != 'Done':
70 return False
71 ua = t.get('updatedAt', '')
72 if isinstance(ua, str) and ua[:10] == today_str:
73 return True
74 # fallback: outputMeta lastModified
75 lm = t.get('outputMeta', {}).get('lastModified', '')
76 if isinstance(lm, str) and lm[:10] == today_str:
77 return True
78 return False

Callers 1

Calls 5

read_jsonFunction · 0.90
atomic_json_readFunction · 0.90
atomic_json_writeFunction · 0.90
output_metaFunction · 0.85
_is_today_doneFunction · 0.85

Tested by

no test coverage detected