diff --git a/etcdserver/raft.go b/etcdserver/raft.go
index 22d0c0173..a4ba6e219 100644
--- a/etcdserver/raft.go
+++ b/etcdserver/raft.go
@@ -215,6 +215,18 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 					notifyc: notifyc,
 				}
 
+				waitWALSync := shouldWaitWALSync(rd)
+				if waitWALSync {
+					// gofail: var raftBeforeSaveWaitWalSync struct{}
+					if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
+						if r.lg != nil {
+							r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
+						} else {
+							plog.Fatalf("failed to save state and entries error: %v", err)
+						}
+					}
+				}
+
 				updateCommittedIndex(&ap, rh)
 
 				select {
@@ -245,12 +257,14 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 					// gofail: var raftAfterSaveSnap struct{}
 				}
 
-				// gofail: var raftBeforeSave struct{}
-				if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
-					if r.lg != nil {
-						r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
-					} else {
-						plog.Fatalf("failed to save state and entries error: %v", err)
+				if !waitWALSync {
+					// gofail: var raftBeforeSave struct{}
+					if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
+						if r.lg != nil {
+							r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
+						} else {
+							plog.Fatalf("failed to save state and entries error: %v", err)
+						}
 					}
 				}
 				if !raft.IsEmptyHardState(rd.HardState) {
@@ -342,6 +356,42 @@ func (r *raftNode) start(rh *raftReadyHandler) {
 	}()
 }
 
+// For a cluster with only one member, raft may send both unstable
+// entries and committed entries to etcdserver, and the two sets may
+// contain overlapping log entries.
+//
+// etcd responds to the client once it finishes (in fact, only partially
+// finishes) the apply workflow. But when the client receives the response,
+// it doesn't mean etcd has already successfully persisted the data to
+// both BoltDB and the WAL, because:
+//   1. etcd commits the BoltDB transaction periodically instead of on each request;
+//   2. etcd saves WAL entries in parallel with applying the committed entries.
+// Accordingly, data loss can occur if etcd crashes immediately after
+// responding to the client and before BoltDB and the WAL have
+// persisted the data to disk.
+// Note that this issue can only happen for clusters with only one member.
+//
+// For clusters with multiple members, this isn't an issue, because etcd does not
+// commit & apply the data before it has been replicated to a majority of members.
+// When the client receives the response, the data must have been applied,
+// which in turn means it must have been committed.
+// Note: for clusters with multiple members, raft never sends identical
+// unstable entries and committed entries to etcdserver.
+//
+// Refer to https://github.com/etcd-io/etcd/issues/14370.
+func shouldWaitWALSync(rd raft.Ready) bool {
+	if len(rd.CommittedEntries) == 0 || len(rd.Entries) == 0 {
+		return false
+	}
+
+	// Check whether the unstable and committed entries overlap, assuming
+	// that their index and term are monotonically increasing.
+	lastCommittedEntry := rd.CommittedEntries[len(rd.CommittedEntries)-1]
+	firstUnstableEntry := rd.Entries[0]
+	return lastCommittedEntry.Term > firstUnstableEntry.Term ||
+		(lastCommittedEntry.Term == firstUnstableEntry.Term && lastCommittedEntry.Index >= firstUnstableEntry.Index)
+}
+
 func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
 	var ci uint64
 	if len(ap.entries) != 0 {
diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go
index 6c5164d48..4d373b83c 100644
--- a/etcdserver/raft_test.go
+++ b/etcdserver/raft_test.go
@@ -21,6 +21,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	"go.etcd.io/etcd/etcdserver/api/membership"
 	"go.etcd.io/etcd/pkg/mock/mockstorage"
 	"go.etcd.io/etcd/pkg/pbutil"
@@ -267,3 +268,79 @@ func TestProcessDuplicatedAppRespMessage(t *testing.T) {
 		t.Errorf("count = %d, want %d", got, want)
 	}
 }
+
+func TestShouldWaitWALSync(t *testing.T) {
+	testcases := []struct {
+		name             string
+		unstableEntries  []raftpb.Entry
+		committedEntries []raftpb.Entry
+		expectedResult   bool
+	}{
+		{
+			name:             "both entries are nil",
+			unstableEntries:  nil,
+			committedEntries: nil,
+			expectedResult:   false,
+		},
+		{
+			name:             "both entries are empty slices",
+			unstableEntries:  []raftpb.Entry{},
+			committedEntries: []raftpb.Entry{},
+			expectedResult:   false,
+		},
+		{
+			name:             "one nil and the other empty",
+			unstableEntries:  nil,
+			committedEntries: []raftpb.Entry{},
+			expectedResult:   false,
+		},
+		{
+			name:             "one nil and the other has data",
+			unstableEntries:  nil,
+			committedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
+			expectedResult:   false,
+		},
+		{
+			name:             "one empty and the other has data",
+			unstableEntries:  []raftpb.Entry{},
+			committedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
+			expectedResult:   false,
+		},
+		{
+			name:             "has different term and index",
+			unstableEntries:  []raftpb.Entry{{Term: 5, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
+			committedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
+			expectedResult:   false,
+		},
+		{
+			name:             "has identical data",
+			unstableEntries:  []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
+			committedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
+			expectedResult:   true,
+		},
+		{
+			name: "has overlapped entry",
+			unstableEntries: []raftpb.Entry{
+				{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}},
+				{Term: 4, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x44, 0x55, 0x66}},
+				{Term: 4, Index: 12, Type: raftpb.EntryNormal, Data: []byte{0x77, 0x88, 0x99}},
+			},
+			committedEntries: []raftpb.Entry{
+				{Term: 4, Index: 8, Type: raftpb.EntryNormal, Data: []byte{0x07, 0x08, 0x09}},
+				{Term: 4, Index: 9, Type: raftpb.EntryNormal, Data: []byte{0x10, 0x11, 0x12}},
+				{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}},
+			},
+			expectedResult: true,
+		},
+	}
+
+	for _, tc := range testcases {
+		t.Run(tc.name, func(t *testing.T) {
+			shouldWALSync := shouldWaitWALSync(raft.Ready{
+				Entries:          tc.unstableEntries,
+				CommittedEntries: tc.committedEntries,
+			})
+			assert.Equal(t, tc.expectedResult, shouldWALSync)
+		})
+	}
+}