MCPcopy Index your code
hub / github.com/google/adk-python / TriggerRouter

Class TriggerRouter

src/google/adk/cli/trigger_routes.py:216–580  ·  view source on GitHub ↗

A router that registers /trigger/* routes on a FastAPI application. Each trigger endpoint auto-creates an ephemeral session, runs the agent, and returns the result in the format expected by the calling service. Features include: a semaphore that limits concurrent agent calls (default: 10), and retries of transient errors (429 / RESOURCE_EXHAUSTED) with exponential backoff and jitter.

Source from the content-addressed store, hash-verified

214
215
216class TriggerRouter:
217 """A router that registers /trigger/* routes on a FastAPI application.
218
219 Each trigger endpoint auto-creates an ephemeral session, runs the agent,
220 and returns the result in the format expected by the calling service.
221
222 Features include:
223 - Semaphore limits concurrent agent calls (default: 10)
224 - Transient errors (429 / RESOURCE_EXHAUSTED) are retried with
225 exponential backoff + jitter
226 """
227
228 DEFAULT_TRIGGER_SOURCES = []
229 """Trigger sources registered when ``trigger_sources`` is not specified.
230 By default, no triggers are registered to require explicit opt-in via CLI.
231 """
232 VALID_TRIGGER_SOURCES = ["pubsub", "eventarc"]
233 """All trigger sources supported by this router."""
234
def __init__(
    self,
    adk_web_server: "AdkWebServer",
    *,
    trigger_sources: Optional[list[str]] = None,
    max_concurrent: int = DEFAULT_MAX_CONCURRENT,
    max_retries: int = DEFAULT_MAX_RETRIES,
    retry_base_delay: float = DEFAULT_RETRY_BASE_DELAY,
    retry_max_delay: float = DEFAULT_RETRY_MAX_DELAY,
):
    """Store the server and resolve trigger sources, concurrency and retry settings.

    Args:
      adk_web_server: Server object kept on the router as ``_server``.
      trigger_sources: Names of the trigger sources to enable. ``None``
        falls back to ``DEFAULT_TRIGGER_SOURCES``. Any name that is not in
        ``VALID_TRIGGER_SOURCES`` is reported in a single warning log and
        dropped; the remaining names keep the caller's order.
      max_concurrent: Initial value of the ``asyncio.Semaphore`` created
        for this router.
      max_retries: Stored as ``_max_retries``.
      retry_base_delay: Stored as ``_retry_base_delay``.
      retry_max_delay: Stored as ``_retry_max_delay``.
    """
    self._server = adk_web_server

    # An explicit (even empty) list wins; only ``None`` selects the default.
    requested = trigger_sources
    if requested is None:
        requested = self.DEFAULT_TRIGGER_SOURCES

    supported = self.VALID_TRIGGER_SOURCES
    unrecognized = set(requested).difference(supported)
    if unrecognized:
        # Unknown names are tolerated rather than rejected: warn and move on.
        logger.warning(
            "Unknown trigger source(s) ignored: %s. Valid sources: %s",
            ", ".join(sorted(unrecognized)),
            ", ".join(supported),
        )

    # Pure filter: ordering and duplicates from the caller are left untouched.
    self._trigger_sources = [name for name in requested if name in supported]

    self._semaphore = asyncio.Semaphore(max_concurrent)

    # Retry settings are only recorded here; nothing in this method uses them.
    self._max_retries = max_retries
    self._retry_base_delay = retry_base_delay
    self._retry_max_delay = retry_max_delay
265
266 async def _run_agent(
267 self,
268 *,
269 app_name: str,
270 user_id: str,
271 message_text: str,
272 session_id: str,
273 ) -> list[Event]:

Callers 1

get_fast_api_app · Method · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected