List resolved labels from the offline store with conflict policy enforcement.
(request: ListLabelsRequest)
| 170 | |
| 171 | @rest_app.post("/list-labels") |
| 172 | def list_labels(request: ListLabelsRequest): |
| 173 | """List resolved labels from the offline store with conflict policy enforcement.""" |
| 174 | try: |
| 175 | from feast.labeling.conflict_policy import ConflictPolicy |
| 176 | from feast.labeling.conflict_resolver import resolve_conflicts |
| 177 | |
| 178 | fv_name = request.feature_view |
| 179 | fv: Any = None |
| 180 | try: |
| 181 | fv = store.registry.get_label_view(fv_name, store.project) |
| 182 | except Exception: |
| 183 | try: |
| 184 | fv = store.get_feature_view(fv_name) |
| 185 | except Exception: |
| 186 | pass |
| 187 | |
| 188 | if fv is None: |
| 189 | return Response( |
| 190 | content=json.dumps({"detail": f"Label view '{fv_name}' not found"}), |
| 191 | status_code=status.HTTP_404_NOT_FOUND, |
| 192 | media_type="application/json", |
| 193 | ) |
| 194 | |
| 195 | batch_source = getattr(fv, "batch_source", None) |
| 196 | if batch_source is None: |
| 197 | return Response( |
| 198 | content=json.dumps({"detail": f"No batch source for '{fv_name}'"}), |
| 199 | status_code=status.HTTP_400_BAD_REQUEST, |
| 200 | media_type="application/json", |
| 201 | ) |
| 202 | |
| 203 | feature_names = [f.name for f in fv.features] |
| 204 | join_keys = ( |
| 205 | fv.join_keys |
| 206 | if hasattr(fv, "join_keys") |
| 207 | else (fv.entities if fv.entities else []) |
| 208 | ) |
| 209 | |
| 210 | provider = store._get_provider() |
| 211 | timestamp_field = batch_source.timestamp_field |
| 212 | |
| 213 | conflict_policy = getattr( |
| 214 | fv, "conflict_policy", ConflictPolicy.LAST_WRITE_WINS |
| 215 | ) |
| 216 | labeler_field = getattr(fv, "labeler_field", "labeler") |
| 217 | |
| 218 | try: |
| 219 | job = provider.offline_store.pull_all_from_table_or_query( |
| 220 | config=store.config, |
| 221 | data_source=batch_source, |
| 222 | join_key_columns=join_keys, |
| 223 | feature_name_columns=feature_names, |
| 224 | timestamp_field=timestamp_field, |
| 225 | ) |
| 226 | df = job.to_df() |
| 227 | except Exception: |
| 228 | return { |
| 229 | "labels": [], |
Nothing calls `list_labels` directly; it is registered as the `POST /list-labels` route on `rest_app`.
No test coverage was detected for `list_labels`.