(t *testing.T)
| 130 | } |
| 131 | |
| 132 | func TestBrokerFlowControl(t *testing.T) { |
| 133 | broker1, broker2 := setupBrokerPair() |
| 134 | |
| 135 | smallWindow := int64(10) |
| 136 | reader, meta := broker1.CreateStreamReader("reader1", "writer1", smallWindow) |
| 137 | writer, err := broker2.CreateStreamWriter(meta) |
| 138 | if err != nil { |
| 139 | t.Fatalf("CreateStreamWriter failed: %v", err) |
| 140 | } |
| 141 | |
| 142 | largeData := make([]byte, 100) |
| 143 | for i := range largeData { |
| 144 | largeData[i] = byte(i) |
| 145 | } |
| 146 | |
| 147 | writeDone := make(chan error) |
| 148 | go func() { |
| 149 | _, err := writer.Write(largeData) |
| 150 | writeDone <- err |
| 151 | }() |
| 152 | |
| 153 | received := make([]byte, 0, 100) |
| 154 | buf := make([]byte, 20) |
| 155 | for len(received) < len(largeData) { |
| 156 | n, err := reader.Read(buf) |
| 157 | if err != nil { |
| 158 | t.Fatalf("Read failed: %v", err) |
| 159 | } |
| 160 | received = append(received, buf[:n]...) |
| 161 | } |
| 162 | |
| 163 | select { |
| 164 | case err := <-writeDone: |
| 165 | if err != nil { |
| 166 | t.Fatalf("Write failed: %v", err) |
| 167 | } |
| 168 | case <-time.After(2 * time.Second): |
| 169 | t.Fatal("Write didn't complete in time") |
| 170 | } |
| 171 | |
| 172 | if !bytes.Equal(received, largeData) { |
| 173 | t.Fatal("Received data doesn't match sent data") |
| 174 | } |
| 175 | |
| 176 | writer.Close() |
| 177 | } |
| 178 | |
| 179 | func TestBrokerError(t *testing.T) { |
| 180 | broker1, broker2 := setupBrokerPair() |
nothing calls this directly
no test coverage detected