mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver/*: changes to snapshots and wal logic to fix #10219
This commit is contained in:
parent
9162cd613d
commit
3d2b565f98
2
build
2
build
@ -16,7 +16,7 @@ GO_LDFLAGS="$GO_LDFLAGS -X ${REPO_PATH}/version.GitSHA=${GIT_SHA}"
|
|||||||
toggle_failpoints() {
|
toggle_failpoints() {
|
||||||
mode="$1"
|
mode="$1"
|
||||||
if command -v gofail >/dev/null 2>&1; then
|
if command -v gofail >/dev/null 2>&1; then
|
||||||
gofail "$mode" etcdserver/ mvcc/backend/ wal/
|
gofail "$mode" etcdserver/ mvcc/backend/
|
||||||
elif [[ "$mode" != "disable" ]]; then
|
elif [[ "$mode" != "disable" ]]; then
|
||||||
echo "FAILPOINTS set but gofail not found"
|
echo "FAILPOINTS set but gofail not found"
|
||||||
exit 1
|
exit 1
|
||||||
|
@ -229,20 +229,20 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
|||||||
|
|
||||||
if !raft.IsEmptySnap(rd.Snapshot) {
|
if !raft.IsEmptySnap(rd.Snapshot) {
|
||||||
// gofail: var raftBeforeSaveSnap struct{}
|
// gofail: var raftBeforeSaveSnap struct{}
|
||||||
if err := r.storage.SaveSnapshot(rd.Snapshot); err != nil {
|
if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
|
||||||
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
|
r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
|
||||||
}
|
}
|
||||||
// gofail: var raftAfterSaveSnap struct{}
|
// gofail: var raftAfterSaveSnap struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// gofail: var raftBeforeSaveAll struct{}
|
// gofail: var raftBeforeSave struct{}
|
||||||
if err := r.storage.SaveAll(rd.HardState, rd.Entries, rd.Snapshot); err != nil {
|
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
|
||||||
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
|
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
|
||||||
}
|
}
|
||||||
if !raft.IsEmptyHardState(rd.HardState) {
|
if !raft.IsEmptyHardState(rd.HardState) {
|
||||||
proposalsCommitted.Set(float64(rd.HardState.Commit))
|
proposalsCommitted.Set(float64(rd.HardState.Commit))
|
||||||
}
|
}
|
||||||
// gofail: var raftAfterSaveAll struct{}
|
// gofail: var raftAfterSave struct{}
|
||||||
|
|
||||||
if !raft.IsEmptySnap(rd.Snapshot) {
|
if !raft.IsEmptySnap(rd.Snapshot) {
|
||||||
// etcdserver now claim the snapshot has been persisted onto the disk
|
// etcdserver now claim the snapshot has been persisted onto the disk
|
||||||
|
@ -2178,6 +2178,11 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
|
|||||||
if err = s.r.storage.SaveSnap(snap); err != nil {
|
if err = s.r.storage.SaveSnap(snap); err != nil {
|
||||||
lg.Panic("failed to save snapshot", zap.Error(err))
|
lg.Panic("failed to save snapshot", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err = s.r.storage.Release(snap); err != nil {
|
||||||
|
lg.Panic("failed to release wal", zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
lg.Info(
|
lg.Info(
|
||||||
"saved snapshot",
|
"saved snapshot",
|
||||||
zap.Uint64("snapshot-index", snap.Metadata.Index),
|
zap.Uint64("snapshot-index", snap.Metadata.Index),
|
||||||
|
@ -36,12 +36,6 @@ type Storage interface {
|
|||||||
SaveSnap(snap raftpb.Snapshot) error
|
SaveSnap(snap raftpb.Snapshot) error
|
||||||
// Close closes the Storage and performs finalization.
|
// Close closes the Storage and performs finalization.
|
||||||
Close() error
|
Close() error
|
||||||
|
|
||||||
// SaveSnapshot function saves only snapshot to the underlying stable storage.
|
|
||||||
SaveSnapshot(snap raftpb.Snapshot) error
|
|
||||||
// SaveAll function saves ents, snapshot and state to the underlying stable storage.
|
|
||||||
// SaveAll MUST block until st and ents are on stable storage.
|
|
||||||
SaveAll(st raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot) error
|
|
||||||
// Release release release the locked wal files since they will not be used.
|
// Release release release the locked wal files since they will not be used.
|
||||||
Release(snap raftpb.Snapshot) error
|
Release(snap raftpb.Snapshot) error
|
||||||
}
|
}
|
||||||
@ -66,15 +60,7 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
err = st.Snapshotter.SaveSnap(snap)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return st.WAL.ReleaseLockTo(snap.Metadata.Index)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SaveSnapshot saves the snapshot to disk.
|
|
||||||
func (st *storage) SaveSnapshot(snap raftpb.Snapshot) error {
|
|
||||||
return st.Snapshotter.SaveSnap(snap)
|
return st.Snapshotter.SaveSnap(snap)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
66
wal/wal.go
66
wal/wal.go
@ -870,72 +870,6 @@ func (w *WAL) SaveSnapshot(e walpb.Snapshot) error {
|
|||||||
return w.sync()
|
return w.sync()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WAL) SaveAll(st raftpb.HardState, ents []raftpb.Entry, snap raftpb.Snapshot) error {
|
|
||||||
w.mu.Lock()
|
|
||||||
defer w.mu.Unlock()
|
|
||||||
|
|
||||||
// short cut, do not call sync
|
|
||||||
if raft.IsEmptyHardState(st) && len(ents) == 0 && raft.IsEmptySnap(snap) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
mustSync := raft.MustSync(st, w.state, len(ents))
|
|
||||||
|
|
||||||
if !raft.IsEmptySnap(snap) {
|
|
||||||
mustSync = true
|
|
||||||
}
|
|
||||||
|
|
||||||
// 1. Save entries
|
|
||||||
// TODO(xiangli): no more reference operator
|
|
||||||
for i := range ents {
|
|
||||||
if err := w.saveEntry(&ents[i]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// gofail: var raftAfterSaveWALFirstEntry struct{}
|
|
||||||
}
|
|
||||||
// gofail: var raftAfterSaveWALEntries struct{}
|
|
||||||
|
|
||||||
// 2. Save snapshot
|
|
||||||
if !raft.IsEmptySnap(snap) {
|
|
||||||
e := walpb.Snapshot{
|
|
||||||
Index: snap.Metadata.Index,
|
|
||||||
Term: snap.Metadata.Term,
|
|
||||||
}
|
|
||||||
|
|
||||||
b := pbutil.MustMarshal(&e)
|
|
||||||
|
|
||||||
rec := &walpb.Record{Type: snapshotType, Data: b}
|
|
||||||
if err := w.encoder.encode(rec); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// update enti only when snapshot is ahead of last index
|
|
||||||
if w.enti < e.Index {
|
|
||||||
w.enti = e.Index
|
|
||||||
}
|
|
||||||
// gofail: var raftAfterSaveWALSnap struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. Save HardState
|
|
||||||
if err := w.saveState(&st); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// gofail: var raftAfterSaveWALState struct{}
|
|
||||||
|
|
||||||
curOff, err := w.tail().Seek(0, io.SeekCurrent)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if curOff < SegmentSizeBytes {
|
|
||||||
if mustSync {
|
|
||||||
return w.sync()
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return w.cut()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *WAL) saveCrc(prevCrc uint32) error {
|
func (w *WAL) saveCrc(prevCrc uint32) error {
|
||||||
return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
|
return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user