MCPcopy Index your code
hub / github.com/google/adk-python / test_workflow_node_output

Function test_workflow_node_output

tests/unittests/runners/test_runner_node.py:203–214  ·  view source on GitHub ↗

Runner drives a Workflow and yields its terminal output.

()

Source from the content-addressed store, hash-verified

201
202@pytest.mark.asyncio
203async def test_workflow_node_output():
204 """Runner drives a Workflow and yields its terminal output."""
205
206 def upper(node_input: str) -> str:
207 return node_input.upper()
208
209 wf = Workflow(name='wf', edges=[(START, upper)])
210 events, _, _ = await _run_node(wf, message='hi')
211
212 output_events = [e for e in events if e.output == 'HI']
213 assert len(output_events) == 1
214 assert output_events[0].node_info.path == 'wf@1/upper@1'
215
216
217# ---------------------------------------------------------------------------

Callers

nothing calls this directly

Calls 2

WorkflowClass · 0.90
_run_nodeFunction · 0.85

Tested by

no test coverage detected