mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: change the snapshot + compact into sync operation
Signed-off-by: Clement <gh.2lgqz@aleeas.com>
This commit is contained in:
parent
c9fdc609bb
commit
d820cd2b56
@ -2126,65 +2126,63 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
|
||||
// the go routine created below.
|
||||
s.KV().Commit()
|
||||
|
||||
s.GoAttach(func() {
|
||||
lg := s.Logger()
|
||||
lg := s.Logger()
|
||||
|
||||
// For backward compatibility, generate v2 snapshot from v3 state.
|
||||
snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
|
||||
if err != nil {
|
||||
// the snapshot was done asynchronously with the progress of raft.
|
||||
// raft might have already got a newer snapshot.
|
||||
if err == raft.ErrSnapOutOfDate {
|
||||
return
|
||||
}
|
||||
lg.Panic("failed to create snapshot", zap.Error(err))
|
||||
}
|
||||
|
||||
verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())
|
||||
|
||||
// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
|
||||
if err = s.r.storage.SaveSnap(snap); err != nil {
|
||||
lg.Panic("failed to save snapshot", zap.Error(err))
|
||||
}
|
||||
if err = s.r.storage.Release(snap); err != nil {
|
||||
lg.Panic("failed to release wal", zap.Error(err))
|
||||
}
|
||||
|
||||
lg.Info(
|
||||
"saved snapshot",
|
||||
zap.Uint64("snapshot-index", snap.Metadata.Index),
|
||||
)
|
||||
|
||||
// When sending a snapshot, etcd will pause compaction.
|
||||
// After receives a snapshot, the slow follower needs to get all the entries right after
|
||||
// the snapshot sent to catch up. If we do not pause compaction, the log entries right after
|
||||
// the snapshot sent might already be compacted. It happens when the snapshot takes long time
|
||||
// to send and save. Pausing compaction avoids triggering a snapshot sending cycle.
|
||||
if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
|
||||
lg.Info("skip compaction since there is an inflight snapshot")
|
||||
// For backward compatibility, generate v2 snapshot from v3 state.
|
||||
snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
|
||||
if err != nil {
|
||||
// the snapshot was done asynchronously with the progress of raft.
|
||||
// raft might have already got a newer snapshot.
|
||||
if err == raft.ErrSnapOutOfDate {
|
||||
return
|
||||
}
|
||||
lg.Panic("failed to create snapshot", zap.Error(err))
|
||||
}
|
||||
|
||||
// keep some in memory log entries for slow followers.
|
||||
compacti := uint64(1)
|
||||
if snapi > s.Cfg.SnapshotCatchUpEntries {
|
||||
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
|
||||
}
|
||||
verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())
|
||||
|
||||
err = s.r.raftStorage.Compact(compacti)
|
||||
if err != nil {
|
||||
// the compaction was done asynchronously with the progress of raft.
|
||||
// raft log might already been compact.
|
||||
if err == raft.ErrCompacted {
|
||||
return
|
||||
}
|
||||
lg.Panic("failed to compact", zap.Error(err))
|
||||
// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
|
||||
if err = s.r.storage.SaveSnap(snap); err != nil {
|
||||
lg.Panic("failed to save snapshot", zap.Error(err))
|
||||
}
|
||||
if err = s.r.storage.Release(snap); err != nil {
|
||||
lg.Panic("failed to release wal", zap.Error(err))
|
||||
}
|
||||
|
||||
lg.Info(
|
||||
"saved snapshot",
|
||||
zap.Uint64("snapshot-index", snap.Metadata.Index),
|
||||
)
|
||||
|
||||
// When sending a snapshot, etcd will pause compaction.
|
||||
// After receives a snapshot, the slow follower needs to get all the entries right after
|
||||
// the snapshot sent to catch up. If we do not pause compaction, the log entries right after
|
||||
// the snapshot sent might already be compacted. It happens when the snapshot takes long time
|
||||
// to send and save. Pausing compaction avoids triggering a snapshot sending cycle.
|
||||
if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
|
||||
lg.Info("skip compaction since there is an inflight snapshot")
|
||||
return
|
||||
}
|
||||
|
||||
// keep some in memory log entries for slow followers.
|
||||
compacti := uint64(1)
|
||||
if snapi > s.Cfg.SnapshotCatchUpEntries {
|
||||
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
|
||||
}
|
||||
|
||||
err = s.r.raftStorage.Compact(compacti)
|
||||
if err != nil {
|
||||
// the compaction was done asynchronously with the progress of raft.
|
||||
// raft log might already been compact.
|
||||
if err == raft.ErrCompacted {
|
||||
return
|
||||
}
|
||||
lg.Info(
|
||||
"compacted Raft logs",
|
||||
zap.Uint64("compact-index", compacti),
|
||||
)
|
||||
})
|
||||
lg.Panic("failed to compact", zap.Error(err))
|
||||
}
|
||||
lg.Info(
|
||||
"compacted Raft logs",
|
||||
zap.Uint64("compact-index", compacti),
|
||||
)
|
||||
}
|
||||
|
||||
// CutPeer drops messages to the specified peer.
|
||||
|
Loading…
x
Reference in New Issue
Block a user