clientv3: gracefully handle watcher resume on compacted revision

Fixes #5239
Anthony Romano 2016-05-02 16:16:18 -07:00
parent fee71b18a3
commit 8b52fd0d2d
3 changed files with 60 additions and 1 deletion
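To show what the "graceful" behavior looks like from the caller's side, here is a minimal client sketch (not part of this commit): a watch opened at an already-compacted revision now yields one response whose Err() is rpctypes.ErrCompacted and then has its channel closed instead of hanging, so the caller can re-watch. The endpoint, key, and re-watch-from-CompactRevision policy below are illustrative assumptions.

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
)

func main() {
	// placeholder endpoint; point this at a real cluster
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	w := clientv3.NewWatcher(cli)
	defer w.Close()

	rev := int64(1) // deliberately old; may already be compacted away
	for {
		wch := w.Watch(context.Background(), "foo", clientv3.WithRev(rev))
		for resp := range wch {
			if resp.Err() == rpctypes.ErrCompacted {
				// the requested revision was compacted out of the store; the
				// channel closes after this response rather than blocking forever
				rev = resp.CompactRevision
				fmt.Println("watch compacted; re-watching from", rev)
				continue
			}
			for _, ev := range resp.Events {
				fmt.Printf("%s %q -> %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
				rev = ev.Kv.ModRevision + 1
			}
		}
		// watch channel closed; loop around and re-watch from rev
	}
}

Before this change, a watcher resuming at a compacted revision could hang instead of surfacing the compaction error; that is the behavior TestWatchResumeCompacted below exercises.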

@@ -334,6 +334,62 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
	}
}

// TestWatchResumeCompacted checks that the watcher gracefully closes when it
// tries to resume to a revision that's been compacted out of the store.
func TestWatchResumeCompacted(t *testing.T) {
	defer testutil.AfterTest(t)

	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
	defer clus.Terminate(t)

	// create a waiting watcher at rev 1
	w := clientv3.NewWatcher(clus.Client(0))
	defer w.Close()
	wch := w.Watch(context.Background(), "foo", clientv3.WithRev(1))
	select {
	case w := <-wch:
		t.Errorf("unexpected message from wch %v", w)
	default:
	}
	clus.Members[0].Stop(t)

	ticker := time.After(time.Second * 10)
	for clus.WaitLeader(t) <= 0 {
		select {
		case <-ticker:
			t.Fatalf("failed to wait for new leader")
		default:
			time.Sleep(10 * time.Millisecond)
		}
	}

	// put some data and compact away
	kv := clientv3.NewKV(clus.Client(1))
	for i := 0; i < 5; i++ {
		if _, err := kv.Put(context.TODO(), "foo", "bar"); err != nil {
			t.Fatal(err)
		}
	}
	if err := kv.Compact(context.TODO(), 3); err != nil {
		t.Fatal(err)
	}

	clus.Members[0].Restart(t)

	// get compacted error message
	wresp, ok := <-wch
	if !ok {
		t.Fatalf("expected wresp, but got closed channel")
	}
	if wresp.Err() != rpctypes.ErrCompacted {
		t.Fatalf("wresp.Err() expected %v, but got %v", rpctypes.ErrCompacted, wresp.Err())
	}

	// ensure the channel is closed
	if wresp, ok = <-wch; ok {
		t.Fatalf("expected closed channel, but got %v", wresp)
	}
}

// TestWatchCompactRevision ensures the CompactRevision error is given on a
// compaction event ahead of a watcher.
func TestWatchCompactRevision(t *testing.T) {

@@ -440,7 +440,7 @@ func (w *watcher) serveStream(ws *watcherStream) {
				return
			}
			// resume up to last seen event if disconnected
			if resuming {
			if resuming && wr.Err() == nil {
				resuming = false
				// trim events already seen
				for i := 0; i < len(wr.Events); i++ {
@@ -454,6 +454,7 @@ func (w *watcher) serveStream(ws *watcherStream) {
					break
				}
			}
			resuming = false
			// TODO don't keep buffering if subscriber stops reading
			wrs = append(wrs, wr)
		case resumeRev := <-ws.resumec:
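Restated outside the stream loop, the effect of these two hunks is: an error response (the compaction notice carries no replayable events) now bypasses the resume trim so the error reaches the subscriber intact, and the resuming flag is cleared once the first post-reconnect response arrives regardless of its kind. The helper below is only an illustrative sketch of that trim step, not code from this commit; the exact ModRevision comparison sits in the lines elided between the hunks.

package watchsketch

import "github.com/coreos/etcd/clientv3"

// trimResumed sketches the resume-trim step from serveStream as a standalone
// helper (illustrative only). On the first response after a reconnect it drops
// events the subscriber already saw, but it leaves error responses -- such as
// the compaction notice -- untouched so the error is delivered instead of
// being swallowed by the trimming path.
func trimResumed(lastRev int64, wr *clientv3.WatchResponse) {
	if wr.Err() != nil {
		return
	}
	keep := len(wr.Events) // if nothing is newer than lastRev, drop everything
	for i, ev := range wr.Events {
		if ev.Kv.ModRevision > lastRev {
			keep = i
			break
		}
	}
	wr.Events = wr.Events[keep:]
}

With the error-response guard in place, the compaction notice produced when resuming at a compacted revision flows through to the watch channel, where the new test above observes rpctypes.ErrCompacted followed by the channel closing.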

@@ -318,6 +318,8 @@ func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
	return
}

func (c *cluster) WaitLeader(t *testing.T) int { return c.waitLeader(t, c.Members) }

func (c *cluster) waitLeader(t *testing.T, membs []*member) int {
	possibleLead := make(map[uint64]bool)
	var lead uint64