Core logic to process feishu events from both webhook and WS client. Manages its own short-lived database transactions to avoid holding connections open during slow LLM/HTTP operations.
(agent_id: uuid.UUID, body: dict)
| 490 | |
| 491 | |
| 492 | async def process_feishu_event(agent_id: uuid.UUID, body: dict): |
| 493 | """Core logic to process feishu events from both webhook and WS client. |
| 494 | |
| 495 | Manages its own short-lived database transactions to avoid holding connections |
| 496 | open during slow LLM/HTTP operations. |
| 497 | """ |
| 498 | import json as _json |
| 499 | logger.info(f"[Feishu] Event processing for {agent_id}: event_type={body.get('header', {}).get('event_type', 'N/A')}") |
| 500 | |
| 501 | # Deduplicate — Feishu retries on slow responses |
| 502 | # Only mark as processed AFTER successful handling so retries work on crash |
| 503 | event_id = body.get("header", {}).get("event_id", "") |
| 504 | if event_id in _processed_events: |
| 505 | return {"code": 0, "msg": "already processed"} |
| 506 | |
| 507 | # ── Phase 1: Short transaction — load config + agent/model for LLM ── |
| 508 | async with _async_session() as db: |
| 509 | result = await db.execute( |
| 510 | select(ChannelConfig).where( |
| 511 | ChannelConfig.agent_id == agent_id, |
| 512 | ChannelConfig.channel_type == "feishu", |
| 513 | ) |
| 514 | ) |
| 515 | config = result.scalar_one_or_none() |
| 516 | # Pre-load agent and model configs for LLM call (avoids extra session later) |
| 517 | _agent_model, _llm_model, _fallback_model = await _load_agent_and_model(db, agent_id) |
| 518 | # Objects are now detached but their column attributes are already loaded. |
| 519 | if not config: |
| 520 | return {"code": 1, "msg": "Channel not found"} |
| 521 | |
| 522 | # Mark event as processed after config is loaded successfully |
| 523 | if event_id: |
| 524 | _processed_events.add(event_id) |
| 525 | # Keep set bounded |
| 526 | if len(_processed_events) > 1000: |
| 527 | _processed_events.clear() |
| 528 | |
| 529 | # Handle events |
| 530 | event = body.get("event", {}) |
| 531 | event_type = body.get("header", {}).get("event_type", "") |
| 532 | |
| 533 | if event_type == "im.message.receive_v1": |
| 534 | message = event.get("message", {}) |
| 535 | sender = event.get("sender", {}).get("sender_id", {}) |
| 536 | sender_open_id = sender.get("open_id", "") |
| 537 | sender_user_id_from_event = sender.get("user_id", "") # tenant-stable ID, available directly in event body |
| 538 | msg_type = message.get("message_type", "text") |
| 539 | chat_type = message.get("chat_type", "p2p") # p2p or group |
| 540 | chat_id = message.get("chat_id", "") |
| 541 | |
| 542 | logger.info(f"[Feishu] Received {msg_type} message, chat_type={chat_type}, open_id={sender_open_id!r}, user_id_from_event={sender_user_id_from_event!r}") |
| 543 | |
| 544 | # ── Normalize post (rich text) → extract text + schedule image downloads ── |
| 545 | if msg_type == "post": |
| 546 | import json as _json_post |
| 547 | _post_body = _json_post.loads(message.get("content", "{}")) |
| 548 | # Feishu post content: {"title": "...", "content": [[{"tag":"text","text":"..."},...],...]} |
| 549 | # The content may be nested under a locale key like "zh_cn" |
no test coverage detected