(task_index: int)
| 305 | await tracker.handle_progress(progress_made=False, pending_rollout_ids=pending_ids, store=store) |
| 306 | |
async def handle_single(task_index: int) -> None:
    """Submit one task as a rollout and wait for it to finish successfully.

    While holding a slot of the shared ``semaphore``, the coroutine registers
    an ``llm`` resource on the store, enqueues a ``train`` rollout named
    ``task-<task_index>``, and polls the store every five seconds until the
    rollout reports a terminal status. After a successful rollout the latest
    attempt's spans are queried and handed to ``check_spans``.

    Args:
        task_index: Index of the task. It is used to build the task name and
            the endpoint/model strings of the registered LLM resource.

    Raises:
        RuntimeError: If the rollout ends with a status other than
            ``"succeeded"`` (i.e. ``"failed"`` or ``"cancelled"``).
    """
    finished_statuses = ("failed", "succeeded", "cancelled")
    task_name = f"task-{task_index}"

    # The semaphore slot is held for the whole submit/poll/verify cycle.
    async with semaphore:
        console.print(f"Submitting task {task_index} of {total_tasks}")

        llm_resource = agl.LLM(
            endpoint=f"http://localhost:{task_index}/v1",
            model=f"test-model-{task_index}",
        )
        await store.add_resources({"llm": llm_resource})

        enqueued = await store.enqueue_rollout(input=task_name, mode="train")
        rollout_id = enqueued.rollout_id

        # Record the rollout as in flight; the ``finally`` below removes it
        # again regardless of how the wait ends.
        async with active_lock:
            active_rollouts.add(rollout_id)

        try:
            # Poll until the store reports a terminal status. A missing
            # rollout (``None``) is treated the same as "not finished yet".
            current = await store.get_rollout_by_id(rollout_id)
            while current is None or current.status not in finished_statuses:
                await emit_progress(progress_made=False)
                await asyncio.sleep(5.0)
                current = await store.get_rollout_by_id(rollout_id)

            if current.status != "succeeded":
                raise RuntimeError(f"Rollout {rollout_id} finished with status {current.status}")

            # Validate the spans of the latest attempt, then report progress.
            spans = await store.query_spans(rollout_id=rollout_id, attempt_id="latest")
            check_spans(spans, task_name)
            await emit_progress(progress_made=True)
        finally:
            async with active_lock:
                active_rollouts.discard(rollout_id)
| 338 | |
| 339 | all_tasks = [handle_single(i) for i in range(total_tasks)] |
| 340 | await asyncio.gather(*all_tasks) |
nothing calls this directly
no test coverage detected