hub / github.com/TransformerOptimus/SuperAGI / replace_old_iteration_workflows

Function replace_old_iteration_workflows

main.py:173–192
(session)


def replace_old_iteration_workflows(session):
    templates = session.query(AgentTemplate).all()
    for template in templates:
        iter_workflow = IterationWorkflow.find_by_id(session, template.agent_workflow_id)
        if not iter_workflow:
            continue
        if iter_workflow.name == "Fixed Task Queue":
            agent_workflow = AgentWorkflow.find_by_name(session, "Fixed Task Workflow")
            template.agent_workflow_id = agent_workflow.id
            session.commit()

        if iter_workflow.name == "Maintain Task Queue":
            agent_workflow = AgentWorkflow.find_by_name(session, "Dynamic Task Workflow")
            template.agent_workflow_id = agent_workflow.id
            session.commit()

        if iter_workflow.name == "Don't Maintain Task Queue" or iter_workflow.name == "Goal Based Agent":
            agent_workflow = AgentWorkflow.find_by_name(session, "Goal Based Workflow")
            template.agent_workflow_id = agent_workflow.id
            session.commit()

@app.on_event("startup")
async def startup_event():
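
The three branches in replace_old_iteration_workflows differ only in the old and new workflow names, so the same remapping can be sketched as a lookup table. This is an illustrative variant, not the project's code: `OLD_TO_NEW`, `remap_templates`, and the two lookup callables are hypothetical names standing in for `IterationWorkflow.find_by_id` and `AgentWorkflow.find_by_name`, and the in-memory objects replace real database rows. It also assumes a missing target workflow should be skipped rather than raise, which the original function does not do.

```python
from types import SimpleNamespace

# Old iteration-workflow name -> replacement agent-workflow name,
# copied from the three branches of replace_old_iteration_workflows.
OLD_TO_NEW = {
    "Fixed Task Queue": "Fixed Task Workflow",
    "Maintain Task Queue": "Dynamic Task Workflow",
    "Don't Maintain Task Queue": "Goal Based Workflow",
    "Goal Based Agent": "Goal Based Workflow",
}


def remap_templates(templates, find_iteration_workflow, find_agent_workflow):
    """Point each template at its replacement workflow; return how many changed.

    find_iteration_workflow(id) and find_agent_workflow(name) are hypothetical
    lookup callables that return an object or None.
    """
    changed = 0
    for template in templates:
        old = find_iteration_workflow(template.agent_workflow_id)
        if old is None:
            continue  # template does not reference an iteration workflow
        new_name = OLD_TO_NEW.get(old.name)
        if new_name is None:
            continue  # name is not one of the legacy workflows
        new = find_agent_workflow(new_name)
        if new is None:
            continue  # assumption: skip when the target workflow is missing
        template.agent_workflow_id = new.id
        changed += 1
    return changed


# In-memory stand-ins for database rows.
iteration_workflows = {
    1: SimpleNamespace(name="Fixed Task Queue"),
    2: SimpleNamespace(name="Goal Based Agent"),
    3: SimpleNamespace(name="Unrelated"),
}
agent_workflows = {
    "Fixed Task Workflow": SimpleNamespace(id=10),
    "Goal Based Workflow": SimpleNamespace(id=30),
}
templates = [SimpleNamespace(agent_workflow_id=i) for i in (1, 2, 3, 99)]
changed = remap_templates(templates, iteration_workflows.get, agent_workflows.get)
```

Unlike the original, this sketch leaves committing to the caller, so a single `session.commit()` after the loop could replace the per-branch commits if that behavior is acceptable.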

Callers 1

startup_event · Function · 0.85

Calls 2

find_by_name · Method · 0.80
find_by_id · Method · 0.45

Tested by

no test coverage detected
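
Since no tests were detected, here is a minimal sketch of how one branch might be covered with `unittest.mock`. It is an assumption-heavy illustration: the three model classes are replaced by `MagicMock` stand-ins rather than imported from SuperAGI, the function body is copied from the listing so the example runs on its own, and the test name is hypothetical. In the real project the models would more likely be patched inside the module that defines the function.

```python
from types import SimpleNamespace
from unittest.mock import MagicMock

# Hypothetical stand-ins for the SuperAGI model classes used by the function.
AgentTemplate = MagicMock(name="AgentTemplate")
IterationWorkflow = MagicMock(name="IterationWorkflow")
AgentWorkflow = MagicMock(name="AgentWorkflow")


def replace_old_iteration_workflows(session):
    # Copy of the function under test, so the sketch is self-contained.
    templates = session.query(AgentTemplate).all()
    for template in templates:
        iter_workflow = IterationWorkflow.find_by_id(session, template.agent_workflow_id)
        if not iter_workflow:
            continue
        if iter_workflow.name == "Fixed Task Queue":
            agent_workflow = AgentWorkflow.find_by_name(session, "Fixed Task Workflow")
            template.agent_workflow_id = agent_workflow.id
            session.commit()

        if iter_workflow.name == "Maintain Task Queue":
            agent_workflow = AgentWorkflow.find_by_name(session, "Dynamic Task Workflow")
            template.agent_workflow_id = agent_workflow.id
            session.commit()

        if iter_workflow.name == "Don't Maintain Task Queue" or iter_workflow.name == "Goal Based Agent":
            agent_workflow = AgentWorkflow.find_by_name(session, "Goal Based Workflow")
            template.agent_workflow_id = agent_workflow.id
            session.commit()


def test_fixed_task_queue_is_remapped():
    # Clear call history so the test can be run more than once.
    IterationWorkflow.reset_mock()
    AgentWorkflow.reset_mock()

    template = SimpleNamespace(agent_workflow_id=1)
    session = MagicMock()
    session.query.return_value.all.return_value = [template]
    IterationWorkflow.find_by_id.return_value = SimpleNamespace(name="Fixed Task Queue")
    AgentWorkflow.find_by_name.return_value = SimpleNamespace(id=42)

    replace_old_iteration_workflows(session)

    # The template now points at the replacement workflow's id.
    assert template.agent_workflow_id == 42
    AgentWorkflow.find_by_name.assert_called_once_with(session, "Fixed Task Workflow")
    session.commit.assert_called_once()
```

The same pattern would extend to the other two branches by changing the name returned from `find_by_id` and the name expected in `find_by_name`.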