From 8b52fd0d2dc7be3e68270d051cb921ab21e89fb3 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 2 May 2016 16:16:18 -0700 Subject: [PATCH] clientv3: gracefully handle watcher resume on compacted revision Fixes #5239 --- clientv3/integration/watch_test.go | 56 ++++++++++++++++++++++++++++++ clientv3/watch.go | 3 +- integration/cluster.go | 2 ++ 3 files changed, 60 insertions(+), 1 deletion(-) diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index 904a7839d..bc2d3c8f3 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -334,6 +334,62 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) { } } +// TestWatchResumeCompacted checks that the watcher gracefully closes in case +// that it tries to resume to a revision that's been compacted out of the store. +func TestWatchResumeCompacted(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + // create a waiting watcher at rev 1 + w := clientv3.NewWatcher(clus.Client(0)) + defer w.Close() + wch := w.Watch(context.Background(), "foo", clientv3.WithRev(1)) + select { + case w := <-wch: + t.Errorf("unexpected message from wch %v", w) + default: + } + clus.Members[0].Stop(t) + + ticker := time.After(time.Second * 10) + for clus.WaitLeader(t) <= 0 { + select { + case <-ticker: + t.Fatalf("failed to wait for new leader") + default: + time.Sleep(10 * time.Millisecond) + } + } + + // put some data and compact away + kv := clientv3.NewKV(clus.Client(1)) + for i := 0; i < 5; i++ { + if _, err := kv.Put(context.TODO(), "foo", "bar"); err != nil { + t.Fatal(err) + } + } + if err := kv.Compact(context.TODO(), 3); err != nil { + t.Fatal(err) + } + + clus.Members[0].Restart(t) + + // get compacted error message + wresp, ok := <-wch + if !ok { + t.Fatalf("expected wresp, but got closed channel") + } + if wresp.Err() != rpctypes.ErrCompacted { + 
t.Fatalf("wresp.Err() expected %v, but got %v", rpctypes.ErrCompacted, wresp.Err()) + } + // ensure the channel is closed + if wresp, ok = <-wch; ok { + t.Fatalf("expected closed channel, but got %v", wresp) + } +} + // TestWatchCompactRevision ensures the CompactRevision error is given on a // compaction event ahead of a watcher. func TestWatchCompactRevision(t *testing.T) { diff --git a/clientv3/watch.go b/clientv3/watch.go index 34636f153..e2cf99146 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -440,7 +440,7 @@ func (w *watcher) serveStream(ws *watcherStream) { return } // resume up to last seen event if disconnected - if resuming { + if resuming && wr.Err() == nil { resuming = false // trim events already seen for i := 0; i < len(wr.Events); i++ { @@ -454,6 +454,7 @@ func (w *watcher) serveStream(ws *watcherStream) { break } } + resuming = false // TODO don't keep buffering if subscriber stops reading wrs = append(wrs, wr) case resumeRev := <-ws.resumec: diff --git a/integration/cluster.go b/integration/cluster.go index 32b60c907..4f1e65f8f 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -318,6 +318,8 @@ func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) { return } +func (c *cluster) WaitLeader(t *testing.T) int { return c.waitLeader(t, c.Members) } + func (c *cluster) waitLeader(t *testing.T, membs []*member) int { possibleLead := make(map[uint64]bool) var lead uint64