(self, query: str, table_name: str = None, session_id: str = None, compose_sub_answers: Optional[bool] = None, query_decompose: Optional[bool] = None, ai_rerank: Optional[bool] = None, context_expand: Optional[bool] = None, verify: Optional[bool] = None, retrieval_k: Optional[int] = None, context_window_size: Optional[int] = None, reranker_top_k: Optional[int] = None, search_type: Optional[str] = None, dense_weight: Optional[float] = None, max_retries: int = 1, event_callback: Optional[callable] = None)
| 258 | |
| 259 | # ---------------- Main async implementation -------------------------------------- |
| 260 | async def _run_async(self, query: str, table_name: str = None, session_id: str = None, compose_sub_answers: Optional[bool] = None, query_decompose: Optional[bool] = None, ai_rerank: Optional[bool] = None, context_expand: Optional[bool] = None, verify: Optional[bool] = None, retrieval_k: Optional[int] = None, context_window_size: Optional[int] = None, reranker_top_k: Optional[int] = None, search_type: Optional[str] = None, dense_weight: Optional[float] = None, max_retries: int = 1, event_callback: Optional[callable] = None) -> Dict[str, Any]: |
| 261 | start_time = time.time() |
| 262 | |
| 263 | # Emit analyze event at the start |
| 264 | if event_callback: |
| 265 | event_callback("analyze", {"query": query}) |
| 266 | |
| 267 | # 🚀 NEW: Get conversation history |
| 268 | history = self.chat_histories.get(session_id, []) if session_id else [] |
| 269 | |
| 270 | # 🔄 Refresh overviews for this session if available |
| 271 | # if session_id and session_id != getattr(self, "_current_overview_session", None): |
| 272 | # candidate_path = os.path.join("index_store", "overviews", f"{session_id}.jsonl") |
| 273 | # if os.path.exists(candidate_path): |
| 274 | # self._load_overviews(candidate_path) |
| 275 | # self._current_overview_session = session_id |
| 276 | # else: |
| 277 | # # Fall back to global overviews if per-session file not found |
| 278 | # if self._current_overview_session != "GLOBAL": |
| 279 | # self._load_overviews(self._global_overview_path) |
| 280 | # self._current_overview_session = "GLOBAL" |
| 281 | |
| 282 | query_type = await self._triage_query_async(query, history) |
| 283 | print(f"🎯 ROUTING DEBUG: Final triage decision: '{query_type}'") |
| 284 | print(f"Agent Triage Decision: '{query_type}'") |
| 285 | |
| 286 | # Create a contextual query that includes history for most operations |
| 287 | contextual_query = self._format_query_with_history(query, history) |
| 288 | raw_query = query.strip() |
| 289 | |
| 290 | # --- Apply runtime AI reranker override (must happen before any retrieval calls) --- |
| 291 | if ai_rerank is not None: |
| 292 | rr_cfg = self.retrieval_pipeline.config.setdefault("reranker", {}) |
| 293 | rr_cfg["enabled"] = bool(ai_rerank) |
| 294 | if ai_rerank: |
| 295 | # Ensure the pipeline knows to use the external ColBERT reranker |
| 296 | rr_cfg.setdefault("type", "ai") |
| 297 | rr_cfg.setdefault("strategy", "rerankers-lib") |
| 298 | rr_cfg.setdefault( |
| 299 | "model_name", |
| 300 | # Falls back to ColBERT-small if the caller did not supply one |
| 301 | self.ollama_config.get("rerank_model", "answerai-colbert-small-v1"), |
| 302 | ) |
| 303 | |
| 304 | # --- Apply runtime retrieval configuration overrides --- |
| 305 | if retrieval_k is not None: |
| 306 | self.retrieval_pipeline.config["retrieval_k"] = retrieval_k |
| 307 | print(f"🔍 Retrieval K set to: {retrieval_k}") |
| 308 | |
| 309 | if context_window_size is not None: |
| 310 | self.retrieval_pipeline.config["context_window_size"] = context_window_size |
| 311 | print(f"🔍 Context window size set to: {context_window_size}") |
| 312 | |
| 313 | if reranker_top_k is not None: |
| 314 | rr_cfg = self.retrieval_pipeline.config.setdefault("reranker", {}) |
| 315 | rr_cfg["top_k"] = reranker_top_k |
| 316 | print(f"🔍 Reranker top K set to: {reranker_top_k}") |
| 317 |
no test coverage detected