Vectorize and store processed chunks in Elasticsearch. Args: processed_data: Dict containing chunks and metadata; index_name: Name of the index to store documents; source: Original source path (for metadata); source_type: The type of the source ("local", "minio").
(
self,
processed_data: Dict,
index_name: str,
source: str,
source_type: str = 'minio',
original_filename: Optional[str] = None,
authorization: Optional[str] = None
)
| 1620 | |
| 1621 | @app.task(bind=True, base=LoggingTask, name='data_process.tasks.forward', queue='forward_q') |
| 1622 | def forward( |
| 1623 | self, |
| 1624 | processed_data: Dict, |
| 1625 | index_name: str, |
| 1626 | source: str, |
| 1627 | source_type: str = 'minio', |
| 1628 | original_filename: Optional[str] = None, |
| 1629 | authorization: Optional[str] = None |
| 1630 | ) -> Dict: |
| 1631 | """ |
| 1632 | Vectorize and store processed chunks in Elasticsearch |
| 1633 | |
| 1634 | Args: |
| 1635 | processed_data: Dict containing chunks and metadata |
| 1636 | index_name: Name of the index to store documents |
| 1637 | source: Original source path (for metadata) |
| 1638 | source_type: The type of the source("local", "minio") |
| 1639 | original_filename: The original name of the file |
| 1640 | authorization: Authorization header for API calls |
| 1641 | |
| 1642 | Returns: |
| 1643 | Dict containing storage results and metadata |
| 1644 | """ |
| 1645 | start_time = time.time() |
| 1646 | task_id = self.request.id |
| 1647 | # _warn_if_queue_mismatch("FORWARD TASK", "forward_q", self.request) |
| 1648 | original_source = source |
| 1649 | original_index_name = index_name |
| 1650 | filename = original_filename |
| 1651 | |
| 1652 | try: |
| 1653 | ctx = _init_forward_context( |
| 1654 | task_id=task_id, |
| 1655 | request_id=str(self.request.id), |
| 1656 | start_time=start_time, |
| 1657 | source=source, |
| 1658 | index_name=index_name, |
| 1659 | source_type=source_type, |
| 1660 | original_filename=original_filename, |
| 1661 | ) |
| 1662 | |
| 1663 | # Before doing any heavy work, check whether this task has been explicitly cancelled. |
| 1664 | if _is_forward_task_cancelled(ctx): |
| 1665 | logger.info( |
| 1666 | f"[{self.request.id}] FORWARD TASK: Detected cancellation flag for task {task_id}; " |
| 1667 | f"skipping chunk forwarding for source '{source}' in index '{index_name}'." |
| 1668 | ) |
| 1669 | return _build_forward_cancelled_result(ctx) |
| 1670 | |
| 1671 | chunks, split_async, original_source, original_index_name, filename = _load_forward_chunks( |
| 1672 | self, |
| 1673 | processed_data=processed_data, |
| 1674 | original_source=original_source, |
| 1675 | original_index_name=original_index_name, |
| 1676 | filename=filename, |
| 1677 | ) |
| 1678 | |
| 1679 | # Calculate total chunks for progress tracking |
Nothing calls this directly.
No test coverage detected.