MCPcopy
hub / github.com/microsoft/agent-lightning / step

Method step

agentlightning/runner/agent.py:794–841  ·  view source on GitHub ↗

Execute a single task directly, bypassing the task queue. This method creates a new rollout for the given input and executes it immediately. Unlike [`iter()`][agentlightning.LitAgentRunner.iter], exceptions are propagated to the caller. Args: input: The

(
        self,
        input: T_task,
        *,
        resources: Optional[NamedResources] = None,
        mode: Optional[RolloutMode] = None,
        event: Optional[ExecutionEvent] = None,
    )

Source from the content-addressed store, hash-verified

792 logger.info(f"{self._log_prefix()} Finished async rollouts. Processed {num_tasks_processed} tasks.")
793
794 async def step(
795 self,
796 input: T_task,
797 *,
798 resources: Optional[NamedResources] = None,
799 mode: Optional[RolloutMode] = None,
800 event: Optional[ExecutionEvent] = None,
801 ) -> Rollout:
802 """Execute a single task directly, bypassing the task queue.
803
804 This method creates a new rollout for the given input and executes it
805 immediately. Unlike [`iter()`][agentlightning.LitAgentRunner.iter],
806 exceptions are propagated to the caller.
807
808 Args:
809 input: The task input to be processed by the agent.
810 resources: Optional named resources to be used for this specific task.
811 If provided, a new resources entry will be created in the store.
812 If not provided, the latest resources from the store will be used.
813 mode: Optional rollout mode ("train" or "validation"). If not provided,
814 the agent's default mode will be used.
815 event: Optional ExecutionEvent object to signal interruption (currently unused
816 but included for interface consistency).
817
818 Returns:
819 The completed rollout.
820
821 Raises:
822 Exception: Any exception that occurs during rollout execution will be
823 re-raised to the caller.
824 """
825 store = self.get_store()
826
827 if resources is not None:
828 resources_update = await store.add_resources(resources)
829 resources_id = resources_update.resources_id
830 else:
831 resources_id = None
832
833 attempted_rollout = await self.get_store().start_rollout(
834 input=input, mode=mode, resources_id=resources_id, worker_id=self.get_worker_id()
835 )
836 rollout_id = await self._step_impl(attempted_rollout, raise_on_exception=True)
837
838 completed_rollout = await store.get_rollout_by_id(rollout_id)
839 if completed_rollout is None:
840 raise RuntimeError(f"{self._log_prefix()} Failed to fetch completed rollout by id after step: {rollout_id}")
841 return completed_rollout

Callers

nothing calls this directly

Calls 7

get_storeMethod · 0.95
get_worker_idMethod · 0.95
_step_implMethod · 0.95
_log_prefixMethod · 0.95
add_resourcesMethod · 0.45
start_rolloutMethod · 0.45
get_rollout_by_idMethod · 0.45

Tested by

no test coverage detected