diff --git a/tests/integration/v3_watch_restore_test.go b/tests/integration/v3_watch_restore_test.go index bdebeacfc..9178d95f6 100644 --- a/tests/integration/v3_watch_restore_test.go +++ b/tests/integration/v3_watch_restore_test.go @@ -17,10 +17,14 @@ package integration import ( "context" "fmt" + "strings" + "sync" + "sync/atomic" "testing" "time" pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/client/pkg/v3/testutil" "go.etcd.io/etcd/tests/v3/framework/config" "go.etcd.io/etcd/tests/v3/framework/integration" ) @@ -54,7 +58,9 @@ func MustFetchNotEmptyMetric(tb testing.TB, member *integration.Member, metric s func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewCluster(t, &integration.ClusterConfig{ + logMonitor := newTestingLogfMonitor(t) + + clus := integration.NewCluster(logMonitor, &integration.ClusterConfig{ Size: 3, SnapshotCount: 10, SnapshotCatchUpEntries: 5, @@ -81,7 +87,7 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { } clus.Members[0].InjectPartition(t, clus.Members[1:]...) - initialLead := clus.WaitMembersForLeader(t, clus.Members[1:]) + initialLead := clus.WaitMembersForLeader(t, clus.Members[1:]) + 1 t.Logf("elected lead: %v", clus.Members[initialLead].Server.MemberId()) t.Logf("sleeping for 2 seconds") time.Sleep(2 * time.Second) @@ -89,6 +95,23 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { kvc := integration.ToGRPC(clus.Client(1)).KV + // NOTE: When starting a new cluster with 3 members, each member will + // apply 3 ConfChange directly at the beginning before a leader is + // elected. Leader will apply 3 MemberAttrSet and 1 ClusterVersionSet + // changes. So member 0 has index 8 in raft log before network + // partition. We need to trigger EtcdServer.snapshot() at least twice. + // + // SnapshotCount: 10, SnapshotCatchUpEntries: 5 + // + // T1: L(snapshot-index: 11, compacted-index: 6), F_m0(index:8) + // T2: L(snapshot-index: 22, compacted-index: 17), F_m0(index:8, out of date) + // + // Since there is no way to confirm server has compacted the log, we + // use log monitor to watch and expect "compacted Raft logs" content. + logSubID := "compacted" + logSub := newLineCountExpecter("compacted Raft logs", 4) // two members + logMonitor.addSubscriber(logSubID, logSub) + // to trigger snapshot from the leader to the stopped follower for i := 0; i < 15; i++ { _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}) @@ -97,8 +120,16 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { } } - // trigger snapshot send from leader to this slow follower - // which then calls watchable store Restore + // ensure two members has compacted the log twice. + if err := logSub.wait(5 * time.Second); err != nil { + t.Fatal("Failed to ensure that members compacted Raft log in 5 seconds") + } + logMonitor.delSubscriber(logSubID) + t.Logf("two members have compacted raft logs") + + // After RecoverPartition, leader L will send snapshot to slow F_m0 + // follower, because F_m0(index:8) is 'out of date' compared to + // L(compacted-index:17). clus.Members[0].RecoverPartition(t, clus.Members[1:]...) // We don't expect leadership change here, just recompute the leader'Server index // within clus.Members list. @@ -154,3 +185,89 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { } } } + +type lineCountExpecter struct { + doneOnce sync.Once + doneCh chan struct{} + + content string + count int64 + seen int64 +} + +func newLineCountExpecter(expectedContent string, expectedCount int64) *lineCountExpecter { + return &lineCountExpecter{ + doneCh: make(chan struct{}), + content: expectedContent, + count: expectedCount, + } +} + +func (le *lineCountExpecter) Notify(log string) { + if !strings.Contains(log, le.content) { + return + } + + if atomic.AddInt64(&le.seen, 1) >= le.count { + le.doneOnce.Do(func() { + close(le.doneCh) + }) + } +} + +func (le *lineCountExpecter) wait(timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.TODO(), timeout) + defer cancel() + + select { + case <-le.doneCh: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +type testingLogfSubscriber interface { + Notify(log string) +} + +// testingLogfMonitor is to monitor t.Logf output. +type testingLogfMonitor struct { + testutil.TB + + mu sync.RWMutex + subscribers map[string]testingLogfSubscriber +} + +func newTestingLogfMonitor(tb testutil.TB) *testingLogfMonitor { + return &testingLogfMonitor{ + TB: tb, + subscribers: make(map[string]testingLogfSubscriber), + } +} + +func (m *testingLogfMonitor) addSubscriber(id string, sub testingLogfSubscriber) { + m.mu.Lock() + defer m.mu.Unlock() + m.subscribers[id] = sub +} + +func (m *testingLogfMonitor) delSubscriber(id string) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.subscribers, id) +} + +func (m *testingLogfMonitor) Logf(format string, args ...interface{}) { + m.mu.RLock() + if len(m.subscribers) > 0 { + log := fmt.Sprintf(format, args...) + + for _, sub := range m.subscribers { + sub.Notify(log) + } + } + m.mu.RUnlock() + + m.TB.Logf(format, args...) +}