维护 screened/candidates/working/export 四层数据。
| 37 | |
| 38 | |
| 39 | class SourceInventory: |
| 40 | """维护 screened/candidates/working/export 四层数据。""" |
| 41 | |
| 42 | def __init__(self, base_dir: Path | str | None = None): |
| 43 | self.base_dir = resolve_legado_dir(base_dir) |
| 44 | self.project_root = self.base_dir.parent.parent |
| 45 | |
| 46 | self.policy = SourcePolicy(self.project_root) |
| 47 | self.updater = SafeUpdater(self.base_dir) |
| 48 | |
| 49 | self.raw_file = raw_pool_file(self.base_dir) |
| 50 | self.screened_file = screened_pool_file(self.base_dir) |
| 51 | self.screened_report = screened_report_file(self.base_dir) |
| 52 | self.candidate_file = candidate_pool_file(self.base_dir) |
| 53 | self.candidate_report = candidate_report_file(self.base_dir) |
| 54 | self.working_file = working_source_file(self.base_dir) |
| 55 | self.export_file = canonical_source_file(self.base_dir) |
| 56 | self.metadata_file = metadata_file(self.base_dir) |
| 57 | |
| 58 | config = _load_json(self.project_root / "config" / "supplement_config.json", {}) |
| 59 | inventory_cfg = config.get("inventory", {}) |
| 60 | supplement_cfg = config.get("supplement", {}) |
| 61 | |
| 62 | self.export_target = int(inventory_cfg.get("export_target", supplement_cfg.get("target_sources", 1000))) |
| 63 | self.working_target = int(inventory_cfg.get("working_target", self.export_target + 30)) |
| 64 | self.min_working_sources = int(inventory_cfg.get("min_working_sources", 950)) |
| 65 | self.max_working_sources = int(inventory_cfg.get("max_working_sources", 1050)) |
| 66 | self.min_candidate_sources = int(inventory_cfg.get("min_candidate_sources", 1800)) |
| 67 | self.screened_validation_batch = int(inventory_cfg.get("screened_validation_batch", 360)) |
| 68 | self.validation_oversample_factor = int(inventory_cfg.get("validation_oversample_factor", 3)) |
| 69 | self.max_per_domain = int(supplement_cfg.get("max_per_domain", 2)) |
| 70 | |
| 71 | def load_raw_sources(self) -> List[Dict]: |
| 72 | return _load_json(self.raw_file, []) |
| 73 | |
| 74 | def load_screened_sources(self) -> List[Dict]: |
| 75 | return _load_json(self.screened_file, []) |
| 76 | |
| 77 | def load_candidate_sources(self) -> List[Dict]: |
| 78 | return _load_json(self.candidate_file, []) |
| 79 | |
| 80 | def load_working_sources(self) -> List[Dict]: |
| 81 | if self.working_file.exists(): |
| 82 | return _load_json(self.working_file, []) |
| 83 | return _load_json(self.export_file, []) |
| 84 | |
| 85 | def _write_json(self, path: Path, payload) -> None: |
| 86 | path.parent.mkdir(parents=True, exist_ok=True) |
| 87 | with open(path, "w", encoding="utf-8") as f: |
| 88 | json.dump(payload, f, ensure_ascii=False, indent=2) |
| 89 | |
| 90 | def _stage_json(self, path: Path, payload) -> Path: |
| 91 | path.parent.mkdir(parents=True, exist_ok=True) |
| 92 | temp_path = path.with_name(f"{path.name}.tmp") |
| 93 | with open(temp_path, "w", encoding="utf-8") as f: |
| 94 | json.dump(payload, f, ensure_ascii=False, indent=2) |
| 95 | return temp_path |
| 96 |
no outgoing calls