Repository: github.com/ModelEngine-Group/nexent

Function forward

backend/data_process/tasks.py:1622–1951

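Vectorize and store processed chunks in Elasticsearch.

Args:
  processed_data: Dict containing chunks and metadata
  index_name: Name of the index to store documents
  source: Original source path (for metadata)
  source_type: The type of the source ("local" or "minio")
  original_filename: The original name of the file
  authorization: Authorization header for API calls

Returns: Dict containing storage results and metadata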
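The docstring only says that processed_data is a dict "containing chunks and metadata". The sketch below illustrates a pre-flight check under a hypothetical payload shape: a `chunks` list of dicts with a `content` field, plus a `metadata` dict. It also rejects `source_type` values other than the two the docstring documents. The key names and the `validate_forward_args` helper are assumptions for illustration, not nexent's actual schema.

```python
from typing import Any, Dict, List

# The two source types documented in the forward() docstring.
ALLOWED_SOURCE_TYPES = {"local", "minio"}


def validate_forward_args(
    processed_data: Dict[str, Any],
    index_name: str,
    source: str,
    source_type: str = "minio",
) -> List[Dict[str, Any]]:
    """Hypothetical pre-flight check; returns the usable chunks.

    Assumes processed_data = {"chunks": [{"content": str, ...}, ...], "metadata": {...}}.
    """
    if source_type not in ALLOWED_SOURCE_TYPES:
        raise ValueError(f"unsupported source_type {source_type!r}")
    if not index_name:
        raise ValueError("index_name must be non-empty")
    if not source:
        raise ValueError("source must be non-empty")
    chunks = processed_data.get("chunks") or []
    if not isinstance(chunks, list):
        raise TypeError("processed_data['chunks'] must be a list")
    # Illustrative choice: skip whitespace-only chunks before any vectorization.
    return [c for c in chunks if str(c.get("content", "")).strip()]


payload = {
    "chunks": [{"content": "First paragraph."}, {"content": "   "}, {"content": "Second."}],
    "metadata": {"title": "report.pdf"},
}
usable = validate_forward_args(payload, "kb_demo", "minio://bucket/report.pdf", "minio")
print(len(usable))  # 2
```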

(
        self,
        processed_data: Dict,
        index_name: str,
        source: str,
        source_type: str = 'minio',
        original_filename: Optional[str] = None,
        authorization: Optional[str] = None
)

Source from the content-addressed store, hash-verified

```python
@app.task(bind=True, base=LoggingTask, name='data_process.tasks.forward', queue='forward_q')
def forward(
        self,
        processed_data: Dict,
        index_name: str,
        source: str,
        source_type: str = 'minio',
        original_filename: Optional[str] = None,
        authorization: Optional[str] = None
) -> Dict:
    """
    Vectorize and store processed chunks in Elasticsearch

    Args:
        processed_data: Dict containing chunks and metadata
        index_name: Name of the index to store documents
        source: Original source path (for metadata)
        source_type: The type of the source("local", "minio")
        original_filename: The original name of the file
        authorization: Authorization header for API calls

    Returns:
        Dict containing storage results and metadata
    """
    start_time = time.time()
    task_id = self.request.id
    # _warn_if_queue_mismatch("FORWARD TASK", "forward_q", self.request)
    original_source = source
    original_index_name = index_name
    filename = original_filename

    try:
        ctx = _init_forward_context(
            task_id=task_id,
            request_id=str(self.request.id),
            start_time=start_time,
            source=source,
            index_name=index_name,
            source_type=source_type,
            original_filename=original_filename,
        )

        # Before doing any heavy work, check whether this task has been explicitly cancelled.
        if _is_forward_task_cancelled(ctx):
            logger.info(
                f"[{self.request.id}] FORWARD TASK: Detected cancellation flag for task {task_id}; "
                f"skipping chunk forwarding for source '{source}' in index '{index_name}'."
            )
            return _build_forward_cancelled_result(ctx)

        chunks, split_async, original_source, original_index_name, filename = _load_forward_chunks(
            self,
            processed_data=processed_data,
            original_source=original_source,
            original_index_name=original_index_name,
            filename=filename,
        )

        # Calculate total chunks for progress tracking
```
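The source checks a cancellation flag before any heavy work and returns early with a "cancelled" result. The excerpt does not show how `_is_forward_task_cancelled` reads that flag. The Calls list includes `get_redis_service`, so a Redis key is plausible, but that is an assumption. The sketch below shows the early-return pattern, using an in-memory store in place of Redis. The key layout, `InMemoryFlagStore`, and `forward_sketch` are hypothetical.

```python
import time
from typing import Any, Dict, List, Optional


class InMemoryFlagStore:
    """Stand-in for a Redis client, exposing only get/set."""

    def __init__(self) -> None:
        self._data: Dict[str, str] = {}

    def set(self, key: str, value: str) -> None:
        self._data[key] = value

    def get(self, key: str) -> Optional[str]:
        return self._data.get(key)


def cancel_key(task_id: str) -> str:
    # Hypothetical key layout; the real key is not visible in the excerpt.
    return f"forward:cancelled:{task_id}"


def is_cancelled(store: InMemoryFlagStore, task_id: str) -> bool:
    # redis-py returns bytes unless the client uses decode_responses=True,
    # so a real check may need to compare against b"1" as well.
    return store.get(cancel_key(task_id)) == "1"


def forward_sketch(store: InMemoryFlagStore, task_id: str, source: str,
                   index_name: str, chunks: List[str]) -> Dict[str, Any]:
    start = time.time()
    result: Dict[str, Any] = {"task_id": task_id, "source": source, "index_name": index_name}
    # Cheap check first: return before any embedding or indexing work starts.
    if is_cancelled(store, task_id):
        result.update(status="cancelled", chunks_stored=0)
    else:
        # Placeholder for the heavy work (embedding and bulk indexing).
        result.update(status="success", chunks_stored=len(chunks))
    result["elapsed_s"] = time.time() - start
    return result


store = InMemoryFlagStore()
store.set(cancel_key("task-2"), "1")
print(forward_sketch(store, "task-1", "report.pdf", "kb_demo", ["a", "b"])["status"])  # success
print(forward_sketch(store, "task-2", "report.pdf", "kb_demo", ["a", "b"])["status"])  # cancelled
```

The check costs a single key lookup. Placing it first means a task that was cancelled while queued never loads chunks or calls the embedding service.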

Callers

Nothing calls this directly; it runs as a Celery task on the forward_q queue.

Calls (15)

get_file_size · function · 0.90
get_redis_service · function · 0.90
_init_forward_context · function · 0.85
_load_forward_chunks · function · 0.85
_send_chunks_to_es · function · 0.85
_build_balanced_batches · function · 0.85
extract_error_code · function · 0.85
save_error_to_redis · function · 0.85
append · method · 0.80
update_state · method · 0.80
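The Calls list includes `_build_balanced_batches`, `_send_chunks_to_es` and `update_state`, and the source comment mentions calculating total chunks for progress tracking. The excerpt stops before that logic, so the following is only a sketch under assumptions:

* Chunk texts are greedily packed into batches capped by both character count and item count.
* Each batch goes through a caller-supplied `send_batch` function, which stands in for the Elasticsearch write.
* Cumulative progress is reported through a callback after each batch.

The helper names, limits and meta keys are hypothetical.

```python
from typing import Callable, Dict, List


def build_balanced_batches(chunks: List[str], max_chars: int, max_items: int) -> List[List[str]]:
    """Greedy packing: close the current batch when the next chunk would exceed either cap.

    A single chunk longer than max_chars still gets a batch of its own.
    """
    batches: List[List[str]] = []
    current: List[str] = []
    current_chars = 0
    for chunk in chunks:
        size = len(chunk)
        if current and (current_chars + size > max_chars or len(current) >= max_items):
            batches.append(current)
            current, current_chars = [], 0
        current.append(chunk)
        current_chars += size
    if current:
        batches.append(current)
    return batches


def send_with_progress(
    batches: List[List[str]],
    send_batch: Callable[[List[str]], None],
    report: Callable[[Dict[str, int]], None],
) -> int:
    """Send each batch, then report cumulative progress; returns the number of chunks sent."""
    total = sum(len(b) for b in batches)  # total chunks, computed once up front
    done = 0
    for batch in batches:
        send_batch(batch)
        done += len(batch)
        report({
            "processed_chunks": done,
            "total_chunks": total,
            "progress": round(100 * done / total),
        })
    return done


chunks = ["a" * 40, "b" * 30, "c" * 50, "d" * 10, "e" * 70]
batches = build_balanced_batches(chunks, max_chars=80, max_items=3)
print([len(b) for b in batches])  # [2, 2, 1]

sent: List[List[str]] = []
reports: List[Dict[str, int]] = []
send_with_progress(batches, sent.append, reports.append)
print([r["progress"] for r in reports])  # [40, 80, 100]
```

In a bound Celery task, `report` could be `lambda meta: self.update_state(state="PROGRESS", meta=meta)`. Celery's `Task.update_state` accepts a custom state name, and clients polling the task's `AsyncResult` then see the `meta` dict.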

Tested by

no test coverage detected