(
query,
knowledge_graph_inst: BaseGraphStorage,
entities_vdb: BaseVectorStorage,
relationships_vdb: BaseVectorStorage,
text_chunks_db: BaseKVStorage[TextChunkSchema],
query_param: QueryParam,
global_config: dict,
)
| 398 | |
| 399 | |
| 400 | async def local_query( |
| 401 | query, |
| 402 | knowledge_graph_inst: BaseGraphStorage, |
| 403 | entities_vdb: BaseVectorStorage, |
| 404 | relationships_vdb: BaseVectorStorage, |
| 405 | text_chunks_db: BaseKVStorage[TextChunkSchema], |
| 406 | query_param: QueryParam, |
| 407 | global_config: dict, |
| 408 | ) -> str: |
| 409 | context = None |
| 410 | use_model_func = global_config["llm_model_func"] |
| 411 | |
| 412 | kw_prompt_temp = PROMPTS["keywords_extraction"] |
| 413 | kw_prompt = kw_prompt_temp.format(query=query) |
| 414 | result = await use_model_func(kw_prompt) |
| 415 | json_text = locate_json_string_body_from_string(result) |
| 416 | |
| 417 | try: |
| 418 | keywords_data = json.loads(json_text) |
| 419 | keywords = keywords_data.get("low_level_keywords", []) |
| 420 | keywords = ", ".join(keywords) |
| 421 | except json.JSONDecodeError: |
| 422 | try: |
| 423 | result = ( |
| 424 | result.replace(kw_prompt[:-1], "") |
| 425 | .replace("user", "") |
| 426 | .replace("model", "") |
| 427 | .strip() |
| 428 | ) |
| 429 | result = "{" + result.split("{")[1].split("}")[0] + "}" |
| 430 | |
| 431 | keywords_data = json.loads(result) |
| 432 | keywords = keywords_data.get("low_level_keywords", []) |
| 433 | keywords = ", ".join(keywords) |
| 434 | # Handle parsing error |
| 435 | except json.JSONDecodeError as e: |
| 436 | print(f"JSON parsing error: {e}") |
| 437 | return PROMPTS["fail_response"] |
| 438 | if keywords: |
| 439 | context = await _build_local_query_context( |
| 440 | keywords, |
| 441 | knowledge_graph_inst, |
| 442 | entities_vdb, |
| 443 | text_chunks_db, |
| 444 | query_param, |
| 445 | ) |
| 446 | if query_param.only_need_context: |
| 447 | return context |
| 448 | if context is None: |
| 449 | return PROMPTS["fail_response"] |
| 450 | sys_prompt_temp = PROMPTS["rag_response"] |
| 451 | sys_prompt = sys_prompt_temp.format( |
| 452 | context_data=context, response_type=query_param.response_type |
| 453 | ) |
| 454 | response = await use_model_func( |
| 455 | query, |
| 456 | system_prompt=sys_prompt, |
| 457 | ) |
Nothing calls this function directly.
No test coverage was detected for this function.