Runner drives a Workflow and yields its terminal output.
()
| 201 | |
| 202 | @pytest.mark.asyncio |
| 203 | async def test_workflow_node_output(): |
| 204 | """Runner drives a Workflow and yields its terminal output.""" |
| 205 | |
| 206 | def upper(node_input: str) -> str: |
| 207 | return node_input.upper() |
| 208 | |
| 209 | wf = Workflow(name='wf', edges=[(START, upper)]) |
| 210 | events, _, _ = await _run_node(wf, message='hi') |
| 211 | |
| 212 | output_events = [e for e in events if e.output == 'HI'] |
| 213 | assert len(output_events) == 1 |
| 214 | assert output_events[0].node_info.path == 'wf@1/upper@1' |
| 215 | |
| 216 | |
| 217 | # --------------------------------------------------------------------------- |