Resolve workflow inputs against definitions and provided values.
(
self,
definition: WorkflowDefinition,
provided: dict[str, Any],
)
| 1182 | return slots[:collected] |
| 1183 | |
def _resolve_inputs(
    self,
    definition: WorkflowDefinition,
    provided: dict[str, Any],
) -> dict[str, Any]:
    """Build the effective input mapping for a workflow.

    Each entry of ``definition.inputs`` is visited in declaration order.
    Entries whose definition is not a ``dict`` are skipped. For the rest,
    the raw value is taken from ``provided`` when the name is present
    there, otherwise from the definition's ``"default"`` key; either way
    it is passed through ``_resolve_default`` and then ``_coerce_input``.

    Args:
        definition: Workflow definition whose ``inputs`` mapping is read.
        provided: Caller-supplied values keyed by input name. Keys that
            are not declared in ``definition.inputs`` are not copied
            into the result.

    Returns:
        A mapping from input name to coerced value. Inputs that are
        neither provided, defaulted, nor required are absent.

    Raises:
        ValueError: If an input marked ``required`` has no provided
            value and no default.
    """
    result: dict[str, Any] = {}

    for input_name, spec in definition.inputs.items():
        if not isinstance(spec, dict):
            continue

        # Choose the raw value first; sentinel resolution is applied the
        # same way to both sources below. An explicitly passed sentinel
        # such as ``{"integration": "auto"}`` (which the workflow prompt
        # advertises as a valid value) must behave exactly like omitting
        # the input and letting the default flow through, so dispatch
        # never sees the literal sentinel.
        if input_name in provided:
            raw_value = provided[input_name]
        elif "default" in spec:
            raw_value = spec["default"]
        else:
            if spec.get("required", False):
                msg = f"Required input {input_name!r} not provided."
                raise ValueError(msg)
            continue

        value = self._resolve_default(input_name, raw_value)

        # If ``integration`` is still the literal ``"auto"`` sentinel
        # after resolution (it could not be resolved against project
        # state), coerce against a copy of the definition without
        # ``enum``. Otherwise a workflow that lists specific integrations
        # in ``enum`` would crash at runtime on the sentinel value.
        # NOTE: only enum membership is bypassed; ``_coerce_input`` still
        # enforces the declared ``type`` against the filtered definition
        # (``string`` rejects non-strings, ``number`` rejects bools and
        # uncoercible values, ``boolean`` rejects non-bools), so
        # ill-typed values still fail fast here.
        skip_enum_check = (
            input_name == "integration"
            and value == "auto"
            and "enum" in spec
        )
        coerce_spec = (
            {
                field: setting
                for field, setting in spec.items()
                if field != "enum"
            }
            if skip_enum_check
            else spec
        )

        result[input_name] = self._coerce_input(input_name, value, coerce_spec)

    return result
| 1233 | |
| 1234 | def _resolve_default(self, name: str, default: Any) -> Any: |
| 1235 | """Resolve special default sentinels against project state. |