| 1388 | self.apply_project(Project(name=project_name), commit=True) |
| 1389 | |
| 1390 | def _apply_object( |
| 1391 | self, |
| 1392 | table: Table, |
| 1393 | project: str, |
| 1394 | id_field_name: str, |
| 1395 | obj: Any, |
| 1396 | proto_field_name: str, |
| 1397 | name: Optional[str] = None, |
| 1398 | ): |
| 1399 | if not self.purge_feast_metadata: |
| 1400 | self._maybe_init_project_metadata(project) |
| 1401 | # Initialize project is necessary because FeatureStore object can apply objects individually without "feast apply" cli option |
| 1402 | if not isinstance(obj, Project): |
| 1403 | self._initialize_project_if_not_exists(project_name=project) |
| 1404 | name = name or (obj.name if hasattr(obj, "name") else None) |
| 1405 | assert name, f"name needs to be provided for {obj}" |
| 1406 | |
| 1407 | with self.write_engine.begin() as conn: |
| 1408 | update_datetime = _utc_now() |
| 1409 | update_time = int(update_datetime.timestamp()) |
| 1410 | stmt = select(table).where( |
| 1411 | getattr(table.c, id_field_name) == name, table.c.project_id == project |
| 1412 | ) |
| 1413 | row = conn.execute(stmt).first() |
| 1414 | |
| 1415 | if row: |
| 1416 | if proto_field_name in [ |
| 1417 | "entity_proto", |
| 1418 | "saved_dataset_proto", |
| 1419 | "feature_view_proto", |
| 1420 | "feature_service_proto", |
| 1421 | "permission_proto", |
| 1422 | "project_proto", |
| 1423 | "data_source_proto", |
| 1424 | ]: |
| 1425 | deserialized_proto = self.deserialize_registry_values( |
| 1426 | row._mapping[proto_field_name], type(obj) |
| 1427 | ) |
| 1428 | if deserialized_proto is not None: |
| 1429 | # Check if the object has actually changed (same as feature views) |
| 1430 | existing_obj = type(obj).from_proto(deserialized_proto) |
| 1431 | if existing_obj == obj: |
| 1432 | return # No changes, exit early |
| 1433 | |
| 1434 | # Object has changed, preserve created_timestamp, update last_updated_timestamp |
| 1435 | obj.created_timestamp = deserialized_proto.meta.created_timestamp.ToDatetime().replace( |
| 1436 | tzinfo=timezone.utc |
| 1437 | ) |
| 1438 | if hasattr(obj, "last_updated_timestamp"): |
| 1439 | obj.last_updated_timestamp = update_datetime |
| 1440 | if isinstance(obj, (FeatureView, StreamFeatureView)): |
| 1441 | obj.update_materialization_intervals( |
| 1442 | type(obj) |
| 1443 | .from_proto(deserialized_proto) |
| 1444 | .materialization_intervals |
| 1445 | ) |
| 1446 | |
| 1447 | values = { |