TestV3NoEventsLostOnCompact verifies that a slow watcher exits with a compacted watch response if the next revision of events it needs has been compacted, so that the client is never sent a stream with events missing.
(t *testing.T)
| 1536 | // TestV3NoEventsLostOnCompact verifies that slow watchers exit with compacted watch response |
| 1537 | // if its next revision of events are compacted and no lost events sent to client. |
| 1538 | func TestV3NoEventsLostOnCompact(t *testing.T) { |
| 1539 | if integration.ThroughProxy { |
| 1540 | t.Skip("grpc proxy currently does not support requesting progress notifications") |
| 1541 | } |
| 1542 | integration.BeforeTest(t) |
| 1543 | if len(gofail.List()) == 0 { |
| 1544 | t.Skip("please run 'make gofail-enable' before running the test") |
| 1545 | } |
| 1546 | clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1}) |
| 1547 | defer clus.Terminate(t) |
| 1548 | |
| 1549 | client := clus.RandClient() |
| 1550 | ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) |
| 1551 | defer cancel() |
| 1552 | |
| 1553 | // sendLoop throughput is rate-limited to 1 event per second |
| 1554 | require.NoError(t, gofail.Enable("beforeSendWatchResponse", `sleep("1s")`)) |
| 1555 | wch := client.Watch(ctx, "foo") |
| 1556 | |
| 1557 | var rev int64 |
| 1558 | writeCount := mvcc.ChanBufLen() * 11 / 10 |
| 1559 | for i := 0; i < writeCount; i++ { |
| 1560 | resp, err := client.Put(ctx, "foo", "bar") |
| 1561 | require.NoError(t, err) |
| 1562 | rev = resp.Header.Revision |
| 1563 | } |
| 1564 | _, err := client.Compact(ctx, rev) |
| 1565 | require.NoError(t, err) |
| 1566 | |
| 1567 | time.Sleep(time.Second) |
| 1568 | require.NoError(t, gofail.Disable("beforeSendWatchResponse")) |
| 1569 | |
| 1570 | eventCount := 0 |
| 1571 | compacted := false |
| 1572 | for resp := range wch { |
| 1573 | err = resp.Err() |
| 1574 | if err != nil { |
| 1575 | if !errors.Is(err, rpctypes.ErrCompacted) { |
| 1576 | t.Fatalf("want watch response err %v but got %v", rpctypes.ErrCompacted, err) |
| 1577 | } |
| 1578 | compacted = true |
| 1579 | break |
| 1580 | } |
| 1581 | eventCount += len(resp.Events) |
| 1582 | if eventCount == writeCount { |
| 1583 | break |
| 1584 | } |
| 1585 | } |
| 1586 | assert.Truef(t, compacted, "Expected stream to get compacted, instead we got %d events out of %d events", eventCount, writeCount) |
| 1587 | } |
nothing calls this directly
no test coverage detected
searching dependent graphs…