A slow runner that creates backpressure by processing rollouts with delays.
()
| 184 | artifacts: List[_RolloutArtifacts] = [] |
| 185 | |
async def _slow_runner() -> None:
    """Consume rollouts from the store with a deliberate per-item delay.

    Runs until ``expected_rollouts`` rollouts have been completed. Each
    dequeued rollout is marked running, held for a short pause, given a
    single span, and then marked succeeded. Its input is recorded in
    ``enqueued_samples`` (when the rollout can be looked up) and a
    ``_RolloutArtifacts`` entry is appended to ``artifacts``.
    """
    completed = 0
    while completed < expected_rollouts:
        # Poll until the store hands out a rollout, backing off briefly
        # whenever nothing is available yet.
        claimed = await store.dequeue_rollout()
        while claimed is None:
            await asyncio.sleep(0.01)
            claimed = await store.dequeue_rollout()

        current_attempt = claimed.attempt
        rollout_id = claimed.rollout_id

        # Remember which sample this rollout carried, if it can be fetched.
        stored_rollout = await store.get_rollout_by_id(rollout_id)
        if stored_rollout:
            enqueued_samples.append(stored_rollout.input)

        await store.update_attempt(
            rollout_id,
            current_attempt.attempt_id,
            status="running",
            worker_id="slow-runner",
        )

        # This pause is what makes the runner slow: it creates backpressure
        # so the queue fills up while each rollout is held here.
        await asyncio.sleep(0.05)

        span_index = completed + 1
        produced_span = _build_span(
            rollout_id,
            current_attempt.attempt_id,
            sequence_id=1,
            index=span_index,
        )
        await store.add_span(produced_span)
        await store.update_attempt(rollout_id, current_attempt.attempt_id, status="succeeded")
        await store.update_rollout(rollout_id, status="succeeded")

        record = _RolloutArtifacts(
            rollout_id=rollout_id,
            attempt_id=current_attempt.attempt_id,
            attempt_sequence=current_attempt.sequence_id,
            span=produced_span,
        )
        artifacts.append(record)
        completed = span_index
| 227 | |
| 228 | runner_task = asyncio.create_task(_slow_runner()) |
| 229 | try: |
no test coverage detected