MCPcopy
hub / github.com/ModelEngine-Group/nexent / _load_forward_chunks

Function _load_forward_chunks

backend/data_process/tasks.py:448–584  ·  view source on GitHub ↗
(
    self: Task,
    *,
    processed_data: Dict[str, Any],
    original_source: str,
    original_index_name: str,
    filename: Optional[str],
)

Source from the content-addressed store, hash-verified

446
447
448def _load_forward_chunks(
449 self: Task,
450 *,
451 processed_data: Dict[str, Any],
452 original_source: str,
453 original_index_name: str,
454 filename: Optional[str],
455) -> Tuple[Optional[List[Dict[str, Any]]], bool, str, str, Optional[str]]:
456 chunks = processed_data.get('chunks')
457 split_async = bool(processed_data.get('split_async'))
458
459 # If chunks are not in payload, try loading from Redis via the redis_key
460 if (not chunks) and processed_data.get('redis_key'):
461 redis_key = processed_data.get('redis_key')
462 if not REDIS_BACKEND_URL:
463 raise Exception(json.dumps({
464 "message": "REDIS_BACKEND_URL not configured to retrieve chunks",
465 "index_name": original_index_name,
466 "task_name": "forward",
467 "source": original_source,
468 "original_filename": filename
469 }, ensure_ascii=False))
470 try:
471 import redis
472 client = redis.Redis.from_url(
473 REDIS_BACKEND_URL, decode_responses=True)
474 ready_key = f"{redis_key}:ready"
475 if split_async:
476 ready_flag = client.get(ready_key)
477 if not ready_flag:
478 retry_num = getattr(self.request, 'retries', 0)
479 logger.info(
480 f"[{self.request.id}] FORWARD TASK: Async split not ready for key {redis_key}. Retry {retry_num + 1}/{ASYNC_SPLIT_RETRY_MAX} in {FORWARD_REDIS_RETRY_DELAY_S}s")
481 raise self.retry(
482 countdown=FORWARD_REDIS_RETRY_DELAY_S,
483 max_retries=ASYNC_SPLIT_RETRY_MAX,
484 exc=Exception(json.dumps({
485 "message": "Async split not ready; will retry",
486 "index_name": original_index_name,
487 "task_name": "forward",
488 "source": original_source,
489 "original_filename": filename
490 }, ensure_ascii=False))
491 )
492 cached = client.get(redis_key)
493 if cached:
494 try:
495 logger.debug(
496 f"[{self.request.id}] FORWARD TASK: Retrieved Redis key '{redis_key}', payload_length={len(cached)}")
497 chunks = json.loads(cached)
498 except json.JSONDecodeError as jde:
499 # Log raw prefix to help diagnose incorrect writes
500 raw_preview = cached[:120] if isinstance(
501 cached, str) else str(type(cached))
502 logger.error(
503 f"[{self.request.id}] FORWARD TASK: JSON decode error for key '{redis_key}': {str(jde)}; raw_prefix={raw_preview!r}")
504 raise
505 else:

Callers 1

forward · Function · 0.85

Calls 4

retry · Method · 0.80
error · Method · 0.80
get · Method · 0.45
from_url · Method · 0.45

Tested by

no test coverage detected