(client,
messages: list[dict[str, str]],
model: str = "gpt-4-0125-preview",
max_tokens=2000,
temperature=0.7,
tools=None,
stream=False,)
| 1 | def get_completion(client, |
| 2 | messages: list[dict[str, str]], |
| 3 | model: str = "gpt-4-0125-preview", |
| 4 | max_tokens=2000, |
| 5 | temperature=0.7, |
| 6 | tools=None, |
| 7 | stream=False,): |
| 8 | |
| 9 | # Prepare the request parameters |
| 10 | request_params = { |
| 11 | "model": model, |
| 12 | "messages": messages, |
| 13 | "max_tokens": max_tokens, |
| 14 | "temperature": temperature, |
| 15 | "stream": stream, |
| 16 | } |
| 17 | |
| 18 | if tools and isinstance(tools, list): |
| 19 | request_params["tools"] = tools # Tools are already in dictionary format |
| 20 | |
| 21 | # Make the API call with the possibility of streaming |
| 22 | if stream: |
| 23 | completion = client.chat.completions.create(**request_params) |
| 24 | # create variables to collect the stream of chunks |
| 25 | collected_chunks = [] |
| 26 | collected_messages = [] |
| 27 | for chunk in completion: |
| 28 | collected_chunks.append(chunk) # save the event response |
| 29 | chunk_message = chunk.choices[0].delta.content # extract the message |
| 30 | collected_messages.append(chunk_message) # save the message |
| 31 | print(chunk_message, end="") # print the message |
| 32 | # yield chunk_message # Yield each part of the completion as it arrives |
| 33 | return collected_messages # Returns the whole completion |
| 34 | else: |
| 35 | completion = client.chat.completions.create(**request_params) |
| 36 | return completion.choices[0].message # Returns the whole completion |
| 37 | |
| 38 | |
| 39 | def is_dict_empty(d): |
no outgoing calls
no test coverage detected
searching dependent graphs…