MCPcopy
hub / github.com/Runnin4ik/dpi-detector / _check_dc

Function _check_dc

core/telegram_scanner.py:109–144  ·  view source on GitHub ↗
(display: LiveDisplay)

Source from the content-addressed store, hash-verified

107 return False, None
108
109async def _check_dc(display: LiveDisplay) -> List[dict]:
110 spin = 0
111 results = []
112 total = len(TELEGRAM_DC_IPS)
113 lock = asyncio.Lock()
114
115 async def _one(ip, label):
116 nonlocal spin
117 reachable, rtt = await _tcp_ping(ip, TELEGRAM_DC_PORT)
118
119 async with lock:
120 results.append({"ip": ip, "label": label, "reachable": reachable, "rtt": rtt})
121 spin = (spin + 1) % len(_SPIN)
122 ok = sum(1 for x in results if x["reachable"])
123 await display.update(2, f"[dim]{_SPIN[spin]} проверяем {len(results)}/{total} доступно: {ok}[/dim]")
124
125 await asyncio.gather(*[_one(ip, lbl) for ip, lbl in TELEGRAM_DC_IPS])
126
127 ok = sum(1 for d in results if d["reachable"])
128 parts = []
129 for d in sorted(results, key=lambda x: x["label"]):
130 rtt_s = f" {d['rtt']*1000:.0f}мс" if d.get("rtt") else ""
131 if d["reachable"]:
132 parts.append(f"[green]{d['label']}[/green][dim]{rtt_s}[/dim]")
133 else:
134 parts.append(f"[red]{d['label']}✗[/red]")
135
136 if ok == total:
137 st = f"[green]ОК {ok}/{total}[/green]"
138 elif ok == 0:
139 st = f"[red]НЕДОСТУПНЫ 0/{total}[/red]"
140 else:
141 st = f"[yellow]{ok}/{total}[/yellow]"
142
143 await display.update(2, f"{st} {' '.join(parts)}")
144 return results
145
146# ─── Upload ──────────────────────────────────────────────────────────────────
147async def _run_upload(display: LiveDisplay) -> dict:

Callers 1

run_telegram_testFunction · 0.85

Calls 2

updateMethod · 0.80
_oneFunction · 0.70

Tested by

no test coverage detected