Trigger data processing service to handle uploaded files
(files: List[dict], process_params: ProcessParams)
| 40 | |
| 41 | |
| 42 | async def trigger_data_process(files: List[dict], process_params: ProcessParams): |
| 43 | """Trigger data processing service to handle uploaded files""" |
| 44 | try: |
| 45 | if not files: |
| 46 | return None |
| 47 | |
| 48 | # Get tenant_id from authorization for downstream task processing |
| 49 | embedding_model_id = process_params.model_id |
| 50 | tenant_id = None |
| 51 | try: |
| 52 | _, tenant_id = get_current_user_id(process_params.authorization) |
| 53 | except Exception as e: |
| 54 | logger.warning(f"Failed to get tenant_id from authorization: {e}") |
| 55 | |
| 56 | # Build headers with authorization |
| 57 | headers = { |
| 58 | "Authorization": f"Bearer {process_params.authorization}" |
| 59 | } |
| 60 | |
| 61 | # Build source data list |
| 62 | if len(files) == 1: |
| 63 | # Single file request |
| 64 | file_details = files[0] |
| 65 | payload = { |
| 66 | "source": file_details.get("path_or_url"), |
| 67 | "source_type": process_params.source_type, |
| 68 | "chunking_strategy": process_params.chunking_strategy, |
| 69 | "index_name": process_params.index_name, |
| 70 | "original_filename": file_details.get("filename"), |
| 71 | "embedding_model_id": embedding_model_id, |
| 72 | "tenant_id": tenant_id |
| 73 | } |
| 74 | |
| 75 | try: |
| 76 | async with httpx.AsyncClient() as client: |
| 77 | response = await client.post(f"{DATA_PROCESS_SERVICE}/tasks", headers=headers, json=payload, timeout=30.0) |
| 78 | |
| 79 | if response.status_code == 201: |
| 80 | return response.json() |
| 81 | else: |
| 82 | logger.error( |
| 83 | "Error from data process service: %s - %s", response, |
| 84 | response.text if hasattr(response, 'text') else 'No response text') |
| 85 | return {"status": "error", "code": response.status_code, |
| 86 | "message": f"Data process service error: {response.status_code}"} |
| 87 | except httpx.RequestError as e: |
| 88 | logger.error("Failed to connect to data process service: %s", str(e)) |
| 89 | return {"status": "error", "code": "CONNECTION_ERROR", |
| 90 | "message": f"Failed to connect to data process service: {str(e)}"} |
| 91 | |
| 92 | else: |
| 93 | # Batch file request |
| 94 | sources = [] |
| 95 | for file_details in files: |
| 96 | source = { |
| 97 | "source": file_details.get("path_or_url"), |
| 98 | "source_type": process_params.source_type, |
| 99 | "chunking_strategy": process_params.chunking_strategy, |