From 3c0fbe285c5fbbe045c41ed978a5eb63883d8ad9 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 28 Nov 2014 14:13:07 -0800 Subject: [PATCH] raft: stableTo checks term matching stableTo should only mark the index stable if the term matches. After raft sends out unstable entries to the application, raft makes progress without waiting for a reply. When the application calls stableTo to notify raft that the entries up to "index" are stable, raft might have truncated some entries before "index" due to leader loss. raft must verify the (index,term) of stableTo before marking the entries as stable. --- raft/log.go | 2 +- raft/log_test.go | 6 +++--- raft/log_unstable.go | 18 +++++++----------- raft/node.go | 4 +++- raft/raft_paper_test.go | 2 +- raft/raft_test.go | 2 +- 6 files changed, 16 insertions(+), 18 deletions(-) diff --git a/raft/log.go b/raft/log.go index 2b7722717..cade71a44 100644 --- a/raft/log.go +++ b/raft/log.go @@ -187,7 +187,7 @@ func (l *raftLog) appliedTo(i uint64) { l.applied = i } -func (l *raftLog) stableTo(i uint64) { l.unstable.stableTo(i) } +func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) } func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) } diff --git a/raft/log_test.go b/raft/log_test.go index e3a324030..34e3bdf86 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -392,7 +392,7 @@ func TestUnstableEnts(t *testing.T) { ents := raftLog.unstableEntries() if l := len(ents); l > 0 { - raftLog.stableTo(ents[l-1].Index) + raftLog.stableTo(ents[l-1].Index, ents[l-i].Term) } if !reflect.DeepEqual(ents, tt.wents) { t.Errorf("#%d: unstableEnts = %+v, want %+v", i, ents, tt.wents) @@ -446,8 +446,8 @@ func TestStableTo(t *testing.T) { } for i, tt := range tests { raftLog := newLog(NewMemoryStorage()) - raftLog.append(0, []pb.Entry{{}, {}}...) - raftLog.stableTo(tt.stable) + raftLog.append(0, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}...) 
+ raftLog.stableTo(tt.stable, 1) if raftLog.unstable.offset != tt.wunstable { t.Errorf("#%d: unstable = %d, want %d", i, raftLog.unstable, tt.wunstable) } diff --git a/raft/log_unstable.go b/raft/log_unstable.go index 1c50cddca..287f80971 100644 --- a/raft/log_unstable.go +++ b/raft/log_unstable.go @@ -16,11 +16,7 @@ package raft -import ( - "log" - - pb "github.com/coreos/etcd/raft/raftpb" -) +import pb "github.com/coreos/etcd/raft/raftpb" // unstable.entris[i] has raft log position i+unstable.offset. // Note that unstable.offset may be less than the highest log @@ -77,13 +73,13 @@ func (u *unstable) maybeTerm(i uint64) (uint64, bool) { return u.entries[i-u.offset].Term, true } -func (u *unstable) stableTo(i uint64) { - if i < u.offset || i+1-u.offset > uint64(len(u.entries)) { - log.Panicf("stableTo(%d) is out of range [unstable(%d), len(unstableEnts)(%d)]", - i, u.offset, len(u.entries)) +func (u *unstable) stableTo(i, t uint64) { + if gt, ok := u.maybeTerm(i); ok { + if gt == t && i >= u.offset { + u.entries = u.entries[i+1-u.offset:] + u.offset = i + 1 + } } - u.entries = u.entries[i+1-u.offset:] - u.offset = i + 1 } func (u *unstable) stableSnapTo(i uint64) { diff --git a/raft/node.go b/raft/node.go index b81d77773..81d1fdad5 100644 --- a/raft/node.go +++ b/raft/node.go @@ -209,6 +209,7 @@ func (n *node) run(r *raft) { var readyc chan Ready var advancec chan struct{} var prevLastUnstablei uint64 + var prevLastUnstablet uint64 var havePrevLastUnstablei bool var prevSnapi uint64 var rd Ready @@ -284,6 +285,7 @@ func (n *node) run(r *raft) { } if len(rd.Entries) > 0 { prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index + prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term havePrevLastUnstablei = true } if !IsEmptyHardState(rd.HardState) { @@ -303,7 +305,7 @@ func (n *node) run(r *raft) { r.raftLog.appliedTo(prevHardSt.Commit) } if havePrevLastUnstablei { - r.raftLog.stableTo(prevLastUnstablei) + r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet) 
havePrevLastUnstablei = false } r.raftLog.stableSnapTo(prevSnapi) diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index d2be61129..c4d1d207f 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -902,7 +902,7 @@ func commitNoopEntry(r *raft, s *MemoryStorage) { r.readMessages() s.Append(r.raftLog.unstableEntries()) r.raftLog.appliedTo(r.raftLog.committed) - r.raftLog.stableTo(r.raftLog.lastIndex()) + r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm()) } func acceptAndReply(m pb.Message) pb.Message { diff --git a/raft/raft_test.go b/raft/raft_test.go index 13d14f34a..81ccfccb4 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -31,7 +31,7 @@ import ( func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) { // Transfer all unstable entries to "stable" storage. s.Append(r.raftLog.unstableEntries()) - r.raftLog.stableTo(r.raftLog.lastIndex()) + r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm()) ents = r.raftLog.nextEnts() r.raftLog.appliedTo(r.raftLog.committed)