A router that registers /trigger/* routes on a FastAPI application. Each trigger endpoint auto-creates an ephemeral session, runs the agent, and returns the result in the format expected by the calling service. Features include: (1) a semaphore that limits concurrent agent calls (default: 10); (2) transient errors (429 / RESOURCE_EXHAUSTED) are retried with exponential backoff plus jitter.
| 214 | |
| 215 | |
| 216 | class TriggerRouter: |
| 217 | """A router that registers /trigger/* routes on a FastAPI application. |
| 218 | |
| 219 | Each trigger endpoint auto-creates an ephemeral session, runs the agent, |
| 220 | and returns the result in the format expected by the calling service. |
| 221 | |
| 222 | Features include: |
| 223 | - Semaphore limits concurrent agent calls (default: 10) |
| 224 | - Transient errors (429 / RESOURCE_EXHAUSTED) are retried with |
| 225 | exponential backoff + jitter |
| 226 | """ |
| 227 | |
| 228 | DEFAULT_TRIGGER_SOURCES = [] |
| 229 | """Trigger sources registered when ``trigger_sources`` is not specified. |
| 230 | By default, no triggers are registered to require explicit opt-in via CLI. |
| 231 | """ |
| 232 | VALID_TRIGGER_SOURCES = ["pubsub", "eventarc"] |
| 233 | """All trigger sources supported by this router.""" |
| 234 | |
def __init__(
    self,
    adk_web_server: "AdkWebServer",
    *,
    trigger_sources: Optional[list[str]] = None,
    max_concurrent: int = DEFAULT_MAX_CONCURRENT,
    max_retries: int = DEFAULT_MAX_RETRIES,
    retry_base_delay: float = DEFAULT_RETRY_BASE_DELAY,
    retry_max_delay: float = DEFAULT_RETRY_MAX_DELAY,
):
    """Store the server handle, the enabled trigger sources and the limits.

    Args:
      adk_web_server: Server object kept on ``self._server``.
      trigger_sources: Names of the trigger sources to enable. ``None``
        falls back to ``DEFAULT_TRIGGER_SOURCES``. Names that are not in
        ``VALID_TRIGGER_SOURCES`` are dropped after a single warning is
        logged; they do not raise.
      max_concurrent: Initial value of the ``asyncio.Semaphore`` kept on
        ``self._semaphore``.
      max_retries: Stored on ``self._max_retries``.
      retry_base_delay: Stored on ``self._retry_base_delay``.
      retry_max_delay: Stored on ``self._retry_max_delay``.

    NOTE(review): the retry values are presumably the attempt count and the
    backoff base/cap used when retrying transient errors -- the code that
    consumes them is outside this block, so confirm there.
    """
    self._server = adk_web_server

    # An explicit list (even an empty one) wins over the class default.
    requested = trigger_sources
    if requested is None:
        requested = self.DEFAULT_TRIGGER_SOURCES

    supported = self.VALID_TRIGGER_SOURCES

    # Report every unsupported name in one log line, sorted so that the
    # message is deterministic.
    unrecognized = set(requested).difference(supported)
    if unrecognized:
        logger.warning(
            "Unknown trigger source(s) ignored: %s. Valid sources: %s",
            ", ".join(sorted(unrecognized)),
            ", ".join(supported),
        )

    # Keep only supported names, preserving the caller's order. A fresh
    # list is built so the class-level default is never aliased.
    enabled = []
    for source in requested:
        if source in supported:
            enabled.append(source)
    self._trigger_sources = enabled

    self._semaphore = asyncio.Semaphore(max_concurrent)
    self._max_retries = max_retries
    self._retry_base_delay = retry_base_delay
    self._retry_max_delay = retry_max_delay
| 265 | |
| 266 | async def _run_agent( |
| 267 | self, |
| 268 | *, |
| 269 | app_name: str, |
| 270 | user_id: str, |
| 271 | message_text: str, |
| 272 | session_id: str, |
| 273 | ) -> list[Event]: |