(task: Task)
| 27 | semaphore = asyncio.Semaphore(n_task) |
| 28 | |
| 29 | async def sem_task(task: Task) -> Any: |
| 30 | async with semaphore: |
| 31 | return await task |
| 32 | return await asyncio.gather(*(sem_task(task) for task in tasks)) |
| 33 | |
| 34 |
no outgoing calls
no test coverage detected