Simulate a runner consuming rollouts, adding spans, and marking them complete.
(
*,
store: InMemoryLightningStore,
expected: int,
artifacts: List[_RolloutArtifacts],
)
| 70 | |
| 71 | |
async def _mock_runner(
    *,
    store: InMemoryLightningStore,
    expected: int,
    artifacts: List[_RolloutArtifacts],
) -> None:
    """Play the role of a runner: claim rollouts, emit one span each, finish them.

    Rollouts are dequeued from ``store`` one at a time until ``expected`` of
    them have been handled. For every claimed rollout the attempt is marked
    ``"running"`` (as worker ``"runner-1"``), a single span is built and
    stored, and then both the attempt and the rollout are marked
    ``"succeeded"``.

    Args:
        store: Store to dequeue rollouts from and to write spans/status to.
        expected: Number of rollouts to process before returning.
        artifacts: Output list; one ``_RolloutArtifacts`` record is appended
            per processed rollout, in processing order.
    """
    for index in range(1, expected + 1):
        # Poll until the store hands out a rollout; an empty queue yields None.
        claimed = await store.dequeue_rollout()
        while claimed is None:
            await asyncio.sleep(0.001)
            claimed = await store.dequeue_rollout()

        attempt = claimed.attempt
        rollout_id = claimed.rollout_id
        attempt_id = attempt.attempt_id

        # Claim the attempt on behalf of the simulated worker.
        await store.update_attempt(
            rollout_id,
            attempt_id,
            status="running",
            worker_id="runner-1",
        )

        # Record exactly one span, numbered by the 1-based processing index.
        span = _build_span(rollout_id, attempt_id, sequence_id=1, index=index)
        await store.add_span(span)

        # Close out the attempt first, then the rollout that owns it.
        await store.update_attempt(rollout_id, attempt_id, status="succeeded")
        await store.update_rollout(rollout_id, status="succeeded")

        record = _RolloutArtifacts(
            rollout_id=rollout_id,
            attempt_id=attempt_id,
            attempt_sequence=attempt.sequence_id,
            span=span,
        )
        artifacts.append(record)
| 109 | |
| 110 | |
| 111 | @pytest.mark.asyncio |
no test coverage detected