MCPcopy
hub / github.com/dataelement/Clawith / trigger_task

Function trigger_task

backend/app/api/tasks.py:153–174  ·  view source on GitHub ↗

Manually trigger a supervision task execution (for testing).

(
    agent_id: uuid.UUID,
    task_id: uuid.UUID,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
)

Source from the content-addressed store, hash-verified

151
152@router.post("/{task_id}/trigger")
153async def trigger_task(
154 agent_id: uuid.UUID,
155 task_id: uuid.UUID,
156 current_user: User = Depends(get_current_user),
157 db: AsyncSession = Depends(get_db),
158):
159 """Manually trigger a supervision task execution (for testing)."""
160 from app.core.permissions import is_agent_expired
161 agent, _access = await check_agent_access(db, current_user, agent_id)
162 if is_agent_expired(agent):
163 raise HTTPException(status_code=403, detail="Agent has expired")
164
165 result = await db.execute(select(Task).where(Task.id == task_id, Task.agent_id == agent_id))
166 task = result.scalar_one_or_none()
167 if not task:
168 raise HTTPException(status_code=404, detail="Task not found")
169
170 import asyncio
171 from app.services.task_executor import execute_task
172 asyncio.create_task(execute_task(task.id, agent_id))
173
174 return {"status": "triggered", "task_id": str(task_id)}

Callers

nothing calls this directly

Calls 6

check_agent_accessFunction · 0.90
is_agent_expiredFunction · 0.90
execute_taskFunction · 0.90
whereMethod · 0.80
executeMethod · 0.45
scalar_one_or_noneMethod · 0.45

Tested by

no test coverage detected