Wraps execution with plugin callbacks. Args: invocation_context: The invocation context; session: The current session (ignored, kept for backward compatibility); execute_fn: A callable that returns an AsyncGenerator of Events; is_live_call: Whether this is a live call.
(
self,
invocation_context: InvocationContext,
session: Session,
execute_fn: Callable[[InvocationContext], AsyncGenerator[Event, None]],
is_live_call: bool = False,
)
| 1333 | return output_event |
| 1334 | |
| 1335 | async def _exec_with_plugin( |
| 1336 | self, |
| 1337 | invocation_context: InvocationContext, |
| 1338 | session: Session, |
| 1339 | execute_fn: Callable[[InvocationContext], AsyncGenerator[Event, None]], |
| 1340 | is_live_call: bool = False, |
| 1341 | ) -> AsyncGenerator[Event, None]: |
| 1342 | """Wraps execution with plugin callbacks. |
| 1343 | |
| 1344 | Args: |
| 1345 | invocation_context: The invocation context |
| 1346 | session: The current session (ignored, kept for backward compatibility) |
| 1347 | execute_fn: A callable that returns an AsyncGenerator of Events |
| 1348 | is_live_call: Whether this is a live call |
| 1349 | |
| 1350 | Yields: |
| 1351 | Events from the execution, including any generated by plugins |
| 1352 | """ |
| 1353 | |
| 1354 | plugin_manager = invocation_context.plugin_manager |
| 1355 | |
| 1356 | # Step 1: Run the before_run callbacks to see if we should early exit. |
| 1357 | early_exit_result = await plugin_manager.run_before_run_callback( |
| 1358 | invocation_context=invocation_context |
| 1359 | ) |
| 1360 | if isinstance(early_exit_result, types.Content): |
| 1361 | early_exit_event = Event( |
| 1362 | invocation_id=invocation_context.invocation_id, |
| 1363 | author='model', |
| 1364 | content=early_exit_result, |
| 1365 | ) |
| 1366 | _apply_run_config_custom_metadata( |
| 1367 | early_exit_event, invocation_context.run_config |
| 1368 | ) |
| 1369 | if self._should_append_event(early_exit_event, is_live_call): |
| 1370 | await self.session_service.append_event( |
| 1371 | session=invocation_context.session, |
| 1372 | event=early_exit_event, |
| 1373 | ) |
| 1374 | yield early_exit_event |
| 1375 | else: |
| 1376 | # Step 2: Otherwise continue with normal execution |
| 1377 | async with aclosing(execute_fn(invocation_context)) as agen: |
| 1378 | async for event in agen: |
| 1379 | _apply_run_config_custom_metadata( |
| 1380 | event, invocation_context.run_config |
| 1381 | ) |
| 1382 | # Step 3: Run the on_event callbacks before persisting so callback |
| 1383 | # changes are stored in the session and match the streamed event. |
| 1384 | modified_event = await plugin_manager.run_on_event_callback( |
| 1385 | invocation_context=invocation_context, event=event |
| 1386 | ) |
| 1387 | output_event = self._get_output_event( |
| 1388 | original_event=event, |
| 1389 | modified_event=modified_event, |
| 1390 | run_config=invocation_context.run_config, |
| 1391 | ) |
| 1392 |
no test coverage detected