MCPcopy Index your code
hub / github.com/google/adk-python / run_async

Method run_async

tests/unittests/testing_utils.py:292–305  ·  view source on GitHub ↗
(
      self,
      new_message: Optional[types.ContentUnion] = None,
      invocation_id: Optional[str] = None,
  )

Source from the content-addressed store, hash-verified

290 )
291
async def run_async(
    self,
    new_message: Optional[types.ContentUnion] = None,
    invocation_id: Optional[str] = None,
) -> list[Event]:
  """Drains `self.runner.run_async` and returns every event it yields.

  The run is issued for the user id and session id stored on `self.session`.

  Args:
    new_message: Optional message for this run. When truthy it is passed
      through `get_user_content` before being handed to the runner; otherwise
      the runner receives `None`.
    invocation_id: Optional invocation id, forwarded to the runner unchanged.

  Returns:
    The events produced by the runner, in the order they were yielded.
  """
  event_stream = self.runner.run_async(
      user_id=self.session.user_id,
      session_id=self.session.id,
      invocation_id=invocation_id,
      new_message=get_user_content(new_message) if new_message else None,
  )
  # Consume the async generator fully before returning to the caller.
  return [event async for event in event_stream]
306
307 def run_live(
308 self, live_request_queue: LiveRequestQueue, run_config: RunConfig = None

Calls 2

get_user_content · Function · 0.70
append · Method · 0.45

Tested by

no test coverage detected