(args: tuple[str, str, str])
| 231 | |
| 232 | |
def _dequeue_only_task(args: tuple[str, str, str]) -> bool:
    """Dequeue a single rollout from the store without passing a worker id.

    Args:
        args: A ``(store_url, worker_id, task_id)`` tuple. ``store_url`` is
            used to build the store client; ``worker_id`` and ``task_id``
            appear only in the console messages.
            NOTE(review): the packed-tuple signature suggests this is mapped
            over a worker pool -- confirm against the caller.

    Returns:
        ``True`` when ``dequeue_rollout()`` yields something other than
        ``None``; ``False`` when it yields ``None`` or when any ``Exception``
        is raised while running the dequeue.
    """
    store_url, worker_id, task_id = args
    console.print(f"[Dequeue-Only Task {task_id}] Dequeueing rollout for worker {worker_id}")
    store = agl.LightningStoreClient(store_url)

    async def _dequeue_once() -> bool:
        rollout = await store.dequeue_rollout()  # no worker_id
        if rollout is not None:
            return True
        console.print(f"[Dequeue-Only Task {task_id}] No rollout available to dequeue")
        return False

    try:
        succeeded = asyncio.run(_dequeue_once())
    except Exception as e:
        # Failures are reported on the console and folded into a False result.
        console.print(f"Error dequeueing only worker {worker_id} for task {task_id}: {e}")
        succeeded = False
    finally:
        # The client is closed on every path, including errors.
        _close_store_client(store)
    return succeeded
| 252 | |
| 253 | |
| 254 | def dequeue_rollouts(store_url: str) -> BenchmarkSummary: |
nothing calls this directly
no test coverage detected