Set the lifecycle state of a feature view.
(ctx: click.Context, name: str, state: str)
| 158 | ) |
| 159 | @click.pass_context |
def feature_view_set_state(ctx: click.Context, name: str, state: str):
    """
    Set the lifecycle state of a feature view.

    Fetches the feature view ``name`` from the registry of the store built
    from ``ctx``, checks that the requested transition is allowed, and
    re-applies the feature view to the registry with the new state. A
    one-line status message is printed for every outcome.

    Args:
        ctx: Click context passed to ``create_feature_store``.
        name: Name of the feature view to update.
        state: Name of the target state; upper-cased before being looked up
            among the ``FeatureViewState`` member names.

    Exits with status 1 when the feature view cannot be found or when
    ``state`` does not name a ``FeatureViewState`` member.
    """
    store = create_feature_store(ctx)
    try:
        fv = store.registry.get_any_feature_view(name, store.project)
    except FeastObjectNotFoundException as e:
        print(e)
        sys.exit(1)

    # NOTE(review): this branch and the invalid-transition branch below
    # return normally (exit status 0) even though the request was rejected;
    # confirm whether callers rely on that before changing it.
    if not isinstance(fv, (FeatureView, OnDemandFeatureView)):
        print(f"Feature view '{name}' does not support state management.")
        return

    # Lookup by member name raises KeyError for unknown names; report it the
    # same way as the not-found case instead of surfacing a traceback.
    try:
        new_state = FeatureViewState[state.upper()]
    except KeyError:
        valid_names = ", ".join(member.name for member in FeatureViewState)
        print(f"Unknown state '{state}'. Valid states: {valid_names}.")
        sys.exit(1)

    if fv.state == new_state:
        print(f"Feature view '{name}' is already in state {new_state.name}.")
        return

    if not fv.state.can_transition_to(new_state):
        current = fv.state.name
        allowed = _VALID_STATE_TRANSITIONS.get(fv.state, set())
        # "none" is shown when the current state has no outgoing transitions.
        allowed_names = ", ".join(sorted(s.name for s in allowed)) or "none"
        print(
            f"Invalid state transition: {current} -> {new_state.name}. "
            f"Allowed transitions from {current}: {allowed_names}."
        )
        return

    fv.state = new_state
    store.registry.apply_feature_view(fv, store.project)
    print(f"Feature view '{name}' state set to {new_state.name}.")
| 193 | |
| 194 | |
| 195 | @feature_views_cmd.command("list-versions") |
nothing calls this directly
no test coverage detected