(t *testing.T, watchOnPut bool)
| 528 | func TestWatchWithProgressNotifyNoEvent(t *testing.T) { testWatchWithProgressNotify(t, false) } |
| 529 | |
| 530 | func testWatchWithProgressNotify(t *testing.T, watchOnPut bool) { |
| 531 | integration2.BeforeTest(t) |
| 532 | |
| 533 | // accelerate report interval so test terminates quickly |
| 534 | oldpi := v3rpc.GetProgressReportInterval() |
| 535 | // using atomics to avoid race warnings |
| 536 | v3rpc.SetProgressReportInterval(3 * time.Second) |
| 537 | pi := 3 * time.Second |
| 538 | defer func() { v3rpc.SetProgressReportInterval(oldpi) }() |
| 539 | |
| 540 | clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3}) |
| 541 | defer clus.Terminate(t) |
| 542 | |
| 543 | wc := clus.RandClient() |
| 544 | |
| 545 | opts := []clientv3.OpOption{clientv3.WithProgressNotify()} |
| 546 | if watchOnPut { |
| 547 | opts = append(opts, clientv3.WithPrefix()) |
| 548 | } |
| 549 | rch := wc.Watch(context.Background(), "foo", opts...) |
| 550 | |
| 551 | select { |
| 552 | case resp := <-rch: // wait for notification |
| 553 | if len(resp.Events) != 0 { |
| 554 | t.Fatalf("resp.Events expected none, got %+v", resp.Events) |
| 555 | } |
| 556 | case <-time.After(2 * pi): |
| 557 | t.Fatalf("watch response expected in %v, but timed out", pi) |
| 558 | } |
| 559 | |
| 560 | kvc := clus.RandClient() |
| 561 | _, err := kvc.Put(context.TODO(), "foox", "bar") |
| 562 | require.NoError(t, err) |
| 563 | |
| 564 | select { |
| 565 | case resp := <-rch: |
| 566 | if resp.Header.Revision != 2 { |
| 567 | t.Fatalf("resp.Header.Revision expected 2, got %d", resp.Header.Revision) |
| 568 | } |
| 569 | if watchOnPut { // wait for put if watch on the put key |
| 570 | ev := []*clientv3.Event{{ |
| 571 | Type: clientv3.EventTypePut, |
| 572 | Kv: &mvccpb.KeyValue{Key: []byte("foox"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}, |
| 573 | }} |
| 574 | if !reflect.DeepEqual(ev, resp.Events) { |
| 575 | t.Fatalf("expected %+v, got %+v", ev, resp.Events) |
| 576 | } |
| 577 | } else if len(resp.Events) != 0 { // wait for notification otherwise |
| 578 | t.Fatalf("expected no events, but got %+v", resp.Events) |
| 579 | } |
| 580 | case <-time.After(time.Duration(1.5 * float64(pi))): |
| 581 | t.Fatalf("watch response expected in %v, but timed out", pi) |
| 582 | } |
| 583 | } |
| 584 | |
| 585 | func TestConfigurableWatchProgressNotifyInterval(t *testing.T) { |
| 586 | integration2.BeforeTest(t) |
no test coverage detected
searching dependent graphs…