Flush pending metrics immediately.
()
| 173 | |
| 174 | @staticmethod |
| 175 | async def flush() -> None: |
| 176 | """Flush pending metrics immediately.""" |
| 177 | lock = Metric._get_lock() |
| 178 | async with lock: |
| 179 | pending_metrics = list(Metric._pending_metrics.values()) |
| 180 | Metric._pending_metrics = {} |
| 181 | |
| 182 | for metrics_data in pending_metrics: |
| 183 | await Metric._post_metrics(metrics_data) |
| 184 | await asyncio.sleep(0.25) |
| 185 | |
| 186 | @staticmethod |
| 187 | async def _post_metrics(metrics_data: dict[str, Any]) -> None: |
no test coverage detected