(t *testing.T)
| 199 | } |
| 200 | |
| 201 | func TestBrokerCancel(t *testing.T) { |
| 202 | broker1, broker2 := setupBrokerPair() |
| 203 | |
| 204 | reader, meta := broker1.CreateStreamReader("reader1", "writer1", 1024) |
| 205 | writer, err := broker2.CreateStreamWriter(meta) |
| 206 | if err != nil { |
| 207 | t.Fatalf("CreateStreamWriter failed: %v", err) |
| 208 | } |
| 209 | |
| 210 | reader.Close() |
| 211 | |
| 212 | select { |
| 213 | case <-writer.GetCanceledChan(): |
| 214 | // Success |
| 215 | case <-time.After(1 * time.Second): |
| 216 | t.Fatal("Writer not notified of cancellation") |
| 217 | } |
| 218 | |
| 219 | _, _, canceled := writer.GetAckState() |
| 220 | if !canceled { |
| 221 | t.Fatal("Writer should be in canceled state") |
| 222 | } |
| 223 | } |
| 224 | |
| 225 | func TestBrokerMultipleWrites(t *testing.T) { |
| 226 | broker1, broker2 := setupBrokerPair() |
nothing calls this directly
no test coverage detected