MCPcopy
hub / github.com/HKUDS/DeepCode / DeepCodeListTasksTool

Class DeepCodeListTasksTool

nanobot/nanobot/agent/tools/deepcode.py:256–340  ·  view source on GitHub ↗

List active and recent DeepCode tasks.

Source from the content-addressed store, hash-verified

254
255
256class DeepCodeListTasksTool(Tool):
257 """List active and recent DeepCode tasks."""
258
259 def __init__(self, api_url: str | None = None):
260 self._api_url = api_url or _get_deepcode_url()
261
262 @property
263 def name(self) -> str:
264 return "deepcode_list_tasks"
265
266 @property
267 def description(self) -> str:
268 return (
269 "List all active and recent DeepCode code generation tasks. "
270 "Shows task IDs, status, progress, and results summary."
271 )
272
273 @property
274 def parameters(self) -> dict[str, Any]:
275 return {
276 "type": "object",
277 "properties": {
278 "limit": {
279 "type": "integer",
280 "description": "Maximum number of recent tasks to show. Default: 10",
281 "minimum": 1,
282 "maximum": 50,
283 },
284 },
285 }
286
287 async def execute(self, limit: int = 10, **kwargs: Any) -> str:
288 try:
289 async with httpx.AsyncClient(timeout=15.0) as client:
290 # Fetch active tasks
291 active_resp = await client.get(f"{self._api_url}/api/v1/workflows/active")
292 active_resp.raise_for_status()
293 active_data = active_resp.json()
294
295 # Fetch recent tasks
296 recent_resp = await client.get(
297 f"{self._api_url}/api/v1/workflows/recent",
298 params={"limit": limit},
299 )
300 recent_resp.raise_for_status()
301 recent_data = recent_resp.json()
302
303 lines = []
304
305 # Active tasks
306 active_tasks = active_data.get("tasks", [])
307 if active_tasks:
308 lines.append(f"=== Active Tasks ({len(active_tasks)}) ===")
309 for task in active_tasks:
310 lines.append(
311 f" [{task.get('status', '?')}] {task.get('task_id', '?')} "
312 f"- {task.get('progress', 0)}% - {task.get('message', '')}"
313 )

Callers 1

create_all_toolsFunction · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected