Enqueue a new task and return the generated rollout identifier. Args: sample: Payload that describes the task input. mode: Phase in which the sample should be executed (`"train"`, `"val"`, or `"test"`). resources_id: Identifier of a resour
(
self,
sample: Any,
mode: Literal["train", "val", "test"] | None = None,
resources_id: str | None = None,
metadata: Dict[str, Any] | None = None,
)
| 59 | self._resources_lock = asyncio.Lock() |
| 60 | |
| 61 | async def add_task( |
| 62 | self, |
| 63 | sample: Any, |
| 64 | mode: Literal["train", "val", "test"] | None = None, |
| 65 | resources_id: str | None = None, |
| 66 | metadata: Dict[str, Any] | None = None, |
| 67 | ) -> str: |
| 68 | """Enqueue a new task and return the generated rollout identifier. |
| 69 | |
| 70 | Args: |
| 71 | sample: Payload that describes the task input. |
| 72 | mode: Phase in which the sample should be executed (`"train"`, `"val"`, or |
| 73 | `"test"`). |
| 74 | resources_id: Identifier of a resource bundle that the executor should |
| 75 | load before running the task. |
| 76 | metadata: Optional metadata forwarded to the executor. |
| 77 | |
| 78 | Returns: |
| 79 | Unique rollout identifier assigned to the task. |
| 80 | """ |
| 81 | rollout_id = f"rollout-{uuid.uuid4()}" |
| 82 | task = Task( |
| 83 | rollout_id=rollout_id, |
| 84 | input=sample, |
| 85 | mode=mode, |
| 86 | resources_id=resources_id, |
| 87 | create_time=time.time(), |
| 88 | num_claims=0, |
| 89 | metadata=metadata or {}, |
| 90 | ) |
| 91 | await self._task_queue.put(task) |
| 92 | logger.info(f"Task queued: {rollout_id} (mode: {mode}, resources_id: {resources_id})") |
| 93 | return rollout_id |
| 94 | |
| 95 | async def get_next_task(self) -> Optional[Task]: |
| 96 | """Retrieve the next task from the queue without blocking. |
no test coverage detected