Load SQL fragment for a data source
(self, source_name: str)
| 459 | return required_sources |
| 460 | |
| 461 | def _load_fragment(self, source_name: str) -> str: |
| 462 | """Load SQL fragment for a data source""" |
| 463 | fragment_file = self.fragments_path / self.DATA_SOURCES[source_name]['fragment'] |
| 464 | |
| 465 | if not fragment_file.exists(): |
| 466 | raise FileNotFoundError(f"Fragment file not found: {fragment_file}") |
| 467 | |
| 468 | with open(fragment_file, 'r') as f: |
| 469 | return f.read() |
| 470 | |
| 471 | def build_dynamic_query(self, params: QueryParams, requested_columns: List[str], |
| 472 | latency_columns: Optional[List[str]] = None) -> str: |