Create NodeRunner and start asyncio task for a node.
(
self,
loop_state: _LoopState,
ctx: Context,
node_name: str,
trigger: Trigger,
)
| 536 | node_state.status = NodeStatus.RUNNING |
| 537 | |
| 538 | def _start_node_task( |
| 539 | self, |
| 540 | loop_state: _LoopState, |
| 541 | ctx: Context, |
| 542 | node_name: str, |
| 543 | trigger: Trigger, |
| 544 | ) -> None: |
| 545 | """Create NodeRunner and start asyncio task for a node.""" |
| 546 | |
| 547 | assert self.graph is not None |
| 548 | |
| 549 | node = self._get_static_node_by_name(node_name) |
| 550 | is_terminal = node_name in self.graph._terminal_node_names |
| 551 | |
| 552 | node_state = loop_state.nodes[node_name] |
| 553 | # Reuse run_id on resume; assign a new sequential id for fresh runs. |
| 554 | run_id = node_state.run_id |
| 555 | if not run_id: |
| 556 | run_id = self._next_run_id(node_state) |
| 557 | node_state.run_id = run_id |
| 558 | |
| 559 | # Intercept execution based on historical session events. |
| 560 | key = f'{node_name}@{run_id}' |
| 561 | if key in loop_state.recovered_executions: |
| 562 | recovered = loop_state.recovered_executions[key] |
| 563 | |
| 564 | result = check_interception( |
| 565 | node_path=f'{ctx.node_path}/{node_name}@{run_id}', |
| 566 | node=node, |
| 567 | recovered=recovered, |
| 568 | curr_parent_ctx=ctx, |
| 569 | ) |
| 570 | |
| 571 | if not result.should_run: |
| 572 | is_terminal = node_name in self.graph._terminal_node_names |
| 573 | ancestor_path = ctx.node_path if is_terminal else None |
| 574 | |
| 575 | if ancestor_path: |
| 576 | ancestors = [ancestor_path] + list(ctx._output_for_ancestors or []) |
| 577 | else: |
| 578 | ancestors = list(ctx._output_for_ancestors or []) |
| 579 | |
| 580 | mock_ctx = create_mock_context( |
| 581 | parent_ctx=ctx, |
| 582 | node=node, |
| 583 | run_id=run_id, |
| 584 | result=result, |
| 585 | ancestors=ancestors, |
| 586 | branch=recovered.branch, |
| 587 | ) |
| 588 | |
| 589 | async def return_ctx(): |
| 590 | if loop_state.sequence_barrier: |
| 591 | await loop_state.sequence_barrier.wait(key) |
| 592 | return mock_ctx |
| 593 | |
| 594 | loop_state.pending_tasks[node_name] = asyncio.create_task(return_ctx()) |
| 595 | return |
no test coverage detected