MCPcopy
hub / github.com/fluentpython/example-code-2e / download_one

Function download_one

20-executors/getflags/flags3_asyncio.py:44–68  ·  view source on GitHub ↗
(client: httpx.AsyncClient,
                       cc: str,
                       base_url: str,
                       semaphore: asyncio.Semaphore,
                       verbose: bool)

Source from the content-addressed store, hash-verified

42
43# tag::FLAGS3_ASYNCIO_DOWNLOAD_ONE[]
44async def download_one(client: httpx.AsyncClient,
45 cc: str,
46 base_url: str,
47 semaphore: asyncio.Semaphore,
48 verbose: bool) -> DownloadStatus:
49 try:
50 async with semaphore: # <1>
51 image = await get_flag(client, base_url, cc)
52 async with semaphore: # <2>
53 country = await get_country(client, base_url, cc)
54 except httpx.HTTPStatusError as exc:
55 res = exc.response
56 if res.status_code == HTTPStatus.NOT_FOUND:
57 status = DownloadStatus.NOT_FOUND
58 msg = f'not found: {res.url}'
59 else:
60 raise
61 else:
62 filename = country.replace(' ', '_') # <3>
63 await asyncio.to_thread(save_flag, image, f'{filename}.gif')
64 status = DownloadStatus.OK
65 msg = 'OK'
66 if verbose and msg:
67 print(cc, msg)
68 return status
69# end::FLAGS3_ASYNCIO_DOWNLOAD_ONE[]
70
71# tag::FLAGS2_ASYNCIO_START[]

Callers 1

supervisorFunction · 0.70

Calls 3

get_countryFunction · 0.85
get_flagFunction · 0.70
replaceMethod · 0.45

Tested by

no test coverage detected