diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index dcaa0619f..cf5b7f493 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -163,13 +163,18 @@ func TestWatchReconnRequest(t *testing.T) { } func testWatchReconnRequest(t *testing.T, wctx *watchctx) { - // take down watcher connection - donec := make(chan struct{}) + donec, stopc := make(chan struct{}), make(chan struct{}, 1) go func() { + timer := time.After(2 * time.Second) + defer close(donec) + // take down watcher connection for { wctx.wclient.ActiveConnection().Close() select { - case <-donec: + case <-timer: + // spinning on close may live lock reconnection + return + case <-stopc: return default: } @@ -179,7 +184,11 @@ func testWatchReconnRequest(t *testing.T, wctx *watchctx) { if wctx.ch = wctx.w.Watch(context.TODO(), "a", 0); wctx.ch == nil { t.Fatalf("expected non-nil channel") } - close(donec) + + // wait for disconnections to stop + stopc <- struct{}{} + <-donec + // ensure watcher works putAndWatch(t, wctx, "a", "a") } diff --git a/clientv3/watch.go b/clientv3/watch.go index 46960b4a8..cbc7df456 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -198,6 +198,12 @@ func (w *watcher) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) { resumec: make(chan int64), } + if pendingReq.rev == 0 { + // note the header revision so that a put following a current watcher + // disconnect will arrive on the watcher channel after reconnect + ws.initReq.rev = resp.Header.Revision + } + w.mu.Lock() w.streams[ws.id] = ws w.mu.Unlock()