| 282 | |
| 283 | |
| 284 | class SqlRegistry(CachingRegistry): |
def __init__(
    self,
    registry_config,
    project: str,
    repo_path: Optional[Path],
):
    """Build a SQL-backed registry for ``project``.

    Creates the SQLAlchemy engine(s) described by ``registry_config``,
    runs ``metadata.create_all`` against the write engine, copies the
    relevant settings onto the instance, initialises the caching base
    class and finally syncs ``feast_metadata`` into the projects table.

    Args:
        registry_config: Must be a ``SqlRegistryConfig`` instance.
        project: Project name forwarded to the base class and, unless
            ``purge_feast_metadata`` is set, to
            ``_maybe_init_project_metadata``.
        repo_path: Part of the constructor signature; not read by this
            method.

    Raises:
        AssertionError: If ``registry_config`` is ``None`` or is not a
            ``SqlRegistryConfig``.
    """
    config_is_valid = registry_config is not None and isinstance(
        registry_config, SqlRegistryConfig
    )
    assert config_is_valid, "SqlRegistry needs a valid registry_config"

    self.registry_config = registry_config

    # Both engines are created with the same SQLAlchemy keyword arguments.
    engine_kwargs = registry_config.sqlalchemy_config_kwargs
    self.write_engine: Engine = create_engine(
        registry_config.path, **engine_kwargs
    )

    # A dedicated read engine is only built when a read_path is
    # configured; otherwise reads go through the write engine.
    read_url = registry_config.read_path
    self.read_engine: Engine = (
        create_engine(read_url, **engine_kwargs)
        if read_url
        else self.write_engine
    )

    metadata.create_all(self.write_engine)

    # These attributes are assigned before the base-class initialiser
    # runs, exactly as in the original ordering.
    self.thread_pool_executor_worker_count = (
        registry_config.thread_pool_executor_worker_count
    )
    self.purge_feast_metadata = registry_config.purge_feast_metadata
    self.enable_online_versioning = (
        registry_config.enable_online_feature_view_versioning
    )

    super().__init__(
        project=project,
        cache_ttl_seconds=registry_config.cache_ttl_seconds,
        cache_mode=registry_config.cache_mode,
    )

    # Sync feast_metadata into the projects table. When
    # purge_feast_metadata is True, the data in the feast_metadata table
    # is deleted and list_project_metadata will not return any data, so
    # project metadata is only initialised when purging is disabled.
    self._sync_feast_metadata_to_projects_table()
    if self.purge_feast_metadata:
        return
    self._maybe_init_project_metadata(project)
| 326 | |
| 327 | def _sync_feast_metadata_to_projects_table(self): |
| 328 | feast_metadata_projects: dict = {} |
| 329 | projects_set: set = [] |
| 330 | with self.read_engine.begin() as conn: |
| 331 | stmt = select(feast_metadata).where( |
| 332 | feast_metadata.c.metadata_key == FeastMetadataKeys.PROJECT_UUID.value |
| 333 | ) |
| 334 | rows = conn.execute(stmt).all() |
| 335 | for row in rows: |
| 336 | feast_metadata_projects[row._mapping["project_id"]] = int( |
| 337 | row._mapping["last_updated_timestamp"] |
| 338 | ) |
| 339 | |
| 340 | if len(feast_metadata_projects) > 0: |
| 341 | with self.read_engine.begin() as conn: |
no outgoing calls