Resolve the background callback manager for a specific MCP tool.
(tool_name: str)
| 28 | |
| 29 | |
def _get_callback_manager(tool_name: str):
    """Look up the background callback manager registered for an MCP tool.

    The adapter for ``tool_name`` is fetched from the application's
    ``mcp_callback_map``; the value stored under the ``"manager"`` key of
    that adapter's callback info is returned.

    Args:
        tool_name: Name of the MCP tool whose adapter should be located.

    Returns:
        The ``"manager"`` entry of the adapter's callback info, or ``None``
        when no adapter matches ``tool_name`` (or the entry is absent).
    """
    found = get_app().mcp_callback_map.find_by_tool_name(tool_name)
    # The callback info is only reachable through the adapter's private attribute.
    # pylint: disable-next=protected-access
    return None if found is None else found._cb_info.get("manager")
| 37 | |
| 38 | |
| 39 | def create_task(dispatch_response: dict[str, Any], callback) -> CreateTaskResult: |
no test coverage detected
searching dependent graphs…