| 446 | |
| 447 | |
| 448 | def _load_forward_chunks( |
| 449 | self: Task, |
| 450 | *, |
| 451 | processed_data: Dict[str, Any], |
| 452 | original_source: str, |
| 453 | original_index_name: str, |
| 454 | filename: Optional[str], |
| 455 | ) -> Tuple[Optional[List[Dict[str, Any]]], bool, str, str, Optional[str]]: |
| 456 | chunks = processed_data.get('chunks') |
| 457 | split_async = bool(processed_data.get('split_async')) |
| 458 | |
| 459 | # If chunks are not in payload, try loading from Redis via the redis_key |
| 460 | if (not chunks) and processed_data.get('redis_key'): |
| 461 | redis_key = processed_data.get('redis_key') |
| 462 | if not REDIS_BACKEND_URL: |
| 463 | raise Exception(json.dumps({ |
| 464 | "message": "REDIS_BACKEND_URL not configured to retrieve chunks", |
| 465 | "index_name": original_index_name, |
| 466 | "task_name": "forward", |
| 467 | "source": original_source, |
| 468 | "original_filename": filename |
| 469 | }, ensure_ascii=False)) |
| 470 | try: |
| 471 | import redis |
| 472 | client = redis.Redis.from_url( |
| 473 | REDIS_BACKEND_URL, decode_responses=True) |
| 474 | ready_key = f"{redis_key}:ready" |
| 475 | if split_async: |
| 476 | ready_flag = client.get(ready_key) |
| 477 | if not ready_flag: |
| 478 | retry_num = getattr(self.request, 'retries', 0) |
| 479 | logger.info( |
| 480 | f"[{self.request.id}] FORWARD TASK: Async split not ready for key {redis_key}. Retry {retry_num + 1}/{ASYNC_SPLIT_RETRY_MAX} in {FORWARD_REDIS_RETRY_DELAY_S}s") |
| 481 | raise self.retry( |
| 482 | countdown=FORWARD_REDIS_RETRY_DELAY_S, |
| 483 | max_retries=ASYNC_SPLIT_RETRY_MAX, |
| 484 | exc=Exception(json.dumps({ |
| 485 | "message": "Async split not ready; will retry", |
| 486 | "index_name": original_index_name, |
| 487 | "task_name": "forward", |
| 488 | "source": original_source, |
| 489 | "original_filename": filename |
| 490 | }, ensure_ascii=False)) |
| 491 | ) |
| 492 | cached = client.get(redis_key) |
| 493 | if cached: |
| 494 | try: |
| 495 | logger.debug( |
| 496 | f"[{self.request.id}] FORWARD TASK: Retrieved Redis key '{redis_key}', payload_length={len(cached)}") |
| 497 | chunks = json.loads(cached) |
| 498 | except json.JSONDecodeError as jde: |
| 499 | # Log raw prefix to help diagnose incorrect writes |
| 500 | raw_preview = cached[:120] if isinstance( |
| 501 | cached, str) else str(type(cached)) |
| 502 | logger.error( |
| 503 | f"[{self.request.id}] FORWARD TASK: JSON decode error for key '{redis_key}': {str(jde)}; raw_prefix={raw_preview!r}") |
| 504 | raise |
| 505 | else: |