Wait until a dashboard update task reaches a terminal status. Args: test_client: Quart/FastAPI adapter test client. authenticated_header: Headers for authenticated dashboard requests. progress_id: Update progress id to poll. Returns: The progress response payload.
(
test_client,
authenticated_header: dict,
progress_id: str,
)
| 75 | |
| 76 | |
async def _wait_for_update_progress(
    test_client,
    authenticated_header: dict,
    progress_id: str,
    *,
    max_attempts: int = 100,
    poll_interval: float = 0.01,
) -> dict:
    """Wait until a dashboard update task reaches a terminal status.

    Polls ``/api/update/progress`` for ``progress_id`` until the reported
    status is ``"success"`` or ``"error"``. If no terminal status is seen
    within ``max_attempts`` polls, the current test is failed through
    ``pytest.fail``.

    Args:
        test_client: Quart/FastAPI adapter test client.
        authenticated_header: Headers for authenticated dashboard requests.
        progress_id: Update progress id to poll.
        max_attempts: Maximum number of progress requests to issue before
            giving up.
        poll_interval: Seconds to wait between two consecutive requests.

    Returns:
        The progress response payload.
    """
    terminal_statuses = {"success", "error"}
    for attempt in range(max_attempts):
        if attempt:
            # Sleep between polls only: a pause before the first request or
            # after the last one cannot change the outcome, it just slows
            # the test (and the timeout failure) down.
            await asyncio.sleep(poll_interval)
        response = await test_client.get(
            f"/api/update/progress?id={progress_id}",
            headers=authenticated_header,
        )
        data = await response.get_json()
        if data["data"].get("status") in terminal_statuses:
            return data
    pytest.fail(f"Update task did not finish: {progress_id}")
| 102 | |
| 103 | |
| 104 | @pytest.fixture |
no test coverage detected