mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: revert the etcdserver-side change for the data loss on a one-node cluster
Since the raft-side change has been merged, we need to revert the etcdserver-side change. Refer to https://github.com/etcd-io/etcd/pull/14413 and https://github.com/etcd-io/etcd/pull/14400. Signed-off-by: Benjamin Wang <wachao@vmware.com>
This commit is contained in:
parent
997260a832
commit
9097e61b40
@ -209,14 +209,6 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
||||
|
||||
updateCommittedIndex(&ap, rh)
|
||||
|
||||
waitWALSync := shouldWaitWALSync(rd)
|
||||
if waitWALSync {
|
||||
// gofail: var raftBeforeSaveWaitWalSync struct{}
|
||||
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
|
||||
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case r.applyc <- ap:
|
||||
case <-r.stopped:
|
||||
@ -241,11 +233,9 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
||||
// gofail: var raftAfterSaveSnap struct{}
|
||||
}
|
||||
|
||||
if !waitWALSync {
|
||||
// gofail: var raftBeforeSave struct{}
|
||||
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
|
||||
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
|
||||
}
|
||||
// gofail: var raftBeforeSave struct{}
|
||||
if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
|
||||
r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
|
||||
}
|
||||
if !raft.IsEmptyHardState(rd.HardState) {
|
||||
proposalsCommitted.Set(float64(rd.HardState.Commit))
|
||||
@ -324,43 +314,6 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
||||
}()
|
||||
}
|
||||
|
||||
// For a cluster with only one member, the raft may send both the
|
||||
// unstable entries and committed entries to etcdserver, and there
|
||||
// may have overlapped log entries between them.
|
||||
//
|
||||
// etcd responds to the client once it finishes (actually partially)
|
||||
// the applying workflow. But when the client receives the response,
|
||||
// it doesn't mean etcd has already successfully saved the data,
|
||||
// including BoltDB and WAL, because:
|
||||
// 1. etcd commits the boltDB transaction periodically instead of on each request;
|
||||
// 2. etcd saves WAL entries in parallel with applying the committed entries.
|
||||
//
|
||||
// Accordingly, it might run into a situation of data loss when the etcd crashes
|
||||
// immediately after responding to the client and before the boltDB and WAL
|
||||
// successfully save the data to disk.
|
||||
// Note that this issue can only happen for clusters with only one member.
|
||||
//
|
||||
// For clusters with multiple members, it isn't an issue, because etcd will
|
||||
// not commit & apply the data before it being replicated to majority members.
|
||||
// When the client receives the response, it means the data must have been applied.
|
||||
// It further means the data must have been committed.
|
||||
// Note: for clusters with multiple members, the raft will never send identical
|
||||
// unstable entries and committed entries to etcdserver.
|
||||
//
|
||||
// Refer to https://github.com/etcd-io/etcd/issues/14370.
|
||||
func shouldWaitWALSync(rd raft.Ready) bool {
|
||||
if len(rd.CommittedEntries) == 0 || len(rd.Entries) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check if there is overlap between unstable and committed entries
|
||||
// assuming that their index and term are only incrementing.
|
||||
lastCommittedEntry := rd.CommittedEntries[len(rd.CommittedEntries)-1]
|
||||
firstUnstableEntry := rd.Entries[0]
|
||||
return lastCommittedEntry.Term > firstUnstableEntry.Term ||
|
||||
(lastCommittedEntry.Term == firstUnstableEntry.Term && lastCommittedEntry.Index >= firstUnstableEntry.Index)
|
||||
}
|
||||
|
||||
func updateCommittedIndex(ap *toApply, rh *raftReadyHandler) {
|
||||
var ci uint64
|
||||
if len(ap.entries) != 0 {
|
||||
|
@ -22,7 +22,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.etcd.io/etcd/client/pkg/v3/types"
|
||||
"go.etcd.io/etcd/pkg/v3/pbutil"
|
||||
"go.etcd.io/etcd/raft/v3"
|
||||
@ -318,79 +317,3 @@ func TestStopRaftNodeMoreThanOnce(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestShouldWaitWALSync(t *testing.T) {
|
||||
testcases := []struct {
|
||||
name string
|
||||
unstableEntries []raftpb.Entry
|
||||
commitedEntries []raftpb.Entry
|
||||
expectedResult bool
|
||||
}{
|
||||
{
|
||||
name: "both entries are nil",
|
||||
unstableEntries: nil,
|
||||
commitedEntries: nil,
|
||||
expectedResult: false,
|
||||
},
|
||||
{
|
||||
name: "both entries are empty slices",
|
||||
unstableEntries: []raftpb.Entry{},
|
||||
commitedEntries: []raftpb.Entry{},
|
||||
expectedResult: false,
|
||||
},
|
||||
{
|
||||
name: "one nil and the other empty",
|
||||
unstableEntries: nil,
|
||||
commitedEntries: []raftpb.Entry{},
|
||||
expectedResult: false,
|
||||
},
|
||||
{
|
||||
name: "one nil and the other has data",
|
||||
unstableEntries: nil,
|
||||
commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
|
||||
expectedResult: false,
|
||||
},
|
||||
{
|
||||
name: "one empty and the other has data",
|
||||
unstableEntries: []raftpb.Entry{},
|
||||
commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
|
||||
expectedResult: false,
|
||||
},
|
||||
{
|
||||
name: "has different term and index",
|
||||
unstableEntries: []raftpb.Entry{{Term: 5, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
|
||||
commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
|
||||
expectedResult: false,
|
||||
},
|
||||
{
|
||||
name: "has identical data",
|
||||
unstableEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
|
||||
commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
|
||||
expectedResult: true,
|
||||
},
|
||||
{
|
||||
name: "has overlapped entry",
|
||||
unstableEntries: []raftpb.Entry{
|
||||
{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}},
|
||||
{Term: 4, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x44, 0x55, 0x66}},
|
||||
{Term: 4, Index: 12, Type: raftpb.EntryNormal, Data: []byte{0x77, 0x88, 0x99}},
|
||||
},
|
||||
commitedEntries: []raftpb.Entry{
|
||||
{Term: 4, Index: 8, Type: raftpb.EntryNormal, Data: []byte{0x07, 0x08, 0x09}},
|
||||
{Term: 4, Index: 9, Type: raftpb.EntryNormal, Data: []byte{0x10, 0x11, 0x12}},
|
||||
{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}},
|
||||
},
|
||||
expectedResult: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testcases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
shouldWALSync := shouldWaitWALSync(raft.Ready{
|
||||
Entries: tc.unstableEntries,
|
||||
CommittedEntries: tc.commitedEntries,
|
||||
})
|
||||
assert.Equal(t, tc.expectedResult, shouldWALSync)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user