diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index 6a8f3cb5e..1b544ca41 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -349,6 +349,9 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) { // TestWatchResumeComapcted checks that the watcher gracefully closes in case // that it tries to resume to a revision that's been compacted out of the store. +// Since the watcher's server restarts with stale data, the watcher will receive +// either a compaction error or all keys by staying in sync before the compaction +// is finally applied. func TestWatchResumeCompacted(t *testing.T) { defer testutil.AfterTest(t) @@ -377,8 +380,9 @@ func TestWatchResumeCompacted(t *testing.T) { } // put some data and compact away + numPuts := 5 kv := clientv3.NewKV(clus.Client(1)) - for i := 0; i < 5; i++ { + for i := 0; i < numPuts; i++ { if _, err := kv.Put(context.TODO(), "foo", "bar"); err != nil { t.Fatal(err) } @@ -389,17 +393,48 @@ func TestWatchResumeCompacted(t *testing.T) { clus.Members[0].Restart(t) - // get compacted error message - wresp, ok := <-wch - if !ok { - t.Fatalf("expected wresp, but got closed channel") + // since watch's server isn't guaranteed to be synced with the cluster when + // the watch resumes, there is a window where the watch can stay synced and + // read off all events; if the watcher misses the window, it will go out of + // sync and get a compaction error. + wRev := int64(2) + for int(wRev) <= numPuts+1 { + var wresp clientv3.WatchResponse + var ok bool + select { + case wresp, ok = <-wch: + if !ok { + t.Fatalf("expected wresp, but got closed channel") + } + case <-time.After(5 * time.Second): + t.Fatalf("compacted watch timed out") + } + for _, ev := range wresp.Events { + if ev.Kv.ModRevision != wRev { + t.Fatalf("expected modRev %v, got %+v", wRev, ev) + } + wRev++ + } + if wresp.Err() == nil { + continue + } + if wresp.Err() != rpctypes.ErrCompacted { + t.Fatalf("wresp.Err() expected %v, but got %v %+v", rpctypes.ErrCompacted, wresp.Err()) + } + break } - if wresp.Err() != rpctypes.ErrCompacted { - t.Fatalf("wresp.Err() expected %v, but got %v", rpctypes.ErrCompacted, wresp.Err()) + if int(wRev) > numPuts+1 { + // got data faster than the compaction + return } - // ensure the channel is closed - if wresp, ok = <-wch; ok { - t.Fatalf("expected closed channel, but got %v", wresp) + // received compaction error; ensure the channel closes + select { + case wresp, ok := <-wch: + if ok { + t.Fatalf("expected closed channel, but got %v", wresp) + } + case <-time.After(5 * time.Second): + t.Fatalf("timed out waiting for channel close") } }