Create and initialize the registry.
(self)
| 386 | return self._registry |
| 387 | |
def _create_registry(self) -> BaseRegistry:
    """Create and initialize the registry.

    The implementation is chosen from
    ``self.config.registry.registry_type``:

    * ``"sql"`` -> ``SqlRegistry``
    * ``"snowflake.registry"`` -> ``SnowflakeRegistry``
    * ``"remote"`` -> ``RemoteRegistry`` (additionally receives
      ``self.config.auth_config``)
    * any other value -> ``Registry`` (receives ``self.repo_path`` and
      ``self.config.auth_config``)

    Returns:
        The newly constructed registry instance.
    """
    registry_config = self.config.registry
    # Read the discriminator once; every branch below compares against it.
    registry_type = registry_config.registry_type

    # NOTE(review): the sql/snowflake/remote registries are given ``None`` as
    # their third positional argument while the fallback ``Registry`` gets
    # ``repo_path`` -- confirm this difference is intended.
    if registry_type == "sql":
        return SqlRegistry(registry_config, self.config.project, None)

    if registry_type == "snowflake.registry":
        # Imported inside the branch, presumably so the Snowflake dependency
        # is only required when this registry type is selected.
        from feast.infra.registry.snowflake import SnowflakeRegistry

        return SnowflakeRegistry(registry_config, self.config.project, None)

    # No truthiness guard on ``registry_config`` here: its ``registry_type``
    # is already read unconditionally above, so such a guard could never
    # protect against a missing config.
    if registry_type == "remote":
        from feast.infra.registry.remote import RemoteRegistry

        return RemoteRegistry(
            registry_config, self.config.project, None, self.config.auth_config
        )

    # Fallback for every other registry type.
    return Registry(
        self.config.project,
        registry_config,
        repo_path=self.repo_path,
        auth_config=self.config.auth_config,
    )
| 410 | |
| 411 | @property |
| 412 | def project(self) -> str: |
no test coverage detected