MCPcopy
hub / github.com/ModelEngine-Group/nexent / _send_chunks_to_es

Function _send_chunks_to_es

backend/data_process/tasks.py:613–701  ·  view source on GitHub ↗
(
    chunks: List[Dict[str, Any]],
    index_name: str,
    authorization: str | None,
    task_id: Optional[str] = None,
    source: str = "",
    original_filename: str = "",
    large_mode: bool = False,
)

Source from the content-addressed store, hash-verified

611
612
613def _send_chunks_to_es(
614 chunks: List[Dict[str, Any]],
615 index_name: str,
616 authorization: str | None,
617 task_id: Optional[str] = None,
618 source: str = "",
619 original_filename: str = "",
620 large_mode: bool = False,
621) -> Dict[str, Any]:
622 async def _post():
623 elasticsearch_url = ELASTICSEARCH_SERVICE
624 if not elasticsearch_url:
625 raise _build_forward_error(
626 message="ELASTICSEARCH_SERVICE env is not set",
627 index_name=index_name,
628 source=source,
629 original_filename=original_filename,
630 )
631 route_url = f"/indices/{index_name}/documents"
632 full_url = elasticsearch_url + route_url
633 headers = {"Content-Type": "application/json"}
634 if authorization:
635 headers["Authorization"] = authorization
636 if task_id:
637 headers["X-Task-Id"] = task_id
638 try:
639 connector = aiohttp.TCPConnector(verify_ssl=False)
640 timeout = aiohttp.ClientTimeout(total=600)
641
642 request_params: Dict[str, str] = {}
643
644 if large_mode:
645 request_params["large_mode"] = "true"
646
647 async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
648 async with session.post(
649 full_url,
650 headers=headers,
651 json=chunks,
652 params=request_params,
653 raise_for_status=False
654 ) as response:
655 text = await response.text()
656 status = response.status
657 parsed_body = _parse_json_or_none(text)
658
659 if status >= 400:
660 error_code = _extract_error_code_from_es_response(
661 parsed_body, text)
662 if error_code:
663 raise Exception(json.dumps({
664 "error_code": error_code
665 }, ensure_ascii=False))
666
667 raise Exception(
668 f"ElasticSearch service returned HTTP {status}")
669
670 result = parsed_body if isinstance(parsed_body, dict) else await response.json()

Callers 2

forward_part · Function · 0.85
forward · Function · 0.85

Calls 2

_post · Function · 0.85
run_async · Function · 0.70

Tested by

no test coverage detected