mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Fix event loss after compaction
Signed-off-by: Chao Chen <chaochn@amazon.com>
This commit is contained in:
parent
82925537e0
commit
405862e807
@ -449,6 +449,7 @@ func (sws *serverWatchStream) sendLoop() {
|
|||||||
sws.mu.RUnlock()
|
sws.mu.RUnlock()
|
||||||
|
|
||||||
var serr error
|
var serr error
|
||||||
|
// gofail: var beforeSendWatchResponse struct{}
|
||||||
if !fragmented && !ok {
|
if !fragmented && !ok {
|
||||||
serr = sws.gRPCStream.Send(wr)
|
serr = sws.gRPCStream.Send(wr)
|
||||||
} else {
|
} else {
|
||||||
|
@ -39,6 +39,8 @@ var (
|
|||||||
maxWatchersPerSync = 512
|
maxWatchersPerSync = 512
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func ChanBufLen() int { return chanBufLen }
|
||||||
|
|
||||||
type watchable interface {
|
type watchable interface {
|
||||||
watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
|
watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
|
||||||
progress(w *watcher)
|
progress(w *watcher)
|
||||||
@ -370,6 +372,11 @@ func (s *watchableStore) syncWatchers() int {
|
|||||||
victims := make(watcherBatch)
|
victims := make(watcherBatch)
|
||||||
wb := newWatcherBatch(wg, evs)
|
wb := newWatcherBatch(wg, evs)
|
||||||
for w := range wg.watchers {
|
for w := range wg.watchers {
|
||||||
|
if w.minRev < compactionRev {
|
||||||
|
// Skip the watcher that failed to send compacted watch response due to w.ch is full.
|
||||||
|
// Next retry of syncWatchers would try to resend the compacted watch response to w.ch
|
||||||
|
continue
|
||||||
|
}
|
||||||
w.minRev = curRev + 1
|
w.minRev = curRev + 1
|
||||||
|
|
||||||
eb, ok := wb[w]
|
eb, ok := wb[w]
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
|
|
||||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||||
@ -250,6 +251,63 @@ func TestWatchCompacted(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWatchNoEventLossOnCompact(t *testing.T) {
|
||||||
|
oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
|
||||||
|
|
||||||
|
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||||
|
lg := zaptest.NewLogger(t)
|
||||||
|
s := newWatchableStore(lg, b, &lease.FakeLessor{}, StoreConfig{})
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
cleanup(s, b)
|
||||||
|
chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync
|
||||||
|
}()
|
||||||
|
|
||||||
|
chanBufLen, maxWatchersPerSync = 1, 4
|
||||||
|
testKey, testValue := []byte("foo"), []byte("bar")
|
||||||
|
|
||||||
|
maxRev := 10
|
||||||
|
compactRev := int64(5)
|
||||||
|
for i := 0; i < maxRev; i++ {
|
||||||
|
s.Put(testKey, testValue, lease.NoLease)
|
||||||
|
}
|
||||||
|
_, err := s.Compact(traceutil.TODO(), compactRev)
|
||||||
|
require.NoErrorf(t, err, "failed to compact kv (%v)", err)
|
||||||
|
|
||||||
|
w := s.NewWatchStream()
|
||||||
|
defer w.Close()
|
||||||
|
|
||||||
|
watchers := map[WatchID]int64{
|
||||||
|
0: 1,
|
||||||
|
1: 1, // create unsyncd watchers with startRev < compactRev
|
||||||
|
2: 6, // create unsyncd watchers with compactRev < startRev < currentRev
|
||||||
|
}
|
||||||
|
for id, startRev := range watchers {
|
||||||
|
_, err := w.Watch(id, testKey, nil, startRev)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
// fill up w.Chan() with 1 buf via 2 compacted watch response
|
||||||
|
s.syncWatchers()
|
||||||
|
|
||||||
|
for len(watchers) > 0 {
|
||||||
|
resp := <-w.Chan()
|
||||||
|
if resp.CompactRevision != 0 {
|
||||||
|
require.Equal(t, resp.CompactRevision, compactRev)
|
||||||
|
require.Contains(t, watchers, resp.WatchID)
|
||||||
|
delete(watchers, resp.WatchID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
nextRev := watchers[resp.WatchID]
|
||||||
|
for _, ev := range resp.Events {
|
||||||
|
require.Equalf(t, nextRev, ev.Kv.ModRevision, "got event revision %d but want %d for watcher with watch ID %d", ev.Kv.ModRevision, nextRev, resp.WatchID)
|
||||||
|
nextRev++
|
||||||
|
}
|
||||||
|
if nextRev == s.rev()+1 {
|
||||||
|
delete(watchers, resp.WatchID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestWatchFutureRev(t *testing.T) {
|
func TestWatchFutureRev(t *testing.T) {
|
||||||
b, _ := betesting.NewDefaultTmpBackend(t)
|
b, _ := betesting.NewDefaultTmpBackend(t)
|
||||||
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
|
||||||
|
@ -17,6 +17,7 @@ package integration
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
@ -24,13 +25,17 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||||
"go.etcd.io/etcd/api/v3/mvccpb"
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
||||||
|
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||||
clientv3 "go.etcd.io/etcd/client/v3"
|
clientv3 "go.etcd.io/etcd/client/v3"
|
||||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
|
"go.etcd.io/etcd/server/v3/etcdserver/api/v3rpc"
|
||||||
|
"go.etcd.io/etcd/server/v3/storage/mvcc"
|
||||||
"go.etcd.io/etcd/tests/v3/framework/integration"
|
"go.etcd.io/etcd/tests/v3/framework/integration"
|
||||||
|
gofail "go.etcd.io/gofail/runtime"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestV3WatchFromCurrentRevision tests Watch APIs from current revision.
|
// TestV3WatchFromCurrentRevision tests Watch APIs from current revision.
|
||||||
@ -1512,3 +1517,56 @@ func TestV3WatchProgressWaitsForSyncNoEvents(t *testing.T) {
|
|||||||
}
|
}
|
||||||
require.True(t, gotProgressNotification, "Expected to get progress notification")
|
require.True(t, gotProgressNotification, "Expected to get progress notification")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestV3NoEventsLostOnCompact verifies that slow watchers exit with compacted watch response
|
||||||
|
// if its next revision of events are compacted and no lost events sent to client.
|
||||||
|
func TestV3NoEventsLostOnCompact(t *testing.T) {
|
||||||
|
if integration.ThroughProxy {
|
||||||
|
t.Skip("grpc proxy currently does not support requesting progress notifications")
|
||||||
|
}
|
||||||
|
integration.BeforeTest(t)
|
||||||
|
if len(gofail.List()) == 0 {
|
||||||
|
t.Skip("please run 'make gofail-enable' before running the test")
|
||||||
|
}
|
||||||
|
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
|
||||||
|
defer clus.Terminate(t)
|
||||||
|
|
||||||
|
client := clus.RandClient()
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// sendLoop throughput is rate-limited to 1 event per second
|
||||||
|
require.NoError(t, gofail.Enable("beforeSendWatchResponse", `sleep("1s")`))
|
||||||
|
wch := client.Watch(ctx, "foo")
|
||||||
|
|
||||||
|
var rev int64
|
||||||
|
writeCount := mvcc.ChanBufLen() * 11 / 10
|
||||||
|
for i := 0; i < writeCount; i++ {
|
||||||
|
resp, err := client.Put(ctx, "foo", "bar")
|
||||||
|
require.NoError(t, err)
|
||||||
|
rev = resp.Header.Revision
|
||||||
|
}
|
||||||
|
_, err := client.Compact(ctx, rev)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
require.NoError(t, gofail.Disable("beforeSendWatchResponse"))
|
||||||
|
|
||||||
|
eventCount := 0
|
||||||
|
compacted := false
|
||||||
|
for resp := range wch {
|
||||||
|
err = resp.Err()
|
||||||
|
if err != nil {
|
||||||
|
if !errors.Is(err, rpctypes.ErrCompacted) {
|
||||||
|
t.Fatalf("want watch response err %v but got %v", rpctypes.ErrCompacted, err)
|
||||||
|
}
|
||||||
|
compacted = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
eventCount += len(resp.Events)
|
||||||
|
if eventCount == writeCount {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assert.Truef(t, compacted, "Expected stream to get compacted, instead we got %d events out of %d events", eventCount, writeCount)
|
||||||
|
}
|
||||||
|
@ -36,7 +36,7 @@ GOFAIL_VERSION = $(shell cd tools/mod && go list -m -f {{.Version}} go.etcd.io/g
|
|||||||
|
|
||||||
.PHONY: gofail-enable
|
.PHONY: gofail-enable
|
||||||
gofail-enable: install-gofail
|
gofail-enable: install-gofail
|
||||||
gofail enable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/
|
gofail enable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
|
||||||
cd ./server && go get go.etcd.io/gofail@${GOFAIL_VERSION}
|
cd ./server && go get go.etcd.io/gofail@${GOFAIL_VERSION}
|
||||||
cd ./etcdutl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
|
cd ./etcdutl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
|
||||||
cd ./etcdctl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
|
cd ./etcdctl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
|
||||||
@ -44,7 +44,7 @@ gofail-enable: install-gofail
|
|||||||
|
|
||||||
.PHONY: gofail-disable
|
.PHONY: gofail-disable
|
||||||
gofail-disable: install-gofail
|
gofail-disable: install-gofail
|
||||||
gofail disable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/
|
gofail disable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
|
||||||
cd ./server && go mod tidy
|
cd ./server && go mod tidy
|
||||||
cd ./etcdutl && go mod tidy
|
cd ./etcdutl && go mod tidy
|
||||||
cd ./etcdctl && go mod tidy
|
cd ./etcdctl && go mod tidy
|
||||||
|
Loading…
x
Reference in New Issue
Block a user