MCPcopy
hub / github.com/wavetermdev/waveterm / TestBrokerFlowControl

Function TestBrokerFlowControl

pkg/streamclient/streambroker_test.go:132–177  ·  view source on GitHub ↗
(t *testing.T)

Source from the content-addressed store, hash-verified

130}
131
132func TestBrokerFlowControl(t *testing.T) {
133 broker1, broker2 := setupBrokerPair()
134
135 smallWindow := int64(10)
136 reader, meta := broker1.CreateStreamReader("reader1", "writer1", smallWindow)
137 writer, err := broker2.CreateStreamWriter(meta)
138 if err != nil {
139 t.Fatalf("CreateStreamWriter failed: %v", err)
140 }
141
142 largeData := make([]byte, 100)
143 for i := range largeData {
144 largeData[i] = byte(i)
145 }
146
147 writeDone := make(chan error)
148 go func() {
149 _, err := writer.Write(largeData)
150 writeDone <- err
151 }()
152
153 received := make([]byte, 0, 100)
154 buf := make([]byte, 20)
155 for len(received) < len(largeData) {
156 n, err := reader.Read(buf)
157 if err != nil {
158 t.Fatalf("Read failed: %v", err)
159 }
160 received = append(received, buf[:n]...)
161 }
162
163 select {
164 case err := <-writeDone:
165 if err != nil {
166 t.Fatalf("Write failed: %v", err)
167 }
168 case <-time.After(2 * time.Second):
169 t.Fatal("Write didn't complete in time")
170 }
171
172 if !bytes.Equal(received, largeData) {
173 t.Fatal("Received data doesn't match sent data")
174 }
175
176 writer.Close()
177}
178
179func TestBrokerError(t *testing.T) {
180 broker1, broker2 := setupBrokerPair()

Callers

nothing calls this directly

Calls 6

setupBrokerPair (Function) · 0.85
CreateStreamReader (Method) · 0.80
CreateStreamWriter (Method) · 0.80
Write (Method) · 0.65
Close (Method) · 0.65
Read (Method) · 0.45

Tested by

no test coverage detected