run_live should auto-create session when missing and yield events.
()
| 689 | |
| 690 | @pytest.mark.asyncio |
| 691 | async def test_run_live_auto_create_session(): |
| 692 | """run_live should auto-create session when missing and yield events.""" |
| 693 | session_service = InMemorySessionService() |
| 694 | artifact_service = InMemoryArtifactService() |
| 695 | runner = Runner( |
| 696 | app_name="live_app", |
| 697 | agent=MockLiveAgent("live_agent"), |
| 698 | session_service=session_service, |
| 699 | artifact_service=artifact_service, |
| 700 | auto_create_session=True, |
| 701 | ) |
| 702 | |
| 703 | # An empty LiveRequestQueue is sufficient for our mock agent. |
| 704 | from google.adk.agents.live_request_queue import LiveRequestQueue |
| 705 | |
| 706 | live_queue = LiveRequestQueue() |
| 707 | |
| 708 | agen = runner.run_live( |
| 709 | user_id="user", |
| 710 | session_id="missing", |
| 711 | live_request_queue=live_queue, |
| 712 | ) |
| 713 | |
| 714 | event = await agen.__anext__() |
| 715 | await agen.aclose() |
| 716 | |
| 717 | assert event.author == "live_agent" |
| 718 | assert event.content.parts[0].text == "live hello" |
| 719 | |
| 720 | # Session should have been created automatically. |
| 721 | session = await session_service.get_session( |
| 722 | app_name="live_app", user_id="user", session_id="missing" |
| 723 | ) |
| 724 | assert session is not None |
| 725 | |
| 726 | |
| 727 | def test_run_passes_state_delta(): |
nothing calls this directly
no test coverage detected