From af225e74337ed8a009d7471f92dfadffab86759d Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 17 Feb 2016 03:17:37 -0800 Subject: [PATCH] v3rpc: don't race on current watcher header revision --- etcdserver/api/v3rpc/watch.go | 11 +++++++++-- storage/watchable_store.go | 11 ++++++++++- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index fd16d44bc..8487cc46a 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -102,9 +102,16 @@ func (sws *serverWatchStream) recvLoop() error { toWatch = creq.Prefix prefix = true } - id := sws.watchStream.Watch(toWatch, prefix, creq.StartRevision) + + rev := creq.StartRevision + wsrev := sws.watchStream.Rev() + if rev == 0 { + // rev 0 watches past the current revision + rev = wsrev + 1 + } + id := sws.watchStream.Watch(toWatch, prefix, rev) sws.ctrlStream <- &pb.WatchResponse{ - Header: sws.newResponseHeader(sws.watchStream.Rev()), + Header: sws.newResponseHeader(wsrev), WatchId: int64(id), Created: true, } diff --git a/storage/watchable_store.go b/storage/watchable_store.go index d0a54a7d0..05f6fc6c3 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -227,7 +227,16 @@ func (s *watchableStore) watch(key []byte, prefix bool, startRev int64, id Watch ch: ch, } - if startRev == 0 { + s.store.mu.Lock() + synced := startRev > s.store.currentRev.main || startRev == 0 + if synced { + wa.cur = s.store.currentRev.main + 1 + } + s.store.mu.Unlock() + if synced { + if startRev > wa.cur { + panic("can't watch past sync revision") + } s.synced.add(wa) } else { slowWatcherGauge.Inc()