| 298 | |
| 299 | |
async def _open_loop_scheduler(
    session,
    url: str,
    rng: random.Random,
    results: list[Result],
    test_start: float,
    stop_at: float,
    rps: float,
    simple_ratio: float,
    max_in_flight: int,
    stats: LoopStats,
) -> None:
    """Fire requests on a fixed cadence until ``stop_at``, then drain them.

    One slot is scheduled every ``1 / rps`` seconds, starting at
    ``test_start``. Slot times are computed from the start time rather than
    from when the previous request finished, so slow responses do not slow
    the arrival rate down. Each slot either starts a ``_fire_one`` task or,
    when ``max_in_flight`` tasks are already pending, is counted as dropped.

    Args:
        session: Passed through unchanged to ``_fire_one``.
        url: Passed through unchanged to ``_fire_one``.
        rng: Passed through unchanged to ``_fire_one``.
        results: Passed through unchanged to ``_fire_one``
            (presumably where it records outcomes; confirm against it).
        test_start: ``time.monotonic()`` timestamp of the first slot.
        stop_at: ``time.monotonic()`` timestamp at or after which no further
            slot is fired.
        rps: Target slots per second; must be positive.
        simple_ratio: Passed through unchanged to ``_fire_one``.
        max_in_flight: Cap on concurrently pending request tasks.
        stats: Updated in place: ``scheduled``, ``dropped`` and
            ``in_flight_peak``.

    Raises:
        ValueError: If ``rps`` is not positive.
    """
    if rps <= 0:
        # A zero rate would divide by zero; a negative one would move
        # next_fire backwards and spin without ever sleeping.
        raise ValueError(f"rps must be positive, got {rps!r}")

    inter_arrival = 1.0 / rps
    next_fire = test_start
    # The set holds strong references so pending tasks are not garbage
    # collected; each task removes itself when it completes.
    in_flight: set[asyncio.Task] = set()

    while True:
        now = time.monotonic()
        # Stop once the deadline has passed, or when the next slot would
        # land on/after it, so nothing is fired late.
        if now >= stop_at or next_fire >= stop_at:
            break

        # Always yield to the event loop, even when behind schedule (a
        # non-positive delay is a bare yield). Without this, a catch-up
        # burst would never let in-flight tasks finish, and every backlog
        # slot past the cap would be dropped.
        await asyncio.sleep(max(0.0, next_fire - now))

        if len(in_flight) >= max_in_flight:
            # Overloaded: client would normally back off; we drop the slot
            # and let the scheduler stay on cadence.
            stats.dropped += 1
        else:
            task = asyncio.create_task(
                _fire_one(session, url, rng, results, test_start, simple_ratio)
            )
            in_flight.add(task)
            task.add_done_callback(in_flight.discard)
            stats.scheduled += 1
            stats.in_flight_peak = max(stats.in_flight_peak, len(in_flight))

        next_fire += inter_arrival

    # Drain: wait for requests still pending at the deadline. Exceptions
    # are collected instead of raised so one failure does not abandon the
    # rest.
    if in_flight:
        await asyncio.gather(*in_flight, return_exceptions=True)
| 339 | |
| 340 | |
| 341 | async def run_http(args: argparse.Namespace) -> dict: |