diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 19a8ea70a..f6e2abfae 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -94,9 +94,12 @@ type serverWatchStream struct { // closec indicates the stream is closed. closec chan struct{} + + // wg waits for the send loop to complete + wg sync.WaitGroup } -func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error { +func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) { sws := serverWatchStream{ clusterID: ws.clusterID, memberID: ws.memberID, @@ -109,23 +112,30 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) error { closec: make(chan struct{}), } - go sws.sendLoop() - errc := make(chan error, 1) + sws.wg.Add(1) go func() { - errc <- sws.recvLoop() - sws.close() + sws.sendLoop() + sws.wg.Done() }() + + errc := make(chan error, 1) + // Ideally recvLoop would also use sws.wg to signal its completion + // but when stream.Context().Done() is closed, the stream's recv + // may continue to block since it uses a different context, leading to + // deadlock when calling sws.close(). + go func() { errc <- sws.recvLoop() }() + select { - case err := <-errc: - return err + case err = <-errc: case <-stream.Context().Done(): - err := stream.Context().Err() + err = stream.Context().Err() // the only server-side cancellation is noleader for now. if err == context.Canceled { - return rpctypes.ErrGRPCNoLeader + err = rpctypes.ErrGRPCNoLeader } - return err } + sws.close() + return err } func (sws *serverWatchStream) recvLoop() error { @@ -292,6 +302,7 @@ func (sws *serverWatchStream) close() { sws.watchStream.Close() close(sws.closec) close(sws.ctrlStream) + sws.wg.Wait() } func (sws *serverWatchStream) newResponseHeader(rev int64) *pb.ResponseHeader { diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 25c84ce99..8339500ec 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -190,19 +190,19 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c defer s.mu.Unlock() wa := &watcher{ - key: key, - end: end, - cur: startRev, - id: id, - ch: ch, + key: key, + end: end, + minRev: startRev, + id: id, + ch: ch, } s.store.mu.Lock() synced := startRev > s.store.currentRev.main || startRev == 0 if synced { - wa.cur = s.store.currentRev.main + 1 - if startRev > wa.cur { - wa.cur = startRev + wa.minRev = s.store.currentRev.main + 1 + if startRev > wa.minRev { + wa.minRev = startRev } } s.store.mu.Unlock() @@ -214,30 +214,47 @@ func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch c } watcherGauge.Inc() - cancel := cancelFunc(func() { + return wa, func() { s.cancelWatcher(wa) } +} + +// cancelWatcher removes references of the watcher from the watchableStore +func (s *watchableStore) cancelWatcher(wa *watcher) { + for { s.mu.Lock() - // remove references of the watcher + if s.unsynced.delete(wa) { slowWatcherGauge.Dec() - watcherGauge.Dec() + break } else if s.synced.delete(wa) { - watcherGauge.Dec() - } else { - for _, wb := range s.victims { - if wb[wa] != nil { - slowWatcherGauge.Dec() - watcherGauge.Dec() - delete(wb, wa) - break - } + break + } else if wa.compacted { + break + } + + if !wa.victim { + panic("watcher not victim but not in watch groups") + } + + var victimBatch watcherBatch + for _, wb := range s.victims { + if wb[wa] != nil { + victimBatch = wb + break } } + if victimBatch != nil { + slowWatcherGauge.Dec() + delete(victimBatch, wa) + break + } + + // victim being processed so not accessible; retry s.mu.Unlock() + time.Sleep(time.Millisecond) + } - // If we cannot find it, it should have finished watch. - }) - - return wa, cancel + watcherGauge.Dec() + s.mu.Unlock() } // syncWatchersLoop syncs the watcher in the unsynced map every 100ms. @@ -306,8 +323,10 @@ func (s *watchableStore) moveVictims() (moved int) { for _, wb := range victims { // try to send responses again for w, eb := range wb { + // watcher has observed the store up to, but not including, w.minRev + rev := w.minRev - 1 select { - case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: w.cur}: + case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}: pendingEventsGauge.Add(float64(len(eb.evs))) default: if newVictim == nil { @@ -328,10 +347,11 @@ func (s *watchableStore) moveVictims() (moved int) { // couldn't send watch response; stays victim continue } + w.victim = false if eb.moreRev != 0 { - w.cur = eb.moreRev + w.minRev = eb.moreRev } - if w.cur < curRev { + if w.minRev <= curRev { s.unsynced.add(w) } else { slowWatcherGauge.Dec() @@ -385,17 +405,20 @@ func (s *watchableStore) syncWatchers() { var victims watcherBatch wb := newWatcherBatch(wg, evs) for w := range wg.watchers { + w.minRev = curRev + 1 + eb, ok := wb[w] if !ok { // bring un-notified watcher to synced - w.cur = curRev s.synced.add(w) s.unsynced.delete(w) continue } - w.cur = curRev - isBlocked := false + if eb.moreRev != 0 { + w.minRev = eb.moreRev + } + select { case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}: pendingEventsGauge.Add(float64(len(eb.evs))) @@ -403,14 +426,14 @@ func (s *watchableStore) syncWatchers() { if victims == nil { victims = make(watcherBatch) } - isBlocked = true + w.victim = true } - if isBlocked { + if w.victim { victims[w] = eb } else { if eb.moreRev != 0 { - w.cur = eb.moreRev + // stay unsynced; more to read continue } s.synced.add(w) @@ -458,14 +481,15 @@ func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) { plog.Panicf("unexpected multiple revisions in notification") } select { - case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: s.Rev()}: + case w.ch <- WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}: pendingEventsGauge.Add(float64(len(eb.evs))) default: // move slow watcher to victims - w.cur = rev + w.minRev = rev + 1 if victim == nil { victim = make(watcherBatch) } + w.victim = true victim[w] = eb s.synced.delete(w) slowWatcherGauge.Inc() @@ -508,12 +532,15 @@ type watcher struct { // If end is set, the watcher is on a range. end []byte - // cur is the current watcher revision of a unsynced watcher. - // cur will be updated for unsynced watcher while it is catching up. - // cur is startRev of a synced watcher. - // cur will not be updated for synced watcher. - cur int64 - id WatchID + // victim is set when ch is blocked and undergoing victim processing + victim bool + + // compacted is set when the watcher is removed because of compaction + compacted bool + + // minRev is the minimum revision update the watcher will accept + minRev int64 + id WatchID // a chan to send out the watch response. // The chan might be shared with other watchers. diff --git a/mvcc/watchable_store_test.go b/mvcc/watchable_store_test.go index 9a8b1688f..42f69ee75 100644 --- a/mvcc/watchable_store_test.go +++ b/mvcc/watchable_store_test.go @@ -193,8 +193,8 @@ func TestSyncWatchers(t *testing.T) { } for w := range sws { - if w.cur != s.Rev() { - t.Errorf("w.cur = %d, want %d", w.cur, s.Rev()) + if w.minRev != s.Rev()+1 { + t.Errorf("w.minRev = %d, want %d", w.minRev, s.Rev()+1) } } diff --git a/mvcc/watcher_group.go b/mvcc/watcher_group.go index 3735cb2e5..7a4d7daa8 100644 --- a/mvcc/watcher_group.go +++ b/mvcc/watcher_group.go @@ -81,7 +81,7 @@ func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch { wb := make(watcherBatch) for _, ev := range evs { for w := range wg.watcherSetByKey(string(ev.Kv.Key)) { - if ev.Kv.ModRevision >= w.cur { + if ev.Kv.ModRevision >= w.minRev { // don't double notify wb.add(w, ev) } @@ -233,20 +233,21 @@ func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watc func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 { minRev := int64(math.MaxInt64) for w := range wg.watchers { - if w.cur > curRev { + if w.minRev > curRev { panic("watcher current revision should not exceed current revision") } - if w.cur < compactRev { + if w.minRev < compactRev { select { case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}: + w.compacted = true wg.delete(w) default: // retry next time } continue } - if minRev > w.cur { - minRev = w.cur + if minRev > w.minRev { + minRev = w.minRev } } return minRev