MCPcopy Index your code
hub / github.com/feast-dev/feast / SnowflakeRegistry

Class SnowflakeRegistry

sdk/python/feast/infra/registry/snowflake.py:125–1586  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

123
124
125class SnowflakeRegistry(BaseRegistry):
def __init__(
    self,
    registry_config,
    project: str,
    repo_path,
):
    """Set up a Snowflake-backed registry and prime its in-memory cache.

    Runs every statement from the bundled ``snowflake_table_creation.sql``
    against the configured database/schema, initialises the cache
    bookkeeping attributes, syncs ``FEAST_METADATA`` projects, optionally
    initialises project metadata, and finally caches ``self.proto()``.

    Args:
        registry_config: Must be a ``SnowflakeRegistryConfig``. Its
            ``database``, ``schema_``, ``purge_feast_metadata`` and
            ``cache_ttl_seconds`` attributes are read here.
        project: Project name stored on the instance and, unless
            ``purge_feast_metadata`` is truthy, passed to
            ``_maybe_init_project_metadata``.
        repo_path: Not used by this constructor.

    Raises:
        AssertionError: If ``registry_config`` is ``None`` or is not a
            ``SnowflakeRegistryConfig``.
    """
    # Explicit raise rather than an `assert` statement so the validation is
    # not stripped when Python runs with -O. AssertionError is kept so that
    # existing callers observe the same exception type and message.
    if registry_config is None or not isinstance(
        registry_config, SnowflakeRegistryConfig
    ):
        raise AssertionError(
            "SnowflakeRegistry needs a valid registry_config, a path does not work"
        )

    self.registry_config = registry_config
    self.registry_path = (
        f'"{self.registry_config.database}"."{self.registry_config.schema_}"'
    )

    # Read the packaged SQL script before opening the Snowflake connection so
    # the connection is not held open during local file I/O. The encoding is
    # pinned so parsing does not depend on the platform's locale default.
    sql_function_file = f"{os.path.dirname(feast.__file__)}/infra/utils/snowflake/registry/snowflake_table_creation.sql"
    with open(sql_function_file, "r", encoding="utf-8") as file:
        # Statements are separated by ';'; blank fragments are dropped.
        sql_cmds = [cmd.strip() for cmd in file.read().split(";") if cmd.strip()]

    with GetSnowflakeConnection(self.registry_config) as conn:
        for command in sql_cmds:
            # The script uses REGISTRY_PATH as a placeholder for the quoted
            # "<database>"."<schema>" prefix.
            query = command.replace("REGISTRY_PATH", self.registry_path)
            execute_snowflake_statement(conn, query)

    self.purge_feast_metadata = registry_config.purge_feast_metadata
    self.project = project

    # Initialize cache state before any method that may trigger
    # _refresh_cached_registry_if_necessary (e.g. proto(), get_project()).
    self._refresh_lock = Lock()
    self.cached_registry_proto = None
    self.cached_registry_proto_created = None
    # A missing TTL (None) is treated as zero seconds.
    self.cached_registry_proto_ttl = timedelta(
        seconds=(
            registry_config.cache_ttl_seconds
            if registry_config.cache_ttl_seconds is not None
            else 0
        )
    )

    self._sync_feast_metadata_to_projects_table()
    if not self.purge_feast_metadata:
        self._maybe_init_project_metadata(project)

    # Prime the cache only after the metadata steps above have run.
    self.cached_registry_proto = self.proto()
    self.cached_registry_proto_created = _utc_now()
173
174 def _sync_feast_metadata_to_projects_table(self):
175 feast_metadata_projects: set[str] = set()
176 projects_set: set[str] = set()
177
178 with GetSnowflakeConnection(self.registry_config) as conn:
179 query = (
180 f'SELECT DISTINCT project_id FROM {self.registry_path}."FEAST_METADATA"'
181 )
182 df = execute_snowflake_statement(conn, query).fetch_pandas_all()

Callers 4

_create_registryMethod · 0.90
snowflake_registryFunction · 0.90
registryFunction · 0.90
_make_registryMethod · 0.90

Calls

no outgoing calls

Tested by 3

snowflake_registryFunction · 0.72
registryFunction · 0.72
_make_registryMethod · 0.72