Execute a single task directly, bypassing the task queue. This method creates a new rollout for the given input and executes it immediately. Unlike [`iter()`][agentlightning.LitAgentRunner.iter], exceptions are propagated to the caller. Args: input: The
(
self,
input: T_task,
*,
resources: Optional[NamedResources] = None,
mode: Optional[RolloutMode] = None,
event: Optional[ExecutionEvent] = None,
)
| 792 | logger.info(f"{self._log_prefix()} Finished async rollouts. Processed {num_tasks_processed} tasks.") |
| 793 | |
| 794 | async def step( |
| 795 | self, |
| 796 | input: T_task, |
| 797 | *, |
| 798 | resources: Optional[NamedResources] = None, |
| 799 | mode: Optional[RolloutMode] = None, |
| 800 | event: Optional[ExecutionEvent] = None, |
| 801 | ) -> Rollout: |
| 802 | """Execute a single task directly, bypassing the task queue. |
| 803 | |
| 804 | This method creates a new rollout for the given input and executes it |
| 805 | immediately. Unlike [`iter()`][agentlightning.LitAgentRunner.iter], |
| 806 | exceptions are propagated to the caller. |
| 807 | |
| 808 | Args: |
| 809 | input: The task input to be processed by the agent. |
| 810 | resources: Optional named resources to be used for this specific task. |
| 811 | If provided, a new resources entry will be created in the store. |
| 812 | If not provided, the latest resources from the store will be used. |
| 813 | mode: Optional rollout mode ("train" or "validation"). If not provided, |
| 814 | the agent's default mode will be used. |
| 815 | event: Optional ExecutionEvent object to signal interruption (currently unused |
| 816 | but included for interface consistency). |
| 817 | |
| 818 | Returns: |
| 819 | The completed rollout. |
| 820 | |
| 821 | Raises: |
| 822 | Exception: Any exception that occurs during rollout execution will be |
| 823 | re-raised to the caller. |
| 824 | """ |
| 825 | store = self.get_store() |
| 826 | |
| 827 | if resources is not None: |
| 828 | resources_update = await store.add_resources(resources) |
| 829 | resources_id = resources_update.resources_id |
| 830 | else: |
| 831 | resources_id = None |
| 832 | |
| 833 | attempted_rollout = await self.get_store().start_rollout( |
| 834 | input=input, mode=mode, resources_id=resources_id, worker_id=self.get_worker_id() |
| 835 | ) |
| 836 | rollout_id = await self._step_impl(attempted_rollout, raise_on_exception=True) |
| 837 | |
| 838 | completed_rollout = await store.get_rollout_by_id(rollout_id) |
| 839 | if completed_rollout is None: |
| 840 | raise RuntimeError(f"{self._log_prefix()} Failed to fetch completed rollout by id after step: {rollout_id}") |
| 841 | return completed_rollout |
nothing calls this directly
no test coverage detected