mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #17612 from chaochn47/release-3.5-backport-fix-watch-event-loss-after-compact
[release-3.5] backport fix watch event loss after compaction
This commit is contained in:
commit
2ee038d67f
@ -366,6 +366,11 @@ func (s *watchableStore) syncWatchers() int {
|
||||
var victims watcherBatch
|
||||
wb := newWatcherBatch(wg, evs)
|
||||
for w := range wg.watchers {
|
||||
if w.minRev < compactionRev {
|
||||
// Skip the watcher that failed to send compacted watch response due to w.ch is full.
|
||||
// Next retry of syncWatchers would try to resend the compacted watch response to w.ch
|
||||
continue
|
||||
}
|
||||
w.minRev = curRev + 1
|
||||
|
||||
eb, ok := wb[w]
|
||||
|
@ -23,11 +23,14 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||
"go.etcd.io/etcd/pkg/v3/traceutil"
|
||||
"go.etcd.io/etcd/server/v3/lease"
|
||||
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func TestWatch(t *testing.T) {
|
||||
@ -259,6 +262,62 @@ func TestWatchCompacted(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchNoEventLossOnCompact(t *testing.T) {
|
||||
oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
|
||||
b, tmpPath := betesting.NewDefaultTmpBackend(t)
|
||||
lg := zaptest.NewLogger(t)
|
||||
s := newWatchableStore(lg, b, &lease.FakeLessor{}, StoreConfig{})
|
||||
|
||||
defer func() {
|
||||
cleanup(s, b, tmpPath)
|
||||
chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync
|
||||
}()
|
||||
|
||||
chanBufLen, maxWatchersPerSync = 1, 4
|
||||
testKey, testValue := []byte("foo"), []byte("bar")
|
||||
|
||||
maxRev := 10
|
||||
compactRev := int64(5)
|
||||
for i := 0; i < maxRev; i++ {
|
||||
s.Put(testKey, testValue, lease.NoLease)
|
||||
}
|
||||
_, err := s.Compact(traceutil.TODO(), compactRev)
|
||||
require.NoErrorf(t, err, "failed to compact kv (%v)", err)
|
||||
|
||||
w := s.NewWatchStream()
|
||||
defer w.Close()
|
||||
|
||||
watchers := map[WatchID]int64{
|
||||
0: 1,
|
||||
1: 1, // create unsyncd watchers with startRev < compactRev
|
||||
2: 6, // create unsyncd watchers with compactRev < startRev < currentRev
|
||||
}
|
||||
for id, startRev := range watchers {
|
||||
_, err := w.Watch(id, testKey, nil, startRev)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
// fill up w.Chan() with 1 buf via 2 compacted watch response
|
||||
s.syncWatchers()
|
||||
|
||||
for len(watchers) > 0 {
|
||||
resp := <-w.Chan()
|
||||
if resp.CompactRevision != 0 {
|
||||
require.Equal(t, resp.CompactRevision, compactRev)
|
||||
require.Contains(t, watchers, resp.WatchID)
|
||||
delete(watchers, resp.WatchID)
|
||||
continue
|
||||
}
|
||||
nextRev := watchers[resp.WatchID]
|
||||
for _, ev := range resp.Events {
|
||||
require.Equalf(t, nextRev, ev.Kv.ModRevision, "got event revision %d but want %d for watcher with watch ID %d", ev.Kv.ModRevision, nextRev, resp.WatchID)
|
||||
nextRev++
|
||||
}
|
||||
if nextRev == s.rev()+1 {
|
||||
delete(watchers, resp.WatchID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchFutureRev(t *testing.T) {
|
||||
b, tmpPath := betesting.NewDefaultTmpBackend(t)
|
||||
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
|
||||
|
Loading…
x
Reference in New Issue
Block a user