MCPcopy Index your code
hub / github.com/wechaty/python-wechaty / sem_task

Function sem_task

src/wechaty/utils/async_helper.py:29–31  ·  view source on GitHub ↗
(task: Task)

Source from the content-addressed store, hash-verified

27 semaphore = asyncio.Semaphore(n_task)
28
29 async def sem_task(task: Task) -> Any:
30 async with semaphore:
31 return await task
32 return await asyncio.gather(*(sem_task(task) for task in tasks))
33
34

Callers 1

gather_with_concurrencyFunction · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected