From f66162932c49b8ddd818ac2493f11198ec66f16e Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 18 Feb 2016 14:28:52 -0800 Subject: [PATCH] clientv3: fix current watcher reconnection If a current watcher didn't receive any events, a reconnect cycle would advance its revision to the store's current revision. Instead, reconnect using the watcher's creation header revision if the watcher hasn't received any events. Fixes #4502 --- clientv3/integration/watch_test.go | 17 +++++++++++++---- clientv3/watch.go | 6 ++++++ 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index dcaa0619f..cf5b7f493 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -163,13 +163,18 @@ func TestWatchReconnRequest(t *testing.T) { } func testWatchReconnRequest(t *testing.T, wctx *watchctx) { - // take down watcher connection - donec := make(chan struct{}) + donec, stopc := make(chan struct{}), make(chan struct{}, 1) go func() { + timer := time.After(2 * time.Second) + defer close(donec) + // take down watcher connection for { wctx.wclient.ActiveConnection().Close() select { - case <-donec: + case <-timer: + // spinning on close may live lock reconnection + return + case <-stopc: return default: } @@ -179,7 +184,11 @@ func testWatchReconnRequest(t *testing.T, wctx *watchctx) { if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil { t.Fatalf("expected non-nil channel") } - close(donec) + + // wait for disconnections to stop + stopc <- struct{}{} + <-donec + // ensure watcher works putAndWatch(t, wctx, "a", "a") } diff --git a/clientv3/watch.go b/clientv3/watch.go index 46960b4a8..cbc7df456 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -198,6 +198,12 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) { resumec: make(chan int64), } + if pendingReq.rev == 0 { + // note the header revision so that a put following a current watcher + // disconnect will arrive on the watcher channel after reconnect + ws.initReq.rev = resp.Header.Revision + } + w.mu.Lock() w.streams[ws.id] = ws w.mu.Unlock()