Materialize CSV data into DuckDB tables. Args: sources: List of sources to materialize. If None, materialize all. Returns: Dictionary mapping source names to materialization times in seconds; a source that failed to materialize is mapped to -1, and unknown sources are omitted.
(self, sources: Optional[List[str]] = None)
| 38 | self.is_materialized = False |
| 39 | |
def materialize_all(self, sources: Optional[List[str]] = None) -> Dict[str, float]:
    """
    Materialize CSV data into DuckDB tables.

    Each requested source is handed to ``self._materialize_source`` and timed
    individually. A failure in one source is logged and recorded in the
    result, but does not stop the remaining sources from being processed.

    Args:
        sources: List of sources to materialize. If None, materialize every
            key of ``self.TABLE_NAMES``. Names that are not keys of
            ``self.TABLE_NAMES`` are logged as warnings and skipped.

    Returns:
        Dictionary mapping source names to materialization times in seconds.
        A source whose materialization raised is mapped to -1. Skipped
        (unknown) sources do not appear in the result.

    Side effects:
        Sets ``self.is_materialized`` to True only when at least one source
        was attempted and none of the attempted sources failed; otherwise it
        is set to False.
    """
    if sources is None:
        sources = list(self.TABLE_NAMES.keys())

    timings: Dict[str, float] = {}

    for source in sources:
        if source not in self.TABLE_NAMES:
            self.logger.warning(f"Unknown source: {source}")
            continue

        # perf_counter() is monotonic, so a wall-clock adjustment during the
        # run cannot produce a negative duration that would be mistaken for
        # the -1 failure sentinel when is_materialized is computed below.
        start_time = time.perf_counter()
        try:
            self._materialize_source(source)
        except Exception as e:
            # Best-effort: record the failure and keep going with the rest.
            self.logger.error(f"Failed to materialize {source}: {e}")
            timings[source] = -1
        else:
            elapsed = time.perf_counter() - start_time
            timings[source] = elapsed
            self.logger.info(f"Materialized {source} in {elapsed:.2f}s")

    # An empty result (nothing attempted) never counts as materialized.
    self.is_materialized = bool(timings) and all(t >= 0 for t in timings.values())
    return timings
| 72 | |
| 73 | def _materialize_source(self, source: str): |
| 74 | """Materialize a single data source""" |
no test coverage detected