MCPcopy Index your code
hub / github.com/google/adk-python / run_live

Method run_live

tests/unittests/testing_utils.py:307–331  ·  view source on GitHub ↗
(
      self, live_request_queue: LiveRequestQueue, run_config: RunConfig = None
  )

Source from the content-addressed store, hash-verified

305 return events
306
def run_live(
    self, live_request_queue: LiveRequestQueue, run_config: RunConfig = None
) -> list[Event]:
  """Drive the runner's live stream and return the first response it yields.

  Args:
    live_request_queue: Queue forwarded unchanged to `self.runner.run_live`.
    run_config: Configuration forwarded to the runner. A falsy value is
      replaced by a default-constructed `RunConfig()`.

  Returns:
    A list holding the first response produced by the live stream, or an
    empty list when the stream ends without yielding anything.
  """
  gathered = []

  async def _drain(active_session: Session):
    stream = self.runner.run_live(
        session=active_session,
        live_request_queue=live_request_queue,
        run_config=run_config or RunConfig(),
    )

    async for item in stream:
      gathered.append(item)
      # The list is never empty right after an append, so consumption stops
      # as soon as a single response has been captured.
      if gathered:
        break

  try:
    asyncio.run(_drain(self.session))
  except asyncio.TimeoutError:
    # Best effort: keep whatever was gathered before the timeout surfaced.
    print('Returning any partial results collected so far.')

  return gathered
332
333
334class MockModel(BaseLlm):

Callers 1

consume_responsesMethod · 0.45

Calls 1

runMethod · 0.45

Tested by

no test coverage detected