MCPcopy Index your code
hub / github.com/fluentpython/example-code-2e / download_one

Function download_one

20-executors/getflags/flags2_asyncio.py:32–53  ·  view source on GitHub ↗
(client: httpx.AsyncClient,
                       cc: str,
                       base_url: str,
                       semaphore: asyncio.Semaphore,
                       verbose: bool)

Source from the content-addressed store, hash-verified

30 return resp.content
31
32async def download_one(client: httpx.AsyncClient,
33 cc: str,
34 base_url: str,
35 semaphore: asyncio.Semaphore,
36 verbose: bool) -> DownloadStatus:
37 try:
38 async with semaphore: # <3>
39 image = await get_flag(client, base_url, cc)
40 except httpx.HTTPStatusError as exc: # <4>
41 res = exc.response
42 if res.status_code == HTTPStatus.NOT_FOUND:
43 status = DownloadStatus.NOT_FOUND
44 msg = f'not found: {res.url}'
45 else:
46 raise
47 else:
48 await asyncio.to_thread(save_flag, image, f'{cc}.gif') # <5>
49 status = DownloadStatus.OK
50 msg = 'OK'
51 if verbose and msg:
52 print(cc, msg)
53 return status
54# end::FLAGS2_ASYNCIO_TOP[]
55
56# tag::FLAGS2_ASYNCIO_START[]

Callers 1

supervisorFunction · 0.70

Calls 1

get_flagFunction · 0.70

Tested by

no test coverage detected