(
chunks: List[Dict[str, Any]],
index_name: str,
authorization: str | None,
task_id: Optional[str] = None,
source: str = "",
original_filename: str = "",
large_mode: bool = False,
)
| 611 | |
| 612 | |
| 613 | def _send_chunks_to_es( |
| 614 | chunks: List[Dict[str, Any]], |
| 615 | index_name: str, |
| 616 | authorization: str | None, |
| 617 | task_id: Optional[str] = None, |
| 618 | source: str = "", |
| 619 | original_filename: str = "", |
| 620 | large_mode: bool = False, |
| 621 | ) -> Dict[str, Any]: |
| 622 | async def _post(): |
| 623 | elasticsearch_url = ELASTICSEARCH_SERVICE |
| 624 | if not elasticsearch_url: |
| 625 | raise _build_forward_error( |
| 626 | message="ELASTICSEARCH_SERVICE env is not set", |
| 627 | index_name=index_name, |
| 628 | source=source, |
| 629 | original_filename=original_filename, |
| 630 | ) |
| 631 | route_url = f"/indices/{index_name}/documents" |
| 632 | full_url = elasticsearch_url + route_url |
| 633 | headers = {"Content-Type": "application/json"} |
| 634 | if authorization: |
| 635 | headers["Authorization"] = authorization |
| 636 | if task_id: |
| 637 | headers["X-Task-Id"] = task_id |
| 638 | try: |
| 639 | connector = aiohttp.TCPConnector(verify_ssl=False) |
| 640 | timeout = aiohttp.ClientTimeout(total=600) |
| 641 | |
| 642 | request_params: Dict[str, str] = {} |
| 643 | |
| 644 | if large_mode: |
| 645 | request_params["large_mode"] = "true" |
| 646 | |
| 647 | async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: |
| 648 | async with session.post( |
| 649 | full_url, |
| 650 | headers=headers, |
| 651 | json=chunks, |
| 652 | params=request_params, |
| 653 | raise_for_status=False |
| 654 | ) as response: |
| 655 | text = await response.text() |
| 656 | status = response.status |
| 657 | parsed_body = _parse_json_or_none(text) |
| 658 | |
| 659 | if status >= 400: |
| 660 | error_code = _extract_error_code_from_es_response( |
| 661 | parsed_body, text) |
| 662 | if error_code: |
| 663 | raise Exception(json.dumps({ |
| 664 | "error_code": error_code |
| 665 | }, ensure_ascii=False)) |
| 666 | |
| 667 | raise Exception( |
| 668 | f"ElasticSearch service returned HTTP {status}") |
| 669 | |
| 670 | result = parsed_body if isinstance(parsed_body, dict) else await response.json() |
no test coverage detected