Return the next task from the local queue. If [`TaskInput`][agentlightning.TaskInput] instances were provided, they are converted into [`Task`][agentlightning.Task] objects on the fly. Otherwise, the preconstructed tasks are returned in sequence. Returns: the next task to execute.
(self)
| 345 | return self._rollouts |
| 346 | |
def poll_next_task(self) -> Optional[Task]:
    """Fetch the next task from the in-memory task list.

    The list is walked cyclically: once the cursor has moved past the final
    entry it is reset to the first entry before the lookup. An entry that is
    already a [`Task`][agentlightning.Task] is handed back unchanged. Any
    other entry is treated as a [`TaskInput`][agentlightning.TaskInput] and
    wrapped in a freshly built `Task` whose rollout ID comes from the
    entry's 1-based position in the list, zero-padded to three digits.

    Each call advances the cursor, increments `task_count`, and logs the
    rollout ID of the task being returned.

    Returns:
        Next task to execute.
    """
    # Wrap around to the start once every entry has been served.
    if self._task_index >= len(self._tasks):
        self._task_index = 0

    candidate = self._tasks[self._task_index]
    task = candidate

    if not isinstance(candidate, Task):
        # Raw input: build the Task wrapper on the fly, stamped with the
        # current resources ID and the current wall-clock time.
        task = Task(
            rollout_id=f"local_task_{self._task_index + 1:03d}",
            input=candidate,
            resources_id=self._resources_update.resources_id,
            create_time=time.time(),
        )

    # Advance the cursor and the running counter before logging so the log
    # line reports the count including this task.
    self._task_index += 1
    self.task_count += 1
    logger.info(f"[Task {self.task_count} Received] Task ID: {task.rollout_id}")
    return task
| 377 | |
| 378 | def get_resources_by_id(self, resource_id: str) -> Optional[ResourcesUpdate]: |
| 379 | logger.debug(f"DevTaskLoader checking resources for ID: {resource_id}") |