From c43831063411d923566f1c607933c1d721c1f5f2 Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Fri, 27 May 2016 00:57:20 -0700
Subject: [PATCH 1/2] v3rpc: make watcher wait for its send goroutine to finish

---
 etcdserver/api/v3rpc/watch.go | 31 +++++++++++++++++++++----------
 1 file changed, 21 insertions(+), 10 deletions(-)

diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go
index 19a8ea70a..f6e2abfae 100644
--- a/etcdserver/api/v3rpc/watch.go
+++ b/etcdserver/api/v3rpc/watch.go
@@ -94,9 +94,12 @@ type serverWatchStream struct {
 
 	// closec indicates the stream is closed.
 	closec chan struct{}
+
+	// wg waits for the send loop to complete
+	wg sync.WaitGroup
 }
 
-func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
+func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
 	sws := serverWatchStream{
 		clusterID: ws.clusterID,
 		memberID:  ws.memberID,
@@ -109,23 +112,30 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error {
 		closec: make(chan struct{}),
 	}
 
-	go sws.sendLoop()
-	errc := make(chan error, 1)
+	sws.wg.Add(1)
 	go func() {
-		errc <- sws.recvLoop()
-		sws.close()
+		sws.sendLoop()
+		sws.wg.Done()
 	}()
+
+	errc := make(chan error, 1)
+	// Ideally recvLoop would also use sws.wg to signal its completion
+	// but when stream.Context().Done() is closed, the stream's recv
+	// may continue to block since it uses a different context, leading to
+	// deadlock when calling sws.close().
+	go func() { errc <- sws.recvLoop() }()
+
 	select {
-	case err := <-errc:
-		return err
+	case err = <-errc:
 	case <-stream.Context().Done():
-		err := stream.Context().Err()
+		err = stream.Context().Err()
 		// the only server-side cancellation is noleader for now.
 		if err == context.Canceled {
-			return rpctypes.ErrGRPCNoLeader
+			err = rpctypes.ErrGRPCNoLeader
 		}
-		return err
 	}
+	sws.close()
+	return err
 }
 
 func (sws *serverWatchStream) recvLoop() error {
@@ -292,6 +302,7 @@ func (sws *serverWatchStream) close() {
	sws.watchStream.Close()
 	close(sws.closec)
 	close(sws.ctrlStream)
+	sws.wg.Wait()
 }
 
 func (sws *serverWatchStream) newResponseHeader(rev int64) *pb.ResponseHeader {
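For context, here is a minimal, self-contained Go sketch of the shutdown ordering patch 1 introduces. The names (stream, ctrl, closec, sendLoop) are invented for illustration; only the pattern itself, tracking the send goroutine with a sync.WaitGroup and waiting on it in close() so teardown cannot race an in-flight send, mirrors the serverWatchStream change above.

package main

import (
	"fmt"
	"sync"
)

// stream mimics the shape of serverWatchStream: a control channel drained by
// a send loop, plus a WaitGroup so close() can wait for that loop to exit.
type stream struct {
	ctrl   chan string    // control messages consumed by sendLoop
	closec chan struct{}  // closed to signal shutdown to other readers
	wg     sync.WaitGroup // tracks the send goroutine
}

func newStream() *stream {
	s := &stream{
		ctrl:   make(chan string, 16),
		closec: make(chan struct{}),
	}
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		s.sendLoop()
	}()
	return s
}

// sendLoop keeps sending until the control channel is closed.
func (s *stream) sendLoop() {
	for msg := range s.ctrl {
		fmt.Println("send:", msg)
	}
}

// close tears the stream down; wg.Wait() guarantees sendLoop has returned
// before close() returns, so no send can race the shutdown.
func (s *stream) close() {
	close(s.closec)
	close(s.ctrl)
	s.wg.Wait()
}

func main() {
	s := newStream()
	s.ctrl <- "hello"
	s.close()
}

As in the patch, the receive side is deliberately left out of the WaitGroup: a Recv blocked on a different context could otherwise deadlock close().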
From cfb3f96c2b24a557e234ae96c764752e8d66789c Mon Sep 17 00:00:00 2001
From: Anthony Romano
Date: Thu, 26 May 2016 16:10:58 -0600
Subject: [PATCH 2/2] mvcc: tighten up watcher cancelation and revision handling

Makes w.cur into w.minRev, the minimum revision for the next update, and
retries cancelation if the watcher isn't found (because it's being processed
by moveVictims).

Fixes: #5459
---
 mvcc/watchable_store.go      | 109 ++++++++++++++++++++++-------------
 mvcc/watchable_store_test.go |   4 +-
 mvcc/watcher_group.go        |  11 ++--
 3 files changed, 76 insertions(+), 48 deletions(-)

diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go
index 25c84ce99..8339500ec 100644
--- a/mvcc/watchable_store.go
+++ b/mvcc/watchable_store.go
@@ -190,19 +190,19 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
 	defer s.mu.Unlock()
 
 	wa := &watcher{
-		key: key,
-		end: end,
-		cur: startRev,
-		id:  id,
-		ch:  ch,
+		key:    key,
+		end:    end,
+		minRev: startRev,
+		id:     id,
+		ch:     ch,
 	}
 
 	s.store.mu.Lock()
 	synced := startRev > s.store.currentRev.main || startRev == 0
 	if synced {
-		wa.cur = s.store.currentRev.main + 1
-		if startRev > wa.cur {
-			wa.cur = startRev
+		wa.minRev = s.store.currentRev.main + 1
+		if startRev > wa.minRev {
+			wa.minRev = startRev
 		}
 	}
 	s.store.mu.Unlock()
@@ -214,30 +214,47 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c
 	}
 	watcherGauge.Inc()
 
-	cancel := cancelFunc(func() {
+	return wa, func() { s.cancelWatcher(wa) }
+}
+
+// cancelWatcher removes references of the watcher from the watchableStore
+func (s *watchableStore) cancelWatcher(wa *watcher) {
+	for {
 		s.mu.Lock()
-		// remove references of the watcher
+
 		if s.unsynced.delete(wa) {
 			slowWatcherGauge.Dec()
-			watcherGauge.Dec()
+			break
 		} else if s.synced.delete(wa) {
-			watcherGauge.Dec()
-		} else {
-			for _, wb := range s.victims {
-				if wb[wa] != nil {
-					slowWatcherGauge.Dec()
-					watcherGauge.Dec()
-					delete(wb, wa)
-					break
-				}
+			break
+		} else if wa.compacted {
+			break
+		}
+
+		if !wa.victim {
+			panic("watcher not victim but not in watch groups")
+		}
+
+		var victimBatch watcherBatch
+		for _, wb := range s.victims {
+			if wb[wa] != nil {
+				victimBatch = wb
+				break
 			}
 		}
+		if victimBatch != nil {
+			slowWatcherGauge.Dec()
+			delete(victimBatch, wa)
+			break
+		}
+
+		// victim being processed so not accessible; retry
 		s.mu.Unlock()
+		time.Sleep(time.Millisecond)
+	}
 
-		// If we cannot find it, it should have finished watch.
-	})
-
-	return wa, cancel
+	watcherGauge.Dec()
+	s.mu.Unlock()
 }
 
 // syncWatchersLoop syncs the watcher in the unsynced map every 100ms.
@@ -306,8 +323,10 @@ func (s *watchableStore) moveVictims() (moved int) {
 	for _, wb := range victims {
 		// try to send responses again
 		for w, eb := range wb {
+			// watcher has observed the store up to, but not including, w.minRev
+			rev := w.minRev - 1
 			select {
-			case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: w.cur}:
+			case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}:
 				pendingEventsGauge.Add(float64(len(eb.evs)))
 			default:
 				if newVictim == nil {
@@ -328,10 +347,11 @@ func (s *watchableStore) moveVictims() (moved int) {
 				// couldn't send watch response; stays victim
 				continue
 			}
+			w.victim = false
 			if eb.moreRev != 0 {
-				w.cur = eb.moreRev
+				w.minRev = eb.moreRev
 			}
-			if w.cur < curRev {
+			if w.minRev <= curRev {
 				s.unsynced.add(w)
 			} else {
 				slowWatcherGauge.Dec()
@@ -385,17 +405,20 @@ func (s *watchableStore) syncWatchers() {
 
 	var victims watcherBatch
 	wb := newWatcherBatch(wg, evs)
 	for w := range wg.watchers {
+		w.minRev = curRev + 1
+
 		eb, ok := wb[w]
 		if !ok {
 			// bring un-notified watcher to synced
-			w.cur = curRev
 			s.synced.add(w)
 			s.unsynced.delete(w)
 			continue
 		}
-		w.cur = curRev
-		isBlocked := false
+
+		if eb.moreRev != 0 {
+			w.minRev = eb.moreRev
+		}
+
 		select {
 		case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}:
 			pendingEventsGauge.Add(float64(len(eb.evs)))
@@ -403,14 +426,14 @@ func (s *watchableStore) syncWatchers() {
 			if victims == nil {
 				victims = make(watcherBatch)
 			}
-			isBlocked = true
+			w.victim = true
 		}
 
-		if isBlocked {
+		if w.victim {
 			victims[w] = eb
 		} else {
 			if eb.moreRev != 0 {
-				w.cur = eb.moreRev
+				// stay unsynced; more to read
 				continue
 			}
 			s.synced.add(w)
@@ -458,14 +481,15 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
 			plog.Panicf("unexpected multiple revisions in notification")
 		}
 		select {
-		case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.Rev()}:
+		case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}:
 			pendingEventsGauge.Add(float64(len(eb.evs)))
 		default:
 			// move slow watcher to victims
-			w.cur = rev
+			w.minRev = rev + 1
 			if victim == nil {
 				victim = make(watcherBatch)
 			}
+			w.victim = true
 			victim[w] = eb
 			s.synced.delete(w)
 			slowWatcherGauge.Inc()
@@ -508,12 +532,15 @@ type watcher struct {
 	// If end is set, the watcher is on a range.
 	end []byte
 
-	// cur is the current watcher revision of a unsynced watcher.
-	// cur will be updated for unsynced watcher while it is catching up.
-	// cur is startRev of a synced watcher.
-	// cur will not be updated for synced watcher.
-	cur int64
-	id  WatchID
+	// victim is set when ch is blocked and undergoing victim processing
+	victim bool
+
+	// compacted is set when the watcher is removed because of compaction
+	compacted bool
+
+	// minRev is the minimum revision update the watcher will accept
+	minRev int64
+	id     WatchID
 
 	// a chan to send out the watch response.
 	// The chan might be shared with other watchers.
diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go
index 9a8b1688f..42f69ee75 100644
--- a/mvcc/watchable_store_test.go
+++ b/mvcc/watchable_store_test.go
@@ -193,8 +193,8 @@ func TestSyncWatchers(t *testing.T) {
 	}
 
 	for w := range sws {
-		if w.cur != s.Rev() {
-			t.Errorf("w.cur = %d, want %d", w.cur, s.Rev())
+		if w.minRev != s.Rev()+1 {
+			t.Errorf("w.minRev = %d, want %d", w.minRev, s.Rev()+1)
 		}
 	}
 
diff --git a/mvcc/watcher_group.go b/mvcc/watcher_group.go
index 3735cb2e5..7a4d7daa8 100644
--- a/mvcc/watcher_group.go
+++ b/mvcc/watcher_group.go
@@ -81,7 +81,7 @@ func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch {
 	wb := make(watcherBatch)
 	for _, ev := range evs {
 		for w := range wg.watcherSetByKey(string(ev.Kv.Key)) {
-			if ev.Kv.ModRevision >= w.cur {
+			if ev.Kv.ModRevision >= w.minRev {
 				// don't double notify
 				wb.add(w, ev)
 			}
@@ -233,20 +233,21 @@ func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watc
 func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
 	minRev := int64(math.MaxInt64)
 	for w := range wg.watchers {
-		if w.cur > curRev {
+		if w.minRev > curRev {
 			panic("watcher current revision should not exceed current revision")
 		}
-		if w.cur < compactRev {
+		if w.minRev < compactRev {
 			select {
 			case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
+				w.compacted = true
 				wg.delete(w)
 			default:
 				// retry next time
 			}
 			continue
 		}
-		if minRev > w.cur {
-			minRev = w.cur
+		if minRev > w.minRev {
+			minRev = w.minRev
 		}
 	}
 	return minRev
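For context on the minRev bookkeeping above, here is a small self-contained Go sketch of the invariant patch 2 converges on: a watcher accepts only events whose modification revision is at or above minRev, and after delivering a revision it advances minRev past it, which is what prevents double notification in newWatcherBatch. The types and the notify helper below are invented for illustration and are not the mvcc API.

package main

import "fmt"

// event and watcher are simplified stand-ins for mvccpb.Event and the mvcc
// watcher; only the minRev bookkeeping is modeled.
type event struct {
	key    string
	modRev int64
}

type watcher struct {
	minRev int64 // minimum revision update the watcher will accept
}

// notify delivers only events at or after minRev, then advances minRev past
// each delivered revision so the same event is never delivered twice.
func (w *watcher) notify(evs []event) []event {
	var out []event
	for _, ev := range evs {
		if ev.modRev >= w.minRev { // don't double notify
			out = append(out, ev)
			if ev.modRev+1 > w.minRev {
				w.minRev = ev.modRev + 1
			}
		}
	}
	return out
}

func main() {
	w := &watcher{minRev: 3}
	batch := []event{{"a", 2}, {"b", 3}, {"c", 4}}
	fmt.Println(w.notify(batch)) // delivers only revisions 3 and 4
	fmt.Println(w.notify(batch)) // replaying the same batch delivers nothing
}

This is also why moveVictims resends at Revision: w.minRev - 1: the watcher has observed the store up to, but not including, w.minRev.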