Test connectivity to Atlassian Rovo MCP and list available tools.
(
agent_id: uuid.UUID,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
)
| 128 | |
| 129 | @router.post("/agents/{agent_id}/atlassian-channel/test") |
| 130 | async def test_atlassian_channel( |
| 131 | agent_id: uuid.UUID, |
| 132 | current_user: User = Depends(get_current_user), |
| 133 | db: AsyncSession = Depends(get_db), |
| 134 | ): |
| 135 | """Test connectivity to Atlassian Rovo MCP and list available tools.""" |
| 136 | await check_agent_access(db, current_user, agent_id) |
| 137 | result = await db.execute( |
| 138 | select(ChannelConfig).where( |
| 139 | ChannelConfig.agent_id == agent_id, |
| 140 | ChannelConfig.channel_type == "atlassian", |
| 141 | ) |
| 142 | ) |
| 143 | config = result.scalar_one_or_none() |
| 144 | if not config or not config.app_secret: |
| 145 | raise HTTPException(status_code=400, detail="Atlassian not configured") |
| 146 | |
| 147 | from app.services.mcp_client import MCPClient |
| 148 | try: |
| 149 | client = MCPClient(ATLASSIAN_MCP_URL, api_key=config.app_secret) |
| 150 | tools = await client.list_tools() |
| 151 | return { |
| 152 | "ok": True, |
| 153 | "tool_count": len(tools), |
| 154 | "tools": [{"name": t["name"], "description": t.get("description", "")[:100]} for t in tools[:10]], |
| 155 | "message": f"✅ Connected to Atlassian Rovo MCP — {len(tools)} tools available", |
| 156 | } |
| 157 | except Exception as e: |
| 158 | return {"ok": False, "error": str(e)[:300]} |
| 159 | |
| 160 | |
| 161 | # ─── Internal helper ──────────────────────────────────── |
no test coverage detected