MCPcopy Index your code
hub / github.com/google/adk-python / test_run_live_auto_create_session

Function test_run_live_auto_create_session

tests/unittests/test_runners.py:691–724  ·  view source on GitHub ↗

run_live should auto-create session when missing and yield events.

()

Source from the content-addressed store, hash-verified

689
690@pytest.mark.asyncio
691async def test_run_live_auto_create_session():
692 """run_live should auto-create session when missing and yield events."""
693 session_service = InMemorySessionService()
694 artifact_service = InMemoryArtifactService()
695 runner = Runner(
696 app_name="live_app",
697 agent=MockLiveAgent("live_agent"),
698 session_service=session_service,
699 artifact_service=artifact_service,
700 auto_create_session=True,
701 )
702
703 # An empty LiveRequestQueue is sufficient for our mock agent.
704 from google.adk.agents.live_request_queue import LiveRequestQueue
705
706 live_queue = LiveRequestQueue()
707
708 agen = runner.run_live(
709 user_id="user",
710 session_id="missing",
711 live_request_queue=live_queue,
712 )
713
714 event = await agen.__anext__()
715 await agen.aclose()
716
717 assert event.author == "live_agent"
718 assert event.content.parts[0].text == "live hello"
719
720 # Session should have been created automatically.
721 session = await session_service.get_session(
722 app_name="live_app", user_id="user", session_id="missing"
723 )
724 assert session is not None
725
726
727def test_run_passes_state_delta():

Callers

nothing calls this directly

Calls 9

run_liveMethod · 0.95
get_sessionMethod · 0.95
RunnerClass · 0.90
LiveRequestQueueClass · 0.90
MockLiveAgentClass · 0.85
__anext__Method · 0.45
acloseMethod · 0.45

Tested by

no test coverage detected