Register the trigger routes (e.g. ``/apps/{app_name}/trigger/pubsub``) on the FastAPI app. Only endpoints whose source name appears in ``self._trigger_sources`` are registered.
(self, app: FastAPI)
| 389 | ) |
| 390 | |
| 391 | def register(self, app: FastAPI) -> None: |
| 392 | """Register /trigger/* routes on the FastAPI app. |
| 393 | |
| 394 | Only endpoints whose source name appears in ``self._trigger_sources`` |
| 395 | are registered. |
| 396 | """ |
| 397 | |
| 398 | if "pubsub" in self._trigger_sources: |
| 399 | |
| 400 | @app.post( |
| 401 | "/apps/{app_name}/trigger/pubsub", |
| 402 | response_model=TriggerResponse, |
| 403 | tags=[TAG_TRIGGERS], |
| 404 | summary="Pub/Sub push subscription trigger", |
| 405 | description=( |
| 406 | "Processes a message from a Pub/Sub push subscription." |
| 407 | " Returns 200 on success; errors trigger Pub/Sub retry." |
| 408 | " Includes automatic retry with backoff on 429 errors." |
| 409 | ), |
| 410 | ) |
| 411 | async def trigger_pubsub( |
| 412 | app_name: str, req: PubSubTriggerRequest, request: Request |
| 413 | ) -> TriggerResponse: |
| 414 | subscription = req.subscription or "pubsub-caller" |
| 415 | user_id = subscription.replace("/", "--") |
| 416 | |
| 417 | decoded_data = None |
| 418 | data_payload = None |
| 419 | if req.message.data: |
| 420 | try: |
| 421 | decoded_data = base64.b64decode(req.message.data).decode("utf-8") |
| 422 | try: |
| 423 | data_payload = json.loads(decoded_data) |
| 424 | except json.JSONDecodeError: |
| 425 | data_payload = decoded_data |
| 426 | except Exception as e: |
| 427 | logger.exception("Failed to decode Pub/Sub message data") |
| 428 | raise HTTPException( |
| 429 | status_code=400, |
| 430 | detail=f"Invalid base64 message data: {e}", |
| 431 | ) from e |
| 432 | |
| 433 | message_text = json.dumps( |
| 434 | {"data": data_payload, "attributes": req.message.attributes or {}} |
| 435 | ) |
| 436 | |
| 437 | logger.info( |
| 438 | "Pub/Sub trigger: subscription=%s, messageId=%s", |
| 439 | req.subscription, |
| 440 | req.message.messageId, |
| 441 | ) |
| 442 | |
| 443 | try: |
| 444 | await self._run_agent_with_retry( |
| 445 | app_name=app_name, |
| 446 | user_id=user_id, |
| 447 | message_text=message_text, |
| 448 | ) |