Cancel a running DeepCode task.
| 341 | |
| 342 | |
class DeepCodeCancelTool(Tool):
    """Tool that asks the DeepCode backend to cancel a running task.

    Cancellation is requested with a POST to
    ``{api_url}/api/v1/workflows/cancel/{task_id}``. Failures are reported
    as human-readable ``"Error: ..."`` strings rather than raised.
    """

    def __init__(self, api_url: str | None = None):
        """Store the backend base URL.

        Args:
            api_url: Base URL of the DeepCode backend. When omitted or empty,
                the value returned by ``_get_deepcode_url()`` is used.
        """
        self._api_url = api_url or _get_deepcode_url()

    @property
    def name(self) -> str:
        """Return the tool name, ``deepcode_cancel``."""
        return "deepcode_cancel"

    @property
    def description(self) -> str:
        """Return a one-sentence description of what the tool does."""
        return "Cancel a running DeepCode code generation task."

    @property
    def parameters(self) -> dict[str, Any]:
        """Return the JSON-schema-style description of the tool arguments.

        The only argument is the required string ``task_id``.
        """
        return {
            "type": "object",
            "properties": {
                "task_id": {
                    "type": "string",
                    "description": "The task ID to cancel",
                },
            },
            "required": ["task_id"],
        }

    async def execute(self, task_id: str, **kwargs: Any) -> str:
        """Request cancellation of ``task_id`` and describe the outcome.

        Args:
            task_id: Identifier of the task to cancel.
            **kwargs: Accepted and ignored.

        Returns:
            A success message when the backend answers with a non-error
            status, otherwise a string describing the failure (blank ID,
            connection failure, timeout, HTTP error status, or any other
            exception). This method does not raise for those failures.
        """
        # Local import keeps the block self-contained without touching the
        # file-level import section.
        from urllib.parse import quote

        task_ref = str(task_id)
        if not task_ref.strip():
            return "Error: task_id must be a non-empty string."

        # Percent-encode the ID (including "/") so it always stays a single
        # path segment and cannot redirect the request to another endpoint.
        encoded_id = quote(task_ref, safe="")
        url = f"{self._api_url}/api/v1/workflows/cancel/{encoded_id}"

        try:
            async with httpx.AsyncClient(timeout=15.0) as client:
                response = await client.post(url)
                response.raise_for_status()
                return f"Task '{task_id}' has been cancelled successfully."
        except httpx.ConnectError:
            return "Error: Cannot connect to DeepCode backend. Is the DeepCode service running?"
        except httpx.TimeoutException:
            # Timeout exceptions frequently have an empty str(), so give them
            # their own message instead of falling through to the generic one.
            return (
                f"Error: Request to DeepCode backend timed out while cancelling task '{task_id}'."
            )
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 400:
                return f"Error: Task '{task_id}' not found or cannot be cancelled."
            return (
                f"Error: DeepCode API returned status {e.response.status_code}: {e.response.text}"
            )
        except Exception as e:
            # Top-level boundary: the tool reports failures as text.
            return f"Error cancelling task: {str(e)}"
| 386 | |
| 387 | |
| 388 | class DeepCodeRespondTool(Tool): |