Merge f1a76a2b6749416801696775f22164aa33f6d050 into c86c93ca2951338115159dcdd20711603044e1f1

This commit is contained in:
Clement 2024-09-26 09:30:13 +08:00 committed by GitHub
commit 2f523f2b38
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 47 additions and 15 deletions

View File

@ -58,6 +58,10 @@ type ServerConfig struct {
// follower to catch up.
SnapshotCatchUpEntries uint64
// CompactRaftLogEveryNApplies compacts the raft log once every N applies.
// The minimum value is 1, which means the raft log is compacted on every apply.
CompactRaftLogEveryNApplies uint64
MaxSnapFiles uint
MaxWALFiles uint

View File

@ -84,6 +84,10 @@ const (
// follower to catch up.
DefaultSnapshotCatchUpEntries uint64 = 5000
// DefaultCompactRaftLogEveryNApplies compacts the raft log once every N applies.
// The minimum value is 1, which means the raft log is compacted on every apply.
DefaultCompactRaftLogEveryNApplies uint64 = 10
StoreClusterPrefix = "/0"
StoreKeysPrefix = "/1"
@ -569,6 +573,14 @@ func (s *EtcdServer) start() {
)
s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
}
if s.Cfg.CompactRaftLogEveryNApplies == 0 {
lg.Info(
"updating compact raft log every N applies to default",
zap.Uint64("given-compact-raft-log-every-n-applies", s.Cfg.CompactRaftLogEveryNApplies),
zap.Uint64("updated-compact-raft-log-every-n-applies", DefaultCompactRaftLogEveryNApplies),
)
s.Cfg.CompactRaftLogEveryNApplies = DefaultCompactRaftLogEveryNApplies
}
s.w = wait.New()
s.applyWait = wait.NewTimeList()
@ -980,6 +992,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
<-apply.notifyc
s.triggerSnapshot(ep)
s.maybeCompactRaftLog(ep)
select {
// snapshot requested via send()
case m := <-s.r.msgSnapC:
@ -2170,6 +2183,18 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
}
func (s *EtcdServer) maybeCompactRaftLog(ep *etcdProgress) {
lg := s.Logger()
// Keep some in-memory log entries for slow followers, while keeping the entries up to the snapshot index.
// Only compact the raft log once every N applies.
if ep.appliedi <= ep.snapi+s.Cfg.SnapshotCatchUpEntries || ep.appliedi%s.Cfg.CompactRaftLogEveryNApplies != 0 {
return
}
compacti := ep.appliedi - s.Cfg.SnapshotCatchUpEntries
// When sending a snapshot, etcd will pause compaction.
// After receiving a snapshot, the slow follower needs to get all the entries right after
@ -2181,13 +2206,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
return
}
// keep some in memory log entries for slow followers.
compacti := uint64(1)
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
}
err = s.r.raftStorage.Compact(compacti)
err := s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
// the raft log might already have been compacted.

View File

@ -148,8 +148,9 @@ type ClusterConfig struct {
MaxTxnOps uint
MaxRequestBytes uint
SnapshotCount uint64
SnapshotCatchUpEntries uint64
SnapshotCount uint64
SnapshotCatchUpEntries uint64
CompactRaftLogEveryNApplies uint64
GRPCKeepAliveMinTime time.Duration
GRPCKeepAliveInterval time.Duration
@ -276,6 +277,7 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member {
MaxRequestBytes: c.Cfg.MaxRequestBytes,
SnapshotCount: c.Cfg.SnapshotCount,
SnapshotCatchUpEntries: c.Cfg.SnapshotCatchUpEntries,
CompactRaftLogEveryNApplies: c.Cfg.CompactRaftLogEveryNApplies,
GRPCKeepAliveMinTime: c.Cfg.GRPCKeepAliveMinTime,
GRPCKeepAliveInterval: c.Cfg.GRPCKeepAliveInterval,
GRPCKeepAliveTimeout: c.Cfg.GRPCKeepAliveTimeout,
@ -601,6 +603,7 @@ type MemberConfig struct {
MaxRequestBytes uint
SnapshotCount uint64
SnapshotCatchUpEntries uint64
CompactRaftLogEveryNApplies uint64
GRPCKeepAliveMinTime time.Duration
GRPCKeepAliveInterval time.Duration
GRPCKeepAliveTimeout time.Duration
@ -686,6 +689,10 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
if mcfg.SnapshotCatchUpEntries != 0 {
m.SnapshotCatchUpEntries = mcfg.SnapshotCatchUpEntries
}
m.CompactRaftLogEveryNApplies = etcdserver.DefaultCompactRaftLogEveryNApplies
if mcfg.CompactRaftLogEveryNApplies != 0 {
m.CompactRaftLogEveryNApplies = mcfg.CompactRaftLogEveryNApplies
}
// for the purpose of integration testing, simple token is enough
m.AuthToken = "simple"

View File

@ -55,9 +55,10 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
integration.BeforeTest(t)
clus := integration.NewCluster(t, &integration.ClusterConfig{
Size: 3,
SnapshotCount: 10,
SnapshotCatchUpEntries: 5,
Size: 3,
SnapshotCount: 10,
SnapshotCatchUpEntries: 5,
CompactRaftLogEveryNApplies: 10,
})
defer clus.Terminate(t)
@ -102,11 +103,12 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
// elected. Leader will apply 3 MemberAttrSet and 1 ClusterVersionSet
// changes. So member 0 has index 8 in raft log before network
// partition. We need to trigger EtcdServer.snapshot() at least twice.
// Raft log is only compacted when appliedi%CompactRaftLogEveryNApplies==0
//
// SnapshotCount: 10, SnapshotCatchUpEntries: 5
// SnapshotCount: 10, SnapshotCatchUpEntries: 5, CompactRaftLogEveryNApplies: 10
//
// T1: L(snapshot-index: 11, compacted-index: 6), F_m0(index:8)
// T2: L(snapshot-index: 22, compacted-index: 17), F_m0(index:8, out of date)
// T1: L(snapshot-index: 11, compacted-index: 5), F_m0(index:8)
// T2: L(snapshot-index: 22, compacted-index: 15), F_m0(index:8, out of date)
//
// Since there is no way to confirm server has compacted the log, we
// use log monitor to watch and expect "compacted Raft logs" content.