Handles tool discovery, preparation, and execution. Extracted from BaseAgent to separate concerns and enable tool caching.
| 146 | |
| 147 | |
| 148 | class ToolExecutor: |
| 149 | """Handles tool discovery, preparation, and execution. |
| 150 | |
| 151 | Extracted from BaseAgent to separate concerns and enable tool caching. |
| 152 | """ |
| 153 | |
| 154 | def __init__( |
| 155 | self, |
| 156 | user_api_key: Optional[str] = None, |
| 157 | user: Optional[str] = None, |
| 158 | decoded_token: Optional[Dict] = None, |
| 159 | agent_id: Optional[str] = None, |
| 160 | *, |
| 161 | headless: bool = False, |
| 162 | tool_allowlist: Optional[List[str]] = None, |
| 163 | ): |
| 164 | self.user_api_key = user_api_key |
| 165 | self.user = user |
| 166 | self.decoded_token = decoded_token |
| 167 | self.agent_id = agent_id |
| 168 | # Headless mode (scheduled / webhook): no human to resolve a pause, |
| 169 | # so check_pause returns headless_denied sentinels instead. |
| 170 | self.headless = bool(headless) |
| 171 | # Tool-instance ids pre-authorized for headless approval-gated execution. |
| 172 | self.tool_allowlist: set = ( |
| 173 | {str(x) for x in tool_allowlist} if tool_allowlist else set() |
| 174 | ) |
| 175 | self.tool_calls: List[Dict] = [] |
| 176 | self._loaded_tools: Dict[str, object] = {} |
| 177 | self.conversation_id: Optional[str] = None |
| 178 | self.message_id: Optional[str] = None |
| 179 | self.client_tools: Optional[List[Dict]] = None |
| 180 | self._name_to_tool: Dict[str, Tuple[str, str]] = {} |
| 181 | self._tool_to_name: Dict[Tuple[str, str], str] = {} |
| 182 | # Filled by the LLMHandler.handle_tool_calls headless loop. |
| 183 | self.headless_denials: List[Dict] = [] |
| 184 | |
| 185 | def get_tools(self) -> Dict[str, Dict]: |
| 186 | """Load tool configs from DB based on user context. |
| 187 | |
| 188 | If *client_tools* have been set on this executor, they are |
| 189 | automatically merged into the returned dict. |
| 190 | """ |
| 191 | if self.user_api_key: |
| 192 | tools = self._get_tools_by_api_key(self.user_api_key) |
| 193 | else: |
| 194 | tools = self._get_user_tools(self.user or "local") |
| 195 | if self.client_tools: |
| 196 | self.merge_client_tools(tools, self.client_tools) |
| 197 | return tools |
| 198 | |
| 199 | def _get_tools_by_api_key(self, api_key: str) -> Dict[str, Dict]: |
| 200 | """Resolve an agent's toolset — exactly ``agents.tools``, no defaults.""" |
| 201 | # Per-operation session: the answer pipeline spans a long-lived |
| 202 | # generator; wrapping it in a single connection would pin a PG |
| 203 | # conn for the whole stream. Open, fetch, close. |
| 204 | with db_readonly() as conn: |
| 205 | agent_data = AgentsRepository(conn).find_by_key(api_key) |
no outgoing calls