Connect to servers and discover their tools. Returns tool count.
(self, servers: Optional[List[str]] = None)
| 176 | |
| 177 | # -- step 2: load ------------------------------------------------------ # |
| 178 | def load_tools(self, servers: Optional[List[str]] = None) -> int: |
| 179 | """Connect to servers and discover their tools. Returns tool count.""" |
| 180 | names = list(servers) if servers else list(self.servers) |
| 181 | for server in names: |
| 182 | self._connect(server) |
| 183 | for t in self._worker.list(server).tools: |
| 184 | qualified = f"{server}.{t.name}" |
| 185 | self.all_tools[qualified] = { |
| 186 | "name": qualified, |
| 187 | "server": server, |
| 188 | "tool": t.name, |
| 189 | "description": t.description or "", |
| 190 | "parameters": getattr(t, "inputSchema", {}) or {}, |
| 191 | } |
| 192 | # Register a bare alias when it is unambiguous. |
| 193 | if t.name not in self._alias and t.name not in self.all_tools: |
| 194 | self._alias[t.name] = qualified |
| 195 | elif self._alias.get(t.name) not in (None, qualified): |
| 196 | _log.warning("Ambiguous tool name '%s'; use '%s'.", |
| 197 | t.name, qualified) |
| 198 | self._alias.pop(t.name, None) |
| 199 | return len(self.all_tools) |
| 200 | |
| 201 | # -- step 3: run ------------------------------------------------------- # |
| 202 | def run(self, query, arguments: Optional[dict] = None) -> Any: |