MCPcopy
hub / github.com/microsoft/agent-lightning / _slow_runner

Function _slow_runner

tests/algorithm/test_baseline.py:186–226  ·  view source on GitHub ↗

A slow runner that creates backpressure by processing rollouts with delays.

()

Source from the content-addressed store, hash-verified

184 artifacts: List[_RolloutArtifacts] = []
185
186 async def _slow_runner() -> None:
187 """A slow runner that creates backpressure by processing rollouts with delays."""
188 processed = 0
189 while processed < expected_rollouts:
190 attempted = await store.dequeue_rollout()
191 if attempted is None:
192 await asyncio.sleep(0.01)
193 continue
194
195 attempt = attempted.attempt
196 rollout_id = attempted.rollout_id
197 rollout = await store.get_rollout_by_id(rollout_id)
198
199 # Track the sample that was enqueued
200 if rollout:
201 enqueued_samples.append(rollout.input)
202
203 await store.update_attempt(
204 rollout_id,
205 attempt.attempt_id,
206 status="running",
207 worker_id="slow-runner",
208 )
209
210 # Add a delay to create backpressure and cause queue to fill up
211 await asyncio.sleep(0.05)
212
213 span = _build_span(rollout_id, attempt.attempt_id, sequence_id=1, index=processed + 1)
214 await store.add_span(span)
215 await store.update_attempt(rollout_id, attempt.attempt_id, status="succeeded")
216 await store.update_rollout(rollout_id, status="succeeded")
217
218 artifacts.append(
219 _RolloutArtifacts(
220 rollout_id=rollout_id,
221 attempt_id=attempt.attempt_id,
222 attempt_sequence=attempt.sequence_id,
223 span=span,
224 )
225 )
226 processed += 1
227
228 runner_task = asyncio.create_task(_slow_runner())
229 try:

Calls 7

_RolloutArtifactsClass · 0.85
_build_spanFunction · 0.70
dequeue_rolloutMethod · 0.45
get_rollout_by_idMethod · 0.45
update_attemptMethod · 0.45
add_spanMethod · 0.45
update_rolloutMethod · 0.45

Tested by

no test coverage detected