MCPcopy
hub / github.com/feast-dev/feast / apply_data_source

Method apply_data_source

sdk/python/feast/infra/registry/registry.py:390–430  ·  view source on GitHub ↗

Apply a data source to the registry with project-scoped deduplication. Filters existing data sources by both name and project (fixes feast-dev/feast#6206), preserving the original created_timestamp if the source already exists in the target project.

(
        self, data_source: DataSource, project: str, commit: bool = True
    )

Source from the content-addressed store, hash-verified

388 return proto_registry_utils.list_data_sources(registry_proto, project, tags)
389
390 def apply_data_source(
391 self, data_source: DataSource, project: str, commit: bool = True
392 ):
393 """Apply a data source to the registry with project-scoped deduplication.
394
395 Filters existing data sources by both name and project (fixes feast-dev/feast#6206),
396 preserving the original created_timestamp if the source already exists in the
397 target project.
398 """
399 now = _utc_now()
400 if not data_source.created_timestamp:
401 data_source.created_timestamp = now
402 data_source.last_updated_timestamp = now
403
404 registry = self._prepare_registry_for_changes(project)
405
406 for idx, existing_data_source_proto in enumerate(registry.data_sources):
407 if (
408 existing_data_source_proto.name == data_source.name
409 and existing_data_source_proto.project == project
410 ):
411 existing_data_source = DataSource.from_proto(existing_data_source_proto)
412 # Check if the data source has actually changed
413 if existing_data_source == data_source:
414 return
415 else:
416 # Preserve created_timestamp from existing data source
417 data_source.created_timestamp = (
418 existing_data_source.created_timestamp
419 )
420 del registry.data_sources[idx]
421 break
422
423 data_source_proto = data_source.to_proto()
424 data_source_proto.project = project
425 data_source_proto.data_source_class_type = (
426 f"{data_source.__class__.__module__}.{data_source.__class__.__name__}"
427 )
428 self.cached_registry_proto.data_sources.append(data_source_proto)
429 if commit:
430 self.commit()
431
432 def delete_data_source(self, name: str, project: str, commit: bool = True):
433 self._prepare_registry_for_changes(project)

Callers

nothing calls this directly

Calls 5

commitMethod · 0.95
_utc_nowFunction · 0.90
from_protoMethod · 0.45
to_protoMethod · 0.45

Tested by

no test coverage detected