MCPcopy Index your code
hub / github.com/dataelement/Clawith / process_feishu_event

Function process_feishu_event

backend/app/api/feishu.py:492–1201  ·  view source on GitHub ↗

Core logic to process feishu events from both webhook and WS client. Manages its own short-lived database transactions to avoid holding connections open during slow LLM/HTTP operations.

(agent_id: uuid.UUID, body: dict)

Source from the content-addressed store, hash-verified

490
491
492async def process_feishu_event(agent_id: uuid.UUID, body: dict):
493 """Core logic to process feishu events from both webhook and WS client.
494
495 Manages its own short-lived database transactions to avoid holding connections
496 open during slow LLM/HTTP operations.
497 """
498 import json as _json
499 logger.info(f"[Feishu] Event processing for {agent_id}: event_type={body.get('header', {}).get('event_type', 'N/A')}")
500
501 # Deduplicate — Feishu retries on slow responses
502 # Only mark as processed AFTER successful handling so retries work on crash
503 event_id = body.get("header", {}).get("event_id", "")
504 if event_id in _processed_events:
505 return {"code": 0, "msg": "already processed"}
506
507 # ── Phase 1: Short transaction — load config + agent/model for LLM ──
508 async with _async_session() as db:
509 result = await db.execute(
510 select(ChannelConfig).where(
511 ChannelConfig.agent_id == agent_id,
512 ChannelConfig.channel_type == "feishu",
513 )
514 )
515 config = result.scalar_one_or_none()
516 # Pre-load agent and model configs for LLM call (avoids extra session later)
517 _agent_model, _llm_model, _fallback_model = await _load_agent_and_model(db, agent_id)
518 # Objects are now detached but their column attributes are already loaded.
519 if not config:
520 return {"code": 1, "msg": "Channel not found"}
521
522 # Mark event as processed after config is loaded successfully
523 if event_id:
524 _processed_events.add(event_id)
525 # Keep set bounded
526 if len(_processed_events) > 1000:
527 _processed_events.clear()
528
529 # Handle events
530 event = body.get("event", {})
531 event_type = body.get("header", {}).get("event_type", "")
532
533 if event_type == "im.message.receive_v1":
534 message = event.get("message", {})
535 sender = event.get("sender", {}).get("sender_id", {})
536 sender_open_id = sender.get("open_id", "")
537 sender_user_id_from_event = sender.get("user_id", "") # tenant-stable ID, available directly in event body
538 msg_type = message.get("message_type", "text")
539 chat_type = message.get("chat_type", "p2p") # p2p or group
540 chat_id = message.get("chat_id", "")
541
542 logger.info(f"[Feishu] Received {msg_type} message, chat_type={chat_type}, open_id={sender_open_id!r}, user_id_from_event={sender_user_id_from_event!r}")
543
544 # ── Normalize post (rich text) → extract text + schedule image downloads ──
545 if msg_type == "post":
546 import json as _json_post
547 _post_body = _json_post.loads(message.get("content", "{}"))
548 # Feishu post content: {"title": "...", "content": [[{"tag":"text","text":"..."},...],...]}
549 # The content may be nested under a locale key like "zh_cn"

Callers 2

_async_handle_messageMethod · 0.90
feishu_event_webhookFunction · 0.85

Calls 15

store_agent_uploadFunction · 0.90
ChatMessageClass · 0.90
get_storage_backendFunction · 0.90
agent_upload_keyFunction · 0.90
execute_taskFunction · 0.90
log_activityFunction · 0.90
_load_agent_and_modelFunction · 0.85
_handle_feishu_fileFunction · 0.85
_storage_mtimeFunction · 0.85
_SerialPatchQueueClass · 0.85

Tested by

no test coverage detected