Merge pull request #15667 from fuweid/deflake-issue-15545-TestV3WatchRestoreSnapshotUnsync

tests: deflake TestV3WatchRestoreSnapshotUnsync
Benjamin Wang 2023-04-11 06:00:42 +08:00 committed by GitHub
commit 1683231216

@@ -17,10 +17,14 @@ package integration
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/testutil"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/integration"
)
@@ -54,7 +58,9 @@ func MustFetchNotEmptyMetric(tb testing.TB, member *integration.Member, metric s
func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewCluster(t, &integration.ClusterConfig{
logMonitor := newTestingLogfMonitor(t)
clus := integration.NewCluster(logMonitor, &integration.ClusterConfig{
Size: 3,
SnapshotCount: 10,
SnapshotCatchUpEntries: 5,
@@ -81,7 +87,7 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
}
clus.Members[0].InjectPartition(t, clus.Members[1:]...)
initialLead := clus.WaitMembersForLeader(t, clus.Members[1:])
initialLead := clus.WaitMembersForLeader(t, clus.Members[1:]) + 1
t.Logf("elected lead: %v", clus.Members[initialLead].Server.MemberId())
t.Logf("sleeping for 2 seconds")
time.Sleep(2 * time.Second)
@@ -89,6 +95,23 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
kvc := integration.ToGRPC(clus.Client(1)).KV
// NOTE: When starting a new cluster with 3 members, each member will
// apply 3 ConfChange directly at the beginning before a leader is
// elected. Leader will apply 3 MemberAttrSet and 1 ClusterVersionSet
// changes. So member 0 has index 8 in raft log before network
// partition. We need to trigger EtcdServer.snapshot() at least twice.
//
// SnapshotCount: 10, SnapshotCatchUpEntries: 5
//
// T1: L(snapshot-index: 11, compacted-index: 6), F_m0(index:8)
// T2: L(snapshot-index: 22, compacted-index: 17), F_m0(index:8, out of date)
//
// Since there is no way to confirm server has compacted the log, we
// use log monitor to watch and expect "compacted Raft logs" content.
logSubID := "compacted"
logSub := newLineCountExpecter("compacted Raft logs", 4) // two members, each compacting twice
logMonitor.addSubscriber(logSubID, logSub)
// to trigger snapshot from the leader to the stopped follower
for i := 0; i < 15; i++ {
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
@@ -97,8 +120,16 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
}
}
// trigger snapshot send from leader to this slow follower
// which then calls watchable store Restore
// ensure the two members have compacted the log twice.
if err := logSub.wait(5 * time.Second); err != nil {
t.Fatal("Failed to ensure that members compacted Raft log in 5 seconds")
}
logMonitor.delSubscriber(logSubID)
t.Logf("two members have compacted raft logs")
// After RecoverPartition, leader L will send snapshot to slow F_m0
// follower, because F_m0(index:8) is 'out of date' compared to
// L(compacted-index:17).
clus.Members[0].RecoverPartition(t, clus.Members[1:]...)
// We don't expect leadership change here, just recompute the leader's index
// within clus.Members list.
@@ -154,3 +185,89 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
}
}
}
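// lineCountExpecter signals doneCh once it has seen `count` log lines
// containing `content`.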
type lineCountExpecter struct {
doneOnce sync.Once
doneCh chan struct{}
content string
count int64
seen int64
}
func newLineCountExpecter(expectedContent string, expectedCount int64) *lineCountExpecter {
return &lineCountExpecter{
doneCh: make(chan struct{}),
content: expectedContent,
count: expectedCount,
}
}
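// Notify counts every log line that contains the expected content and
// closes doneCh exactly once when the expected count is reached.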
func (le *lineCountExpecter) Notify(log string) {
if !strings.Contains(log, le.content) {
return
}
if atomic.AddInt64(&le.seen, 1) >= le.count {
le.doneOnce.Do(func() {
close(le.doneCh)
})
}
}
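// wait blocks until the expected number of matching lines has been seen,
// or returns the context error if the timeout elapses first.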
func (le *lineCountExpecter) wait(timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
defer cancel()
select {
case <-le.doneCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
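// testingLogfSubscriber receives every line formatted by testingLogfMonitor.Logf.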
type testingLogfSubscriber interface {
Notify(log string)
}
// testingLogfMonitor is to monitor t.Logf output.
type testingLogfMonitor struct {
testutil.TB
mu sync.RWMutex
subscribers map[string]testingLogfSubscriber
}
func newTestingLogfMonitor(tb testutil.TB) *testingLogfMonitor {
return &testingLogfMonitor{
TB: tb,
subscribers: make(map[string]testingLogfSubscriber),
}
}
func (m *testingLogfMonitor) addSubscriber(id string, sub testingLogfSubscriber) {
m.mu.Lock()
defer m.mu.Unlock()
m.subscribers[id] = sub
}
func (m *testingLogfMonitor) delSubscriber(id string) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.subscribers, id)
}
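// Logf formats the line once, fans it out to all registered subscribers
// under a read lock, and then delegates to the wrapped testutil.TB.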
func (m *testingLogfMonitor) Logf(format string, args ...interface{}) {
m.mu.RLock()
if len(m.subscribers) > 0 {
log := fmt.Sprintf(format, args...)
for _, sub := range m.subscribers {
sub.Notify(log)
}
}
m.mu.RUnlock()
m.TB.Logf(format, args...)
}
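
The interception pattern introduced above can be exercised outside the etcd test framework. Below is a minimal, standalone sketch assuming only the Go standard library; the names monitor, expecter, and subscriber are illustrative stand-ins for the helpers added in this change, and fmt.Println stands in for the wrapped testutil.TB.

package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"sync/atomic"
	"time"
)

// subscriber mirrors testingLogfSubscriber: it receives every formatted log line.
type subscriber interface {
	Notify(line string)
}

// expecter mirrors lineCountExpecter: it closes doneCh once `count` lines
// containing `content` have been observed.
type expecter struct {
	doneOnce sync.Once
	doneCh   chan struct{}
	content  string
	count    int64
	seen     int64
}

func (e *expecter) Notify(line string) {
	if !strings.Contains(line, e.content) {
		return
	}
	if atomic.AddInt64(&e.seen, 1) >= e.count {
		e.doneOnce.Do(func() { close(e.doneCh) })
	}
}

func (e *expecter) wait(d time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), d)
	defer cancel()
	select {
	case <-e.doneCh:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// monitor mirrors testingLogfMonitor: Logf fans each formatted line out to
// the registered subscribers before writing it to the underlying sink.
type monitor struct {
	mu   sync.RWMutex
	subs map[string]subscriber
}

func (m *monitor) Logf(format string, args ...interface{}) {
	line := fmt.Sprintf(format, args...)
	m.mu.RLock()
	for _, s := range m.subs {
		s.Notify(line)
	}
	m.mu.RUnlock()
	fmt.Println(line) // stands in for the wrapped testutil.TB.Logf
}

func main() {
	m := &monitor{subs: make(map[string]subscriber)}
	e := &expecter{doneCh: make(chan struct{}), content: "compacted Raft logs", count: 2}
	m.subs["compacted"] = e

	// Simulate two members each reporting one compaction.
	go func() {
		m.Logf("member %d compacted Raft logs up to index %d", 1, 17)
		m.Logf("member %d compacted Raft logs up to index %d", 2, 17)
	}()

	if err := e.wait(time.Second); err != nil {
		fmt.Println("timed out waiting for compaction logs:", err)
		return
	}
	fmt.Println("observed the expected compaction log lines")
}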