Load and aggregate overviews for the given index IDs. Strategy: 1. Attempt to load each index's dedicated overview file. 2. Aggregate all overviews found across available files (deduplicated). 3. If none of the index files exist, fall back to the legacy global overview file.
(self, idx_ids: List[str])
| 366 | return self._simple_pattern_routing(message, idx_ids) |
| 367 | |
| 368 | def _load_document_overviews(self, idx_ids: List[str]) -> List[str]: |
| 369 | """Load and aggregate overviews for the given index IDs. |
| 370 | |
| 371 | Strategy: |
| 372 | 1. Attempt to load each index's dedicated overview file. |
| 373 | 2. Aggregate all overviews found across available files (deduplicated). |
| 374 | 3. If none of the index files exist, fall back to the legacy global overview file. |
| 375 | """ |
| 376 | import os, json |
| 377 | |
| 378 | aggregated: list[str] = [] |
| 379 | |
| 380 | # 1️⃣ Collect overviews from per-index files |
| 381 | for idx in idx_ids: |
| 382 | candidate_paths = [ |
| 383 | f"../index_store/overviews/{idx}.jsonl", |
| 384 | f"index_store/overviews/{idx}.jsonl", |
| 385 | f"./index_store/overviews/{idx}.jsonl", |
| 386 | ] |
| 387 | for p in candidate_paths: |
| 388 | if os.path.exists(p): |
| 389 | print(f"📖 Loading overviews from: {p}") |
| 390 | try: |
| 391 | with open(p, "r", encoding="utf-8") as f: |
| 392 | for line in f: |
| 393 | if not line.strip(): |
| 394 | continue |
| 395 | try: |
| 396 | record = json.loads(line) |
| 397 | overview = record.get("overview", "").strip() |
| 398 | if overview: |
| 399 | aggregated.append(overview) |
| 400 | except json.JSONDecodeError: |
| 401 | continue # skip malformed lines |
| 402 | break # Stop after the first existing path for this idx |
| 403 | except Exception as e: |
| 404 | print(f"⚠️ Error reading {p}: {e}") |
| 405 | break # Don't keep trying other paths for this idx if read failed |
| 406 | |
| 407 | # 2️⃣ Fall back to legacy global file if no per-index overviews found |
| 408 | if not aggregated: |
| 409 | legacy_paths = [ |
| 410 | "../index_store/overviews/overviews.jsonl", |
| 411 | "index_store/overviews/overviews.jsonl", |
| 412 | "./index_store/overviews/overviews.jsonl", |
| 413 | ] |
| 414 | for p in legacy_paths: |
| 415 | if os.path.exists(p): |
| 416 | print(f"⚠️ Falling back to legacy overviews file: {p}") |
| 417 | try: |
| 418 | with open(p, "r", encoding="utf-8") as f: |
| 419 | for line in f: |
| 420 | if not line.strip(): |
| 421 | continue |
| 422 | try: |
| 423 | record = json.loads(line) |
| 424 | overview = record.get("overview", "").strip() |
| 425 | if overview: |