Run a non-agent BaseNode in live mode.
(
self,
*,
session: Session,
live_request_queue: LiveRequestQueue,
run_config: Optional[RunConfig] = None,
)
| 585 | ) |
| 586 | |
| 587 | async def _run_node_live( |
| 588 | self, |
| 589 | *, |
| 590 | session: Session, |
| 591 | live_request_queue: LiveRequestQueue, |
| 592 | run_config: Optional[RunConfig] = None, |
| 593 | ) -> AsyncGenerator[Event, None]: |
| 594 | """Run a non-agent BaseNode in live mode.""" |
| 595 | from .agents.context import Context |
| 596 | from .workflow._dynamic_node_scheduler import DynamicNodeScheduler |
| 597 | from .workflow._node_runner import NodeRunner |
| 598 | from .workflow._workflow import _LoopState |
| 599 | from .workflow._workflow import Workflow |
| 600 | |
| 601 | ic = self._new_invocation_context_for_live( |
| 602 | session, |
| 603 | live_request_queue=live_request_queue, |
| 604 | run_config=run_config or RunConfig(), |
| 605 | ) |
| 606 | ic._event_queue = asyncio.Queue() |
| 607 | |
| 608 | root_ctx = Context(ic) |
| 609 | root_agent = self.agent |
| 610 | is_workflow = isinstance(root_agent, Workflow) |
| 611 | |
| 612 | if not is_workflow: |
| 613 | root_node_runner = NodeRunner(node=root_agent, parent_ctx=root_ctx) |
| 614 | |
| 615 | done_sentinel = object() |
| 616 | |
| 617 | async def _drive_root_node(): |
| 618 | try: |
| 619 | if is_workflow: |
| 620 | scheduler = DynamicNodeScheduler(state=_LoopState()) |
| 621 | root_ctx._workflow_scheduler = scheduler |
| 622 | ctx = await scheduler( |
| 623 | root_ctx, |
| 624 | root_agent, |
| 625 | None, |
| 626 | run_id='1', |
| 627 | ) |
| 628 | else: |
| 629 | ctx = await root_node_runner.run( |
| 630 | node_input=None, |
| 631 | ) |
| 632 | if ctx.error: |
| 633 | raise ctx.error |
| 634 | finally: |
| 635 | await ic._event_queue.put((done_sentinel, None)) |
| 636 | |
| 637 | task = asyncio.create_task(_drive_root_node()) |
| 638 | |
| 639 | try: |
| 640 | async with aclosing(self._consume_event_queue(ic, done_sentinel)) as agen: |
| 641 | async for event in agen: |
| 642 | yield event |
| 643 | finally: |
| 644 | await self._cleanup_root_task(task, self.agent.name) |
no test coverage detected