MCPcopy
hub / github.com/ModelEngine-Group/nexent / trigger_data_process

Function trigger_data_process

backend/utils/file_management_utils.py:42–127  ·  view source on GitHub ↗

Trigger data processing service to handle uploaded files

(files: List[dict], process_params: ProcessParams)

Source from the content-addressed store, hash-verified

40
41
42async def trigger_data_process(files: List[dict], process_params: ProcessParams):
43 """Trigger data processing service to handle uploaded files"""
44 try:
45 if not files:
46 return None
47
48 # Get tenant_id from authorization for downstream task processing
49 embedding_model_id = process_params.model_id
50 tenant_id = None
51 try:
52 _, tenant_id = get_current_user_id(process_params.authorization)
53 except Exception as e:
54 logger.warning(f"Failed to get tenant_id from authorization: {e}")
55
56 # Build headers with authorization
57 headers = {
58 "Authorization": f"Bearer {process_params.authorization}"
59 }
60
61 # Build source data list
62 if len(files) == 1:
63 # Single file request
64 file_details = files[0]
65 payload = {
66 "source": file_details.get("path_or_url"),
67 "source_type": process_params.source_type,
68 "chunking_strategy": process_params.chunking_strategy,
69 "index_name": process_params.index_name,
70 "original_filename": file_details.get("filename"),
71 "embedding_model_id": embedding_model_id,
72 "tenant_id": tenant_id
73 }
74
75 try:
76 async with httpx.AsyncClient() as client:
77 response = await client.post(f"{DATA_PROCESS_SERVICE}/tasks", headers=headers, json=payload, timeout=30.0)
78
79 if response.status_code == 201:
80 return response.json()
81 else:
82 logger.error(
83 "Error from data process service: %s - %s", response,
84 response.text if hasattr(response, 'text') else 'No response text')
85 return {"status": "error", "code": response.status_code,
86 "message": f"Data process service error: {response.status_code}"}
87 except httpx.RequestError as e:
88 logger.error("Failed to connect to data process service: %s", str(e))
89 return {"status": "error", "code": "CONNECTION_ERROR",
90 "message": f"Failed to connect to data process service: {str(e)}"}
91
92 else:
93 # Batch file request
94 sources = []
95 for file_details in files:
96 source = {
97 "source": file_details.get("path_or_url"),
98 "source_type": process_params.source_type,
99 "chunking_strategy": process_params.chunking_strategy,

Callers 2

process_files · Function · 0.90
upload_files · Function · 0.90

Calls 6

get_current_user_id · Function · 0.90
error · Method · 0.80
append · Method · 0.80
get · Method · 0.45
post · Method · 0.45
json · Method · 0.45

Tested by

no test coverage detected