mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver/*, wal/*: changes to snapshots and wal logic to fix #10219
This commit is contained in:
parent
ab494956bf
commit
9162cd613d
2
build
2
build
@ -16,7 +16,7 @@ GO_LDFLAGS="$GO_LDFLAGS -X ${REPO_PATH}/version.GitSHA=${GIT_SHA}"
|
||||
toggle_failpoints() {
|
||||
mode="$1"
|
||||
if command -v gofail >/dev/null 2>&1; then
|
||||
gofail "$mode" etcdserver/ mvcc/backend/
|
||||
gofail "$mode" etcdserver/ mvcc/backend/ wal/
|
||||
elif [[ "$mode" != "disable" ]]; then
|
||||
echo "FAILPOINTS set but gofail not found"
|
||||
exit 1
|
||||
|
@ -38,6 +38,7 @@ const snapSuffix = ".snap"
|
||||
|
||||
var (
|
||||
ErrNoSnapshot = errors.New("snap: no available snapshot")
|
||||
ErrSnapshotIndex = errors.New("snap: no available snapshot index")
|
||||
ErrEmptySnapshot = errors.New("snap: empty snapshot")
|
||||
ErrCRCMismatch = errors.New("snap: crc mismatch")
|
||||
crcTable = crc32.MakeTable(crc32.Castagnoli)
|
||||
@ -119,6 +120,23 @@ func (s *Snapshotter) Load() (*raftpb.Snapshot, error) {
|
||||
return snap, nil
|
||||
}
|
||||
|
||||
func (s *Snapshotter) LoadIndex(i uint64) (*raftpb.Snapshot, error) {
|
||||
names, err := s.snapNames()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(names) == 0 {
|
||||
return nil, ErrNoSnapshot
|
||||
}
|
||||
|
||||
if i >= uint64(len(names)) {
|
||||
return nil, ErrSnapshotIndex
|
||||
}
|
||||
|
||||
return loadSnap(s.lg, s.dir, names[i])
|
||||
}
|
||||
|
||||
func loadSnap(lg *zap.Logger, dir, name string) (*raftpb.Snapshot, error) {
|
||||
fpath := filepath.Join(dir, name)
|
||||
snap, err := Read(lg, fpath)
|
||||
|
@ -227,27 +227,36 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
||||
r.transport.Send(r.processMessages(rd.Messages))
|
||||
}
|
||||
|
||||
// gofail: var raftBeforeSave struct{}
|
||||
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
|
||||
if !raft.IsEmptySnap(rd.Snapshot) {
|
||||
// gofail: var raftBeforeSaveSnap struct{}
|
||||
if err := r.storage.SaveSnapshot(rd.Snapshot); err != nil {
|
||||
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
|
||||
}
|
||||
// gofail: var raftAfterSaveSnap struct{}
|
||||
}
|
||||
|
||||
// gofail: var raftBeforeSaveAll struct{}
|
||||
if err := r.storage.SaveAll(rd.HardState, rd.Entries, rd.Snapshot); err != nil {
|
||||
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
|
||||
}
|
||||
if !raft.IsEmptyHardState(rd.HardState) {
|
||||
proposalsCommitted.Set(float64(rd.HardState.Commit))
|
||||
}
|
||||
// gofail: var raftAfterSave struct{}
|
||||
// gofail: var raftAfterSaveAll struct{}
|
||||
|
||||
if !raft.IsEmptySnap(rd.Snapshot) {
|
||||
// gofail: var raftBeforeSaveSnap struct{}
|
||||
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
|
||||
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
|
||||
}
|
||||
// etcdserver now claims the snapshot has been persisted onto the disk
|
||||
notifyc <- struct{}{}
|
||||
|
||||
// gofail: var raftAfterSaveSnap struct{}
|
||||
// gofail: var raftBeforeApplySnap struct{}
|
||||
r.raftStorage.ApplySnapshot(rd.Snapshot)
|
||||
r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
|
||||
// gofail: var raftAfterApplySnap struct{}
|
||||
|
||||
if err := r.storage.Release(rd.Snapshot); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
// gofail: var raftAfterWALRelease struct{}
|
||||
}
|
||||
|
||||
r.raftStorage.Append(rd.Entries)
|
||||
|
@ -67,14 +67,14 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultSnapshotCount = 100000
|
||||
DefaultSnapshotCount = 10
|
||||
|
||||
// DefaultSnapshotCatchUpEntries is the number of entries for a slow follower
|
||||
// to catch-up after compacting the raft storage entries.
|
||||
// We expect the follower has a millisecond level latency with the leader.
|
||||
// The max throughput is around 10K. Keeping 5K entries is enough for helping
|
||||
// follower to catch up.
|
||||
DefaultSnapshotCatchUpEntries uint64 = 5000
|
||||
DefaultSnapshotCatchUpEntries uint64 = 10
|
||||
|
||||
StoreClusterPrefix = "/0"
|
||||
StoreKeysPrefix = "/1"
|
||||
@ -414,10 +414,33 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
|
||||
zap.String("wal-dir", cfg.WALDir()),
|
||||
)
|
||||
}
|
||||
snapshot, err = ss.Load()
|
||||
if err != nil && err != snap.ErrNoSnapshot {
|
||||
return nil, err
|
||||
|
||||
// Find a snapshot to start/restart a raft node
|
||||
var (
|
||||
snapshot *raftpb.Snapshot
|
||||
err error
|
||||
)
|
||||
|
||||
for i := uint64(0); ; i++ {
|
||||
snapshot, err = ss.LoadIndex(i)
|
||||
if err != nil && err != snap.ErrNoSnapshot {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err == snap.ErrNoSnapshot {
|
||||
break
|
||||
}
|
||||
|
||||
if checkWALSnap(cfg.Logger, cfg.WALDir(), snapshot) {
|
||||
break
|
||||
}
|
||||
|
||||
cfg.Logger.Info(
|
||||
"skip snapshot",
|
||||
zap.Uint64("index", i),
|
||||
)
|
||||
}
|
||||
|
||||
if snapshot != nil {
|
||||
if err = st.Recovery(snapshot.Data); err != nil {
|
||||
cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
|
||||
|
@ -36,6 +36,14 @@ type Storage interface {
|
||||
SaveSnap(snap raftpb.Snapshot) error
|
||||
// Close closes the Storage and performs finalization.
|
||||
Close() error
|
||||
|
||||
// SaveSnapshot function saves only snapshot to the underlying stable storage.
|
||||
SaveSnapshot(snap raftpb.Snapshot) error
|
||||
// SaveAll function saves ents, snapshot and state to the underlying stable storage.
|
||||
// SaveAll MUST block until st and ents are on stable storage.
|
||||
SaveAll(st raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot) error
|
||||
// Release releases the locked wal files since they will not be used.
|
||||
Release(snap raftpb.Snapshot) error
|
||||
}
|
||||
|
||||
type storage struct {
|
||||
@ -65,6 +73,41 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
|
||||
return st.WAL.ReleaseLockTo(snap.Metadata.Index)
|
||||
}
|
||||
|
||||
// SaveSnapshot saves the snapshot to disk.
|
||||
func (st *storage) SaveSnapshot(snap raftpb.Snapshot) error {
|
||||
return st.Snapshotter.SaveSnap(snap)
|
||||
}
|
||||
|
||||
func (st *storage) Release(snap raftpb.Snapshot) error {
|
||||
return st.WAL.ReleaseLockTo(snap.Metadata.Index)
|
||||
}
|
||||
|
||||
func checkWALSnap(lg *zap.Logger, waldir string, snapshot *raftpb.Snapshot) bool {
|
||||
if snapshot == nil {
|
||||
lg.Fatal("checkWALSnap: snapshot is empty")
|
||||
}
|
||||
|
||||
walsnap := walpb.Snapshot{
|
||||
Index: snapshot.Metadata.Index,
|
||||
Term: snapshot.Metadata.Term,
|
||||
}
|
||||
|
||||
w, _, _, st, _ := readWAL(lg, waldir, walsnap)
|
||||
defer w.Close()
|
||||
|
||||
lg.Info(
|
||||
"checkWALSnap: snapshot and hardstate data",
|
||||
zap.Uint64("snapshot-index", snapshot.Metadata.Index),
|
||||
zap.Uint64("st-commit", st.Commit),
|
||||
)
|
||||
|
||||
if snapshot.Metadata.Index > st.Commit {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func readWAL(lg *zap.Logger, waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
|
||||
var (
|
||||
err error
|
||||
|
66
wal/wal.go
66
wal/wal.go
@ -870,6 +870,72 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
|
||||
return w.sync()
|
||||
}
|
||||
|
||||
func (w *WAL) SaveAll(st raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot) error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
// short cut, do not call sync
|
||||
if raft.IsEmptyHardState(st) && len(ents) == 0 && raft.IsEmptySnap(snap) {
|
||||
return nil
|
||||
}
|
||||
|
||||
mustSync := raft.MustSync(st, w.state, len(ents))
|
||||
|
||||
if !raft.IsEmptySnap(snap) {
|
||||
mustSync = true
|
||||
}
|
||||
|
||||
// 1. Save entries
|
||||
// TODO(xiangli): no more reference operator
|
||||
for i := range ents {
|
||||
if err := w.saveEntry(&ents[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
// gofail: var raftAfterSaveWALFirstEntry struct{}
|
||||
}
|
||||
// gofail: var raftAfterSaveWALEntries struct{}
|
||||
|
||||
// 2. Save snapshot
|
||||
if !raft.IsEmptySnap(snap) {
|
||||
e := walpb.Snapshot{
|
||||
Index: snap.Metadata.Index,
|
||||
Term: snap.Metadata.Term,
|
||||
}
|
||||
|
||||
b := pbutil.MustMarshal(&e)
|
||||
|
||||
rec := &walpb.Record{Type: snapshotType, Data: b}
|
||||
if err := w.encoder.encode(rec); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// update enti only when snapshot is ahead of last index
|
||||
if w.enti < e.Index {
|
||||
w.enti = e.Index
|
||||
}
|
||||
// gofail: var raftAfterSaveWALSnap struct{}
|
||||
}
|
||||
|
||||
// 3. Save HardState
|
||||
if err := w.saveState(&st); err != nil {
|
||||
return err
|
||||
}
|
||||
// gofail: var raftAfterSaveWALState struct{}
|
||||
|
||||
curOff, err := w.tail().Seek(0, io.SeekCurrent)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if curOff < SegmentSizeBytes {
|
||||
if mustSync {
|
||||
return w.sync()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return w.cut()
|
||||
}
|
||||
|
||||
func (w *WAL) saveCrc(prevCrc uint32) error {
|
||||
return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user