| 123 | |
| 124 | |
| 125 | class SnowflakeRegistry(BaseRegistry): |
| 126 | def __init__( |
| 127 | self, |
| 128 | registry_config, |
| 129 | project: str, |
| 130 | repo_path, |
| 131 | ): |
| 132 | assert registry_config is not None and isinstance( |
| 133 | registry_config, SnowflakeRegistryConfig |
| 134 | ), "SnowflakeRegistry needs a valid registry_config, a path does not work" |
| 135 | |
| 136 | self.registry_config = registry_config |
| 137 | self.registry_path = ( |
| 138 | f'"{self.registry_config.database}"."{self.registry_config.schema_}"' |
| 139 | ) |
| 140 | |
| 141 | with GetSnowflakeConnection(self.registry_config) as conn: |
| 142 | sql_function_file = f"{os.path.dirname(feast.__file__)}/infra/utils/snowflake/registry/snowflake_table_creation.sql" |
| 143 | with open(sql_function_file, "r") as file: |
| 144 | sql_cmds = [ |
| 145 | cmd.strip() for cmd in file.read().split(";") if cmd.strip() |
| 146 | ] |
| 147 | for command in sql_cmds: |
| 148 | query = command.replace("REGISTRY_PATH", f"{self.registry_path}") |
| 149 | execute_snowflake_statement(conn, query) |
| 150 | |
| 151 | self.purge_feast_metadata = registry_config.purge_feast_metadata |
| 152 | self.project = project |
| 153 | |
| 154 | # Initialize cache state before any method that may trigger |
| 155 | # _refresh_cached_registry_if_necessary (e.g. proto(), get_project()). |
| 156 | self._refresh_lock = Lock() |
| 157 | self.cached_registry_proto = None |
| 158 | self.cached_registry_proto_created = None |
| 159 | self.cached_registry_proto_ttl = timedelta( |
| 160 | seconds=( |
| 161 | registry_config.cache_ttl_seconds |
| 162 | if registry_config.cache_ttl_seconds is not None |
| 163 | else 0 |
| 164 | ) |
| 165 | ) |
| 166 | |
| 167 | self._sync_feast_metadata_to_projects_table() |
| 168 | if not self.purge_feast_metadata: |
| 169 | self._maybe_init_project_metadata(project) |
| 170 | |
| 171 | self.cached_registry_proto = self.proto() |
| 172 | self.cached_registry_proto_created = _utc_now() |
| 173 | |
| 174 | def _sync_feast_metadata_to_projects_table(self): |
| 175 | feast_metadata_projects: set[str] = set() |
| 176 | projects_set: set[str] = set() |
| 177 | |
| 178 | with GetSnowflakeConnection(self.registry_config) as conn: |
| 179 | query = ( |
| 180 | f'SELECT DISTINCT project_id FROM {self.registry_path}."FEAST_METADATA"' |
| 181 | ) |
| 182 | df = execute_snowflake_statement(conn, query).fetch_pandas_all() |
no outgoing calls