MCPcopy Index your code
hub / github.com/microsoft/agent-lightning / handle_single

Method handle_single

tests/benchmark/benchmark_store.py:307–337  ·  view source on GitHub ↗
(task_index: int)

Source from the content-addressed store, hash-verified

305 await tracker.handle_progress(progress_made=False, pending_rollout_ids=pending_ids, store=store)
306
307 async def handle_single(task_index: int) -> None:
308 task_name = f"task-{task_index}"
309 async with semaphore:
310 console.print(f"Submitting task {task_index} of {total_tasks}")
311 await store.add_resources(
312 {
313 "llm": agl.LLM(
314 endpoint=f"http://localhost:{task_index}/v1",
315 model=f"test-model-{task_index}",
316 )
317 }
318 )
319 rollout = await store.enqueue_rollout(input=task_name, mode="train")
320 rollout_id = rollout.rollout_id
321 async with active_lock:
322 active_rollouts.add(rollout_id)
323 try:
324 while True:
325 current = await store.get_rollout_by_id(rollout_id)
326 if current is not None and current.status in ("failed", "succeeded", "cancelled"):
327 if current.status != "succeeded":
328 raise RuntimeError(f"Rollout {rollout_id} finished with status {current.status}")
329 break
330 await emit_progress(progress_made=False)
331 await asyncio.sleep(5.0)
332 spans = await store.query_spans(rollout_id=rollout_id, attempt_id="latest")
333 check_spans(spans, task_name)
334 await emit_progress(progress_made=True)
335 finally:
336 async with active_lock:
337 active_rollouts.discard(rollout_id)
338
339 all_tasks = [handle_single(i) for i in range(total_tasks)]
340 await asyncio.gather(*all_tasks)

Callers

nothing calls this directly

Calls 5

check_spansFunction · 0.85
add_resourcesMethod · 0.45
enqueue_rolloutMethod · 0.45
get_rollout_by_idMethod · 0.45
query_spansMethod · 0.45

Tested by

no test coverage detected