MCPcopy
hub / github.com/fluentpython/example-code-2e / supervisor

Function supervisor

20-executors/getflags/flags3_asyncio.py:72–106  ·  view source on GitHub ↗
(cc_list: list[str],
                     base_url: str,
                     verbose: bool,
                     concur_req: int)

Source from the content-addressed store, hash-verified

70
71# tag::FLAGS2_ASYNCIO_START[]
72async def supervisor(cc_list: list[str],
73 base_url: str,
74 verbose: bool,
75 concur_req: int) -> Counter[DownloadStatus]: # <1>
76 counter: Counter[DownloadStatus] = Counter()
77 semaphore = asyncio.Semaphore(concur_req) # <2>
78 async with httpx.AsyncClient() as client:
79 to_do = [download_one(client, cc, base_url, semaphore, verbose)
80 for cc in sorted(cc_list)] # <3>
81 to_do_iter = asyncio.as_completed(to_do) # <4>
82 if not verbose:
83 to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # <5>
84 error: httpx.HTTPError | None = None # <6>
85 for coro in to_do_iter: # <7>
86 try:
87 status = await coro # <8>
88 except httpx.HTTPStatusError as exc:
89 error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
90 error_msg = error_msg.format(resp=exc.response)
91 error = exc # <9>
92 except httpx.RequestError as exc:
93 error_msg = f'{exc} {type(exc)}'.strip()
94 error = exc # <10>
95 except KeyboardInterrupt:
96 break
97
98 if error:
99 status = DownloadStatus.ERROR # <11>
100 if verbose:
101 url = str(error.request.url) # <12>
102 cc = Path(url).stem.upper() # <13>
103 print(f'{cc} error: {error_msg}')
104 counter[status] += 1
105
106 return counter
107
108def download_many(cc_list: list[str],
109 base_url: str,

Callers 1

download_manyFunction · 0.70

Calls 1

download_oneFunction · 0.70

Tested by

no test coverage detected