From e129223dbebaa5a13b9564dea27fdcd5bbebb317 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Sun, 27 Mar 2016 02:27:22 -0700 Subject: [PATCH] clientv3: fix race in watcher resume --- clientv3/watch.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/clientv3/watch.go b/clientv3/watch.go index 17b1bc9d1..d9edd2b8c 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -436,11 +436,15 @@ func (w *watcher) serveStream(ws *watcherStream) { // TODO don't keep buffering if subscriber stops reading wrs = append(wrs, wr) case resumeRev := <-ws.resumec: + wrs = nil + resuming = true + if resumeRev == -1 { + // pause serving stream while resume gets set up + break + } if resumeRev != ws.lastRev { panic("unexpected resume revision") } - wrs = nil - resuming = true case <-w.donec: closing = true case <-ws.initReq.ctx.Done(): @@ -502,6 +506,9 @@ func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error { w.mu.RUnlock() for _, ws := range streams { + // pause serveStream + ws.resumec <- -1 + // reconstruct watcher from initial request if ws.lastRev != 0 { ws.initReq.rev = ws.lastRev @@ -525,6 +532,7 @@ func (w *watcher) resumeWatchers(wc pb.Watch_WatchClient) error { w.streams[ws.id] = ws w.mu.Unlock() + // unpause serveStream ws.resumec <- ws.lastRev } return nil