MCPcopy Index your code
hub / github.com/simstudioai/sim / batch_execution_example

Function batch_execution_example

packages/python-sdk/examples/basic_usage.py:104–154  ·  view source on GitHub ↗

Example 5: Batch workflow execution

()

Source from the content-addressed store, hash-verified

102
103
def batch_execution_example():
    """Example 5: Batch workflow execution.

    Runs a fixed list of (workflow_id, input_data) pairs one after another
    against a client built from the ``SIM_API_KEY`` environment variable,
    printing a status line per workflow and a summary at the end.

    A workflow for which ``client.validate_workflow`` returns a falsy value
    is skipped: a warning is printed and no record is added, so it is not
    counted in the summary total either.

    Returns:
        A list with one dict per attempted workflow. Every record has the
        same four keys: ``"workflow_id"``, ``"success"``, ``"output"`` and
        ``"error"``. Records created from a raised exception have
        ``"success"`` set to ``False``, ``"output"`` set to ``None`` and
        ``"error"`` set to the exception message.
    """
    client = SimStudioClient(api_key=os.getenv("SIM_API_KEY"))

    workflows = [
        ("workflow-1", {"type": "analysis", "data": "sample1"}),
        ("workflow-2", {"type": "processing", "data": "sample2"}),
        ("workflow-3", {"type": "validation", "data": "sample3"}),
    ]

    results = []

    for workflow_id, input_data in workflows:
        # Each workflow is handled independently: a failure is recorded and
        # the loop moves on to the next one instead of aborting the batch.
        try:
            # Validate workflow before execution
            if not client.validate_workflow(workflow_id):
                print(f"⚠️ Skipping {workflow_id}: not deployed")
                continue

            result = client.execute_workflow(workflow_id, input_data)
            results.append({
                "workflow_id": workflow_id,
                "success": result.success,
                "output": result.output,
                "error": result.error,
            })

            status = "✅ Success" if result.success else "❌ Failed"
            print(f"{status}: {workflow_id}")

        except SimStudioError as error:
            # Keep the same keys as a success record so callers can index
            # any entry uniformly; there is no output when the call raised.
            results.append({
                "workflow_id": workflow_id,
                "success": False,
                "output": None,
                "error": str(error),
            })
            print(f"❌ SDK Error in {workflow_id}: {error}")
        except Exception as error:
            # Deliberate catch-all at the per-item boundary: one unexpected
            # failure must not stop the remaining workflows from running.
            results.append({
                "workflow_id": workflow_id,
                "success": False,
                "output": None,
                "error": str(error),
            })
            print(f"❌ Unexpected error in {workflow_id}: {error}")

    # Summary
    successful = sum(1 for r in results if r["success"])
    total = len(results)
    print(f"\n📊 Summary: {successful}/{total} workflows completed successfully")

    return results
155
156
157def streaming_example():

Callers 1

basic_usage.py — File · 0.85

Calls 4

validate_workflow — Method · 0.95
execute_workflow — Method · 0.95
SimStudioClient — Class · 0.90
str — Function · 0.85

Tested by

no test coverage detected