(session: ClientSession, event: asyncio.Event)
| 96 | |
| 97 | async def token_refresh_preemptively_example() -> None: |
| 98 | async def set_token(session: ClientSession, event: asyncio.Event) -> None: |
| 99 | while True: |
| 100 | async with session.post("/refresh") as resp: |
| 101 | token = await resp.json() |
| 102 | session.headers["Authorization"] = f"Bearer {token['auth']}" |
| 103 | event.set() |
| 104 | await asyncio.sleep(token["valid_duration"]) |
| 105 | |
| 106 | @asynccontextmanager |
| 107 | async def auto_refresh_client() -> AsyncIterator[ClientSession]: |
no test coverage detected
searching dependent graphs…