MCPcopy
hub / github.com/666ghj/BettaFish / stream_invoke

Method stream_invoke

QueryEngine/llms/base.py:86–131  ·  view source on GitHub ↗

流式调用LLM,逐步返回响应内容 Args: system_prompt: 系统提示词 user_prompt: 用户提示词 **kwargs: 额外参数(temperature, top_p等) Yields: 响应文本块(str)

(self, system_prompt: str, user_prompt: str, **kwargs)

Source from the content-addressed store, hash-verified

84 return ""
85
86 def stream_invoke(self, system_prompt: str, user_prompt: str, **kwargs) -> Generator[str, None, None]:
87 """
88 流式调用LLM,逐步返回响应内容
89
90 Args:
91 system_prompt: 系统提示词
92 user_prompt: 用户提示词
93 **kwargs: 额外参数(temperature, top_p等)
94
95 Yields:
96 响应文本块(str)
97 """
98 current_time = datetime.now().strftime("%Y年%m月%d日%H时%M分")
99 time_prefix = f"今天的实际时间是{current_time}"
100 if user_prompt:
101 user_prompt = f"{time_prefix}\n{user_prompt}"
102 else:
103 user_prompt = time_prefix
104 messages = [
105 {"role": "system", "content": system_prompt},
106 {"role": "user", "content": user_prompt},
107 ]
108
109 allowed_keys = {"temperature", "top_p", "presence_penalty", "frequency_penalty"}
110 extra_params = {key: value for key, value in kwargs.items() if key in allowed_keys and value is not None}
111 # 强制使用流式
112 extra_params["stream"] = True
113
114 timeout = kwargs.pop("timeout", self.timeout)
115
116 try:
117 stream = self.client.chat.completions.create(
118 model=self.model_name,
119 messages=messages,
120 timeout=timeout,
121 **extra_params,
122 )
123
124 for chunk in stream:
125 if chunk.choices and len(chunk.choices) > 0:
126 delta = chunk.choices[0].delta
127 if delta and delta.content:
128 yield delta.content
129 except Exception as e:
130 logger.error(f"流式请求失败: {str(e)}")
131 raise e
132
133 @with_retry(LLM_RETRY_CONFIG)
134 def stream_invoke_to_string(self, system_prompt: str, user_prompt: str, **kwargs) -> str:

Callers 1

Calls 3

popMethod · 0.80
createMethod · 0.80
errorMethod · 0.45

Tested by

no test coverage detected