Execute a workflow with optional input data. If async_execution is True, returns immediately with a task ID. File objects in input will be automatically detected and converted to base64. Args: workflow_id: The ID of the workflow to execute i
(
self,
workflow_id: str,
input: Optional[Any] = None,
*,
timeout: float = 30.0,
stream: Optional[bool] = None,
selected_outputs: Optional[list] = None,
async_execution: Optional[bool] = None
)
| 148 | return value |
| 149 | |
| 150 | def execute_workflow( |
| 151 | self, |
| 152 | workflow_id: str, |
| 153 | input: Optional[Any] = None, |
| 154 | *, |
| 155 | timeout: float = 30.0, |
| 156 | stream: Optional[bool] = None, |
| 157 | selected_outputs: Optional[list] = None, |
| 158 | async_execution: Optional[bool] = None |
| 159 | ) -> Union[WorkflowExecutionResult, AsyncExecutionResult]: |
| 160 | """ |
| 161 | Execute a workflow with optional input data. |
| 162 | If async_execution is True, returns immediately with a task ID. |
| 163 | |
| 164 | File objects in input will be automatically detected and converted to base64. |
| 165 | |
| 166 | Args: |
| 167 | workflow_id: The ID of the workflow to execute |
| 168 | input: Input data to pass to the workflow. Can be a dict (spread at root level), |
| 169 | primitive value (string, number, bool), or list (wrapped in 'input' field). |
| 170 | File-like objects within dicts are automatically converted to base64. |
| 171 | timeout: Timeout in seconds (default: 30.0) |
| 172 | stream: Enable streaming responses (default: None) |
| 173 | selected_outputs: Block outputs to stream (e.g., ["agent1.content"]) |
| 174 | async_execution: Execute asynchronously (default: None) |
| 175 | |
| 176 | Returns: |
| 177 | WorkflowExecutionResult or AsyncExecutionResult object |
| 178 | |
| 179 | Raises: |
| 180 | SimStudioError: If the workflow execution fails |
| 181 | """ |
| 182 | url = f"{self.base_url}/api/workflows/{workflow_id}/execute" |
| 183 | |
| 184 | # Build headers - async execution uses X-Execution-Mode header |
| 185 | headers = self._session.headers.copy() |
| 186 | if async_execution: |
| 187 | headers['X-Execution-Mode'] = 'async' |
| 188 | |
| 189 | try: |
| 190 | # Build JSON body - spread dict inputs at root level, wrap primitives/lists in 'input' field |
| 191 | body = {} |
| 192 | if input is not None: |
| 193 | if isinstance(input, dict): |
| 194 | # Dict input: spread at root level (matches curl/API behavior) |
| 195 | body = input.copy() |
| 196 | else: |
| 197 | # Primitive or list input: wrap in 'input' field |
| 198 | body = {'input': input} |
| 199 | |
| 200 | # Convert any file objects in the input to base64 format |
| 201 | body = self._convert_files_to_base64(body) |
| 202 | |
| 203 | if stream is not None: |
| 204 | body['stream'] = stream |
| 205 | if selected_outputs is not None: |
| 206 | body['selectedOutputs'] = selected_outputs |
| 207 |