In streaming scenarios, each time new_text is received, it calls multi_format_parser.parse_streaming_increment and returns the resulting normal_text and calls to the upper layer (or SSE).
| 467 | |
| 468 | |
| 469 | class FunctionCallParser: |
| 470 | """ |
| 471 | In streaming scenarios, each time new_text is received, it calls multi_format_parser.parse_streaming_increment |
| 472 | and returns the resulting normal_text and calls to the upper layer (or SSE). |
| 473 | """ |
| 474 | |
| 475 | ToolCallParserEnum: Dict[str, BaseFormatDetector] = { |
| 476 | "llama3": Llama32Detector, |
| 477 | "qwen25": Qwen25Detector, |
| 478 | "mistral": MistralDetector, |
| 479 | } |
| 480 | |
| 481 | def __init__(self, tools: List[Function], tool_call_parser: str = None): |
| 482 | detectors = [] |
| 483 | if tool_call_parser: |
| 484 | detector_class = self.ToolCallParserEnum.get(tool_call_parser) |
| 485 | if detector_class: |
| 486 | detectors.append(detector_class()) |
| 487 | else: |
| 488 | raise ValueError(f"Unsupported tool_call_parser: {tool_call_parser}") |
| 489 | else: |
| 490 | raise ValueError("Tool Call Parser Not Given!") |
| 491 | |
| 492 | self.multi_format_parser = MultiFormatParser(detectors) |
| 493 | self.tools = tools |
| 494 | |
| 495 | def parse_non_stream(self, full_text: str): |
| 496 | """ |
| 497 | Non-streaming call: one-time parsing |
| 498 | """ |
| 499 | full_normal_text, calls = self.multi_format_parser.parse_once(full_text, self.tools) |
| 500 | return full_normal_text, calls |
| 501 | |
| 502 | def parse_stream_chunk(self, chunk_text: str): |
| 503 | """ |
| 504 | Streaming call: incremental parsing |
| 505 | """ |
| 506 | normal_text, calls = self.multi_format_parser.parse_streaming_increment(chunk_text, self.tools) |
| 507 | return normal_text, calls |
no outgoing calls
no test coverage detected