separate maybeCompactRaftLog function to compact raft log independently from snapshots

Signed-off-by: Clement <gh.2lgqz@aleeas.com>
This commit is contained in:
Clement 2024-09-24 18:06:55 +08:00
parent 59cfd7a61d
commit dff53056f5

View File

@ -980,6 +980,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
<-apply.notifyc
s.triggerSnapshot(ep)
s.maybeCompactRaftLog(ep)
select {
// snapshot requested via send()
case m := <-s.r.msgSnapC:
@ -2170,6 +2171,21 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
}
func (s *EtcdServer) maybeCompactRaftLog(ep *etcdProgress) {
// Retain all log entries up to the latest snapshot index to ensure any member can recover from that snapshot.
// Beyond the snapshot index, preserve the most recent s.Cfg.SnapshotCatchUpEntries entries in memory.
// This allows slow followers to catch up by synchronizing entries instead of requiring a full snapshot transfer.
// Only compact raft log once every N applies
if ep.snapi <= s.Cfg.SnapshotCatchUpEntries {
return
}
// make sure compacti >= 0, because s.r.raftStorage.Compact(compacti) always returns ErrCompacted if compacti <= 0
compacti := ep.snapi - s.Cfg.SnapshotCatchUpEntries
lg := s.Logger()
// When sending a snapshot, etcd will pause compaction.
// After receives a snapshot, the slow follower needs to get all the entries right after
@ -2181,13 +2197,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
return
}
// keep some in memory log entries for slow followers.
compacti := uint64(1)
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
}
err = s.r.raftStorage.Compact(compacti)
err := s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
// raft log might already been compact.