mvcc: avoid negative watcher count metrics (#11882)

The watch count metrics are not robust to duplicate cancellations. These
cause the count to be decremented twice, eventually leading to negative
counts. We are seeing this in production. The duplicate cancellations
themselves are not a big problem (except for performance), but they are
caused by the new proactive cancellation logic (#11850), which cancels
proactively even immediately before initiating a Close, thus nearly
guaranteeing a Close-cancel race, as discussed in watchable_store.go. We
can avoid this in most cases by not sending a cancellation when we are
about to Close.
Author: Jack Kleeman, 2020-05-13 00:50:53 +01:00 (committed by GitHub)
parent ab494956bf
commit ec13797407
3 changed files with 39 additions and 7 deletions
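
To make the failure mode concrete, here is a minimal runnable sketch of the double decrement (the gauge type is a hypothetical stand-in for the prometheus gauge behind etcd_debugging_mvcc_watcher_total, not etcd's actual code):

package main

import "fmt"

// gauge is a hypothetical stand-in for etcd's watcher-total metric:
// incremented when a watcher is created, decremented when it is canceled.
type gauge struct{ n int64 }

func (g *gauge) Inc() { g.n++ }
func (g *gauge) Dec() { g.n-- }

func main() {
	var watcherTotal gauge
	watcherTotal.Inc() // one watcher created

	// A Close-cancel race delivers the cancellation twice. Without an
	// idempotence guard, each delivery decrements the gauge.
	watcherTotal.Dec() // proactive cancel sent just before Close
	watcherTotal.Dec() // duplicate cancel caused by the Close itself

	fmt.Println(watcherTotal.n) // -1: the negative count seen in production
}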

clientv3/watch.go

@@ -650,6 +650,12 @@ func (w *watchGrpcStream) run() {
 			return
 		case ws := <-w.closingc:
+			w.closeSubstream(ws)
+			delete(closing, ws)
+			// no more watchers on this stream, shutdown, skip cancellation
+			if len(w.substreams)+len(w.resuming) == 0 {
+				return
+			}
 			if ws.id != -1 {
 				// client is closing an established watch; close it on the server proactively instead of waiting
 				// to close when the next message arrives
@@ -665,12 +671,6 @@ func (w *watchGrpcStream) run() {
 					lg.Warning("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err))
 				}
 			}
-			w.closeSubstream(ws)
-			delete(closing, ws)
-			// no more watchers on this stream, shutdown
-			if len(w.substreams)+len(w.resuming) == 0 {
-				return
-			}
 		}
 	}
 }
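
Condensed into a runnable sketch, the reordered logic decides whether a proactive cancel is still worth sending (the substream type and shouldSendCancel helper are hypothetical stand-ins for the client's real per-stream bookkeeping):

package main

import "fmt"

// substream is a hypothetical stand-in for the client's per-watch state;
// id is -1 until the server has acknowledged the watch.
type substream struct{ id int64 }

// shouldSendCancel mirrors the guard introduced above: once the closing
// substream has been removed from the bookkeeping, a proactive cancel
// request is only sent if other watchers keep the gRPC stream alive. If
// this was the last watcher, the imminent stream Close tears the watch
// down server-side anyway, and an extra cancel would just race with it.
func shouldSendCancel(substreams, resuming []*substream, ws *substream) bool {
	if len(substreams)+len(resuming) == 0 {
		return false // last watcher: let Close do the cleanup
	}
	return ws.id != -1 // only established watches have a server-side id to cancel
}

func main() {
	ws := &substream{id: 7}
	fmt.Println(shouldSendCancel(nil, nil, ws))                   // false: stream is shutting down
	fmt.Println(shouldSendCancel([]*substream{{id: 8}}, nil, ws)) // true: stream stays open
}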

integration/v3_watch_test.go

@@ -1245,3 +1245,32 @@ func TestV3WatchCancellation(t *testing.T) {
 		t.Fatalf("expected one watch, got %s", minWatches)
 	}
 }
+
+// TestV3WatchCloseCancelRace ensures that watch close doesn't decrement the watcher total too far.
+func TestV3WatchCloseCancelRace(t *testing.T) {
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	cli := clus.RandClient()
+
+	for i := 0; i < 1000; i++ {
+		ctx, cancel := context.WithCancel(ctx)
+		cli.Watch(ctx, "/foo")
+		cancel()
+	}
+
+	// Wait a little for cancellations to take hold
+	time.Sleep(3 * time.Second)
+
+	minWatches, err := clus.Members[0].Metric("etcd_debugging_mvcc_watcher_total")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if minWatches != "0" {
+		t.Fatalf("expected zero watches, got %s", minWatches)
+	}
+}
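
The test reads the gauge through the cluster harness's Metric helper; outside the test suite the same check can be made by scraping the metrics endpoint. A sketch, assuming a local etcd serving prometheus metrics at localhost:2379/metrics (adjust for your --listen-metrics-urls setting):

package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	resp, err := http.Get("http://localhost:2379/metrics")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	sc := bufio.NewScanner(resp.Body)
	for sc.Scan() {
		// e.g. "etcd_debugging_mvcc_watcher_total 0"; after the fix this
		// settles at 0 once all watches are canceled, never below it.
		if strings.HasPrefix(sc.Text(), "etcd_debugging_mvcc_watcher_total") {
			fmt.Println(sc.Text())
		}
	}
}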

mvcc/watchable_store.go

@@ -153,10 +153,13 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
 		s.mu.Lock()
 		if s.unsynced.delete(wa) {
 			slowWatcherGauge.Dec()
+			watcherGauge.Dec()
 			break
 		} else if s.synced.delete(wa) {
+			watcherGauge.Dec()
 			break
 		} else if wa.compacted {
+			watcherGauge.Dec()
 			break
 		} else if wa.ch == nil {
 			// already canceled (e.g., cancel/close race)
@@ -177,6 +180,7 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
 			}
 			if victimBatch != nil {
 				slowWatcherGauge.Dec()
+				watcherGauge.Dec()
 				delete(victimBatch, wa)
 				break
 			}
@@ -186,7 +190,6 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
 		time.Sleep(time.Millisecond)
 	}
 
-	watcherGauge.Dec()
 	wa.ch = nil
 	s.mu.Unlock()
 }
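
The effect of moving watcherGauge.Dec() into the individual branches is that a watcher is decremented exactly once, on the path that actually removes it, and a second cancellation becomes a no-op. A minimal sketch of that invariant (the watcher type and package-level counter are hypothetical stand-ins for etcd's mvcc internals):

package main

import "fmt"

// watcher is a hypothetical stand-in; its ch field is nilled out once the
// watcher has been fully canceled, which is the idempotence marker.
type watcher struct{ ch chan struct{} }

var watcherTotal int64 // stand-in for watcherGauge

func cancelWatcher(wa *watcher) {
	if wa.ch == nil {
		// already canceled (e.g., cancel/close race): nothing was removed,
		// so nothing is decremented
		return
	}
	watcherTotal-- // paired with exactly one increment at creation
	wa.ch = nil    // mark canceled so a duplicate call becomes a no-op
}

func main() {
	wa := &watcher{ch: make(chan struct{})}
	watcherTotal++ // watcher created

	cancelWatcher(wa) // proactive cancel
	cancelWatcher(wa) // duplicate cancel from the Close race: no-op

	fmt.Println(watcherTotal) // 0, not -1
}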