MCPcopy Index your code
hub / github.com/openai/openai-agents-python / FakeStreamingModel

Class FakeStreamingModel

tests/voice/test_workflow.py:32–98  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

30
31
32class FakeStreamingModel(Model):
33 def __init__(self):
34 self.turn_outputs: list[list[TResponseOutputItem]] = []
35
36 def set_next_output(self, output: list[TResponseOutputItem]):
37 self.turn_outputs.append(output)
38
39 def add_multiple_turn_outputs(self, outputs: list[list[TResponseOutputItem]]):
40 self.turn_outputs.extend(outputs)
41
42 def get_next_output(self) -> list[TResponseOutputItem]:
43 if not self.turn_outputs:
44 return []
45 return self.turn_outputs.pop(0)
46
47 async def get_response(
48 self,
49 system_instructions: str | None,
50 input: str | list[TResponseInputItem],
51 model_settings: ModelSettings,
52 tools: list[Tool],
53 output_schema: AgentOutputSchemaBase | None,
54 handoffs: list[Handoff],
55 tracing: ModelTracing,
56 *,
57 previous_response_id: str | None,
58 conversation_id: str | None,
59 prompt: Any | None,
60 ) -> ModelResponse:
61 raise NotImplementedError("Not implemented")
62
63 async def stream_response(
64 self,
65 system_instructions: str | None,
66 input: str | list[TResponseInputItem],
67 model_settings: ModelSettings,
68 tools: list[Tool],
69 output_schema: AgentOutputSchemaBase | None,
70 handoffs: list[Handoff],
71 tracing: ModelTracing,
72 *,
73 previous_response_id: str | None,
74 conversation_id: str | None,
75 prompt: Any | None,
76 ) -> AsyncIterator[TResponseStreamEvent]:
77 output = self.get_next_output()
78 for item in output:
79 if (
80 item.type == "message"
81 and len(item.content) == 1
82 and item.content[0].type == "output_text"
83 ):
84 yield ResponseTextDeltaEvent(
85 content_index=0,
86 delta=item.content[0].text,
87 type="response.output_text.delta",
88 output_index=0,
89 item_id=item.id,

Callers 1

Calls

no outgoing calls

Tested by 1