MCPcopy Index your code
hub / github.com/fluentpython/example-code-2e / supervisor

Function supervisor

20-executors/getflags/flags2_asyncio.py:57–91  ·  view source on GitHub ↗
(cc_list: list[str],
                     base_url: str,
                     verbose: bool,
                     concur_req: int)

Source from the content-addressed store, hash-verified

55
56# tag::FLAGS2_ASYNCIO_START[]
57async def supervisor(cc_list: list[str],
58 base_url: str,
59 verbose: bool,
60 concur_req: int) -> Counter[DownloadStatus]: # <1>
61 counter: Counter[DownloadStatus] = Counter()
62 semaphore = asyncio.Semaphore(concur_req) # <2>
63 async with httpx.AsyncClient() as client:
64 to_do = [download_one(client, cc, base_url, semaphore, verbose)
65 for cc in sorted(cc_list)] # <3>
66 to_do_iter = asyncio.as_completed(to_do) # <4>
67 if not verbose:
68 to_do_iter = tqdm.tqdm(to_do_iter, total=len(cc_list)) # <5>
69 error: httpx.HTTPError | None = None # <6>
70 for coro in to_do_iter: # <7>
71 try:
72 status = await coro # <8>
73 except httpx.HTTPStatusError as exc:
74 error_msg = 'HTTP error {resp.status_code} - {resp.reason_phrase}'
75 error_msg = error_msg.format(resp=exc.response)
76 error = exc # <9>
77 except httpx.RequestError as exc:
78 error_msg = f'{exc} {type(exc)}'.strip()
79 error = exc # <10>
80 except KeyboardInterrupt:
81 break
82
83 if error:
84 status = DownloadStatus.ERROR # <11>
85 if verbose:
86 url = str(error.request.url) # <12>
87 cc = Path(url).stem.upper() # <13>
88 print(f'{cc} error: {error_msg}')
89 counter[status] += 1
90
91 return counter
92
93def download_many(cc_list: list[str],
94 base_url: str,

Callers 1

download_manyFunction · 0.70

Calls 1

download_oneFunction · 0.70

Tested by

no test coverage detected