Apply a data source to the registry with project-scoped deduplication. Filters existing data sources by both name and project (fixes feast-dev/feast#6206), preserving the original created_timestamp if the source already exists in the target project.
(
self, data_source: DataSource, project: str, commit: bool = True
)
| 388 | return proto_registry_utils.list_data_sources(registry_proto, project, tags) |
| 389 | |
def apply_data_source(
    self, data_source: DataSource, project: str, commit: bool = True
):
    """Register ``data_source`` under ``project``, replacing a same-named entry.

    The incoming object is stamped first: ``created_timestamp`` is filled in
    only when it is unset, and ``last_updated_timestamp`` is always refreshed.
    The registry is then searched for the first stored data source whose name
    *and* project both match (project scoping, see feast-dev/feast#6206).

    If the stored source compares equal to ``data_source`` the method returns
    immediately: nothing is written and ``commit`` is not honoured. Otherwise
    the stored entry is removed and its ``created_timestamp`` is carried over
    onto ``data_source`` before the new proto is appended.

    Args:
        data_source: The data source to store. Its timestamp attributes are
            mutated in place by this call.
        project: Project the data source belongs to; also written onto the
            stored proto.
        commit: When true, ``self.commit()`` is called after the append.
    """
    stamp = _utc_now()
    if not data_source.created_timestamp:
        data_source.created_timestamp = stamp
    data_source.last_updated_timestamp = stamp

    registry = self._prepare_registry_for_changes(project)

    # Position of the first stored source with the same name in this project,
    # or None when the project has no such source yet.
    match_idx = next(
        (
            position
            for position, stored_proto in enumerate(registry.data_sources)
            if stored_proto.name == data_source.name
            and stored_proto.project == project
        ),
        None,
    )

    if match_idx is not None:
        stored_source = DataSource.from_proto(registry.data_sources[match_idx])
        if stored_source == data_source:
            # Nothing changed: leave the registry untouched.
            return
        # Keep the original creation time across the replace.
        data_source.created_timestamp = stored_source.created_timestamp
        del registry.data_sources[match_idx]

    new_proto = data_source.to_proto()
    new_proto.project = project
    source_cls = data_source.__class__
    new_proto.data_source_class_type = (
        f"{source_cls.__module__}.{source_cls.__name__}"
    )
    self.cached_registry_proto.data_sources.append(new_proto)

    if commit:
        self.commit()
| 431 | |
| 432 | def delete_data_source(self, name: str, project: str, commit: bool = True): |
| 433 | self._prepare_registry_for_changes(project) |
nothing calls this directly
no test coverage detected