Awaitable async runable event.
| 53 | |
| 54 | |
class RunableEventAsync:
    """Awaitable event whose state is driven by a scheduler.

    Wraps an ``asyncio.Event``. Coroutines block in :meth:`wait` until the
    event is set; :meth:`set` sets or clears the event depending on what
    ``scheduler.has_unfinished()`` reports at the time of the call.
    """

    def __init__(self, scheduler: 'Scheduler'):
        # The event starts out unset (asyncio.Event default).
        self.event = asyncio.Event()
        # Only `has_unfinished()` is called on the scheduler by this class.
        self.scheduler = scheduler

    async def wait(self):
        """Suspend the calling coroutine until the event is set."""
        await self.event.wait()

    def set(self):
        """Synchronize the event with the scheduler's current state.

        The event is set when ``scheduler.has_unfinished()`` is truthy and
        cleared otherwise, so calling this does not unconditionally wake
        waiters.
        """
        if not self.scheduler.has_unfinished():
            # Nothing pending: make subsequent `wait()` calls block again.
            self.event.clear()
            return
        self.event.set()
| 72 | |
| 73 | |
| 74 | def build_runable_event(scheduler: 'Scheduler'): |
no outgoing calls
no test coverage detected