diff --git a/raft/log.go b/raft/log.go index 2b7722717..cade71a44 100644 --- a/raft/log.go +++ b/raft/log.go @@ -187,7 +187,7 @@ func (l *raftLog) appliedTo(i uint64) { l.applied = i } -func (l *raftLog) stableTo(i uint64) { l.unstable.stableTo(i) } +func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) } func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) } diff --git a/raft/log_test.go b/raft/log_test.go index e3a324030..58ad414b1 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -392,7 +392,7 @@ func TestUnstableEnts(t *testing.T) { ents := raftLog.unstableEntries() if l := len(ents); l > 0 { - raftLog.stableTo(ents[l-1].Index) + raftLog.stableTo(ents[l-1].Index, ents[l-i].Term) } if !reflect.DeepEqual(ents, tt.wents) { t.Errorf("#%d: unstableEnts = %+v, want %+v", i, ents, tt.wents) @@ -438,18 +438,58 @@ func TestCommitTo(t *testing.T) { func TestStableTo(t *testing.T) { tests := []struct { - stable uint64 + stablei uint64 + stablet uint64 wunstable uint64 }{ - {1, 2}, - {2, 3}, + {1, 1, 2}, + {2, 2, 3}, + {2, 1, 1}, // bad term + {3, 1, 1}, // bad index } for i, tt := range tests { raftLog := newLog(NewMemoryStorage()) - raftLog.append(0, []pb.Entry{{}, {}}...) - raftLog.stableTo(tt.stable) + raftLog.append(0, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...) + raftLog.stableTo(tt.stablei, tt.stablet) if raftLog.unstable.offset != tt.wunstable { - t.Errorf("#%d: unstable = %d, want %d", i, raftLog.unstable, tt.wunstable) + t.Errorf("#%d: unstable = %d, want %d", i, raftLog.unstable.offset, tt.wunstable) + } + } +} + +func TestStableToWithSnap(t *testing.T) { + snapi, snapt := uint64(5), uint64(2) + tests := []struct { + stablei uint64 + stablet uint64 + newEnts []pb.Entry + + wunstable uint64 + }{ + {snapi + 1, snapt, nil, snapi + 1}, + {snapi, snapt, nil, snapi + 1}, + {snapi - 1, snapt, nil, snapi + 1}, + + {snapi + 1, snapt + 1, nil, snapi + 1}, + {snapi, snapt + 1, nil, snapi + 1}, + {snapi - 1, snapt + 1, nil, snapi + 1}, + + {snapi + 1, snapt, []pb.Entry{{Index: snapi + 1, Term: snapt}}, snapi + 2}, + {snapi, snapt, []pb.Entry{{Index: snapi + 1, Term: snapt}}, snapi + 1}, + {snapi - 1, snapt, []pb.Entry{{Index: snapi + 1, Term: snapt}}, snapi + 1}, + + {snapi + 1, snapt + 1, []pb.Entry{{Index: snapi + 1, Term: snapt}}, snapi + 1}, + {snapi, snapt + 1, []pb.Entry{{Index: snapi + 1, Term: snapt}}, snapi + 1}, + {snapi - 1, snapt + 1, []pb.Entry{{Index: snapi + 1, Term: snapt}}, snapi + 1}, + } + for i, tt := range tests { + s := NewMemoryStorage() + s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}}) + raftLog := newLog(s) + raftLog.append(raftLog.lastIndex(), tt.newEnts...) + raftLog.stableTo(tt.stablei, tt.stablet) + if raftLog.unstable.offset != tt.wunstable { + t.Errorf("#%d: unstable = %d, want %d", i, raftLog.unstable.offset, tt.wunstable) } } } diff --git a/raft/log_unstable.go b/raft/log_unstable.go index 1c50cddca..349a4c323 100644 --- a/raft/log_unstable.go +++ b/raft/log_unstable.go @@ -16,11 +16,7 @@ package raft -import ( - "log" - - pb "github.com/coreos/etcd/raft/raftpb" -) +import pb "github.com/coreos/etcd/raft/raftpb" // unstable.entris[i] has raft log position i+unstable.offset. // Note that unstable.offset may be less than the highest log @@ -77,13 +73,18 @@ func (u *unstable) maybeTerm(i uint64) (uint64, bool) { return u.entries[i-u.offset].Term, true } -func (u *unstable) stableTo(i uint64) { - if i < u.offset || i+1-u.offset > uint64(len(u.entries)) { - log.Panicf("stableTo(%d) is out of range [unstable(%d), len(unstableEnts)(%d)]", - i, u.offset, len(u.entries)) +func (u *unstable) stableTo(i, t uint64) { + gt, ok := u.maybeTerm(i) + if !ok { + return + } + // if i < offest, term is matched with the snapshot + // only update the unstalbe entries if term is matched with + // an unstable entry. + if gt == t && i >= u.offset { + u.entries = u.entries[i+1-u.offset:] + u.offset = i + 1 } - u.entries = u.entries[i+1-u.offset:] - u.offset = i + 1 } func (u *unstable) stableSnapTo(i uint64) { diff --git a/raft/node.go b/raft/node.go index b81d77773..81d1fdad5 100644 --- a/raft/node.go +++ b/raft/node.go @@ -209,6 +209,7 @@ func (n *node) run(r *raft) { var readyc chan Ready var advancec chan struct{} var prevLastUnstablei uint64 + var prevLastUnstablet uint64 var havePrevLastUnstablei bool var prevSnapi uint64 var rd Ready @@ -284,6 +285,7 @@ func (n *node) run(r *raft) { } if len(rd.Entries) > 0 { prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index + prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term havePrevLastUnstablei = true } if !IsEmptyHardState(rd.HardState) { @@ -303,7 +305,7 @@ func (n *node) run(r *raft) { r.raftLog.appliedTo(prevHardSt.Commit) } if havePrevLastUnstablei { - r.raftLog.stableTo(prevLastUnstablei) + r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet) havePrevLastUnstablei = false } r.raftLog.stableSnapTo(prevSnapi) diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index d2be61129..c4d1d207f 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -902,7 +902,7 @@ func commitNoopEntry(r *raft, s *MemoryStorage) { r.readMessages() s.Append(r.raftLog.unstableEntries()) r.raftLog.appliedTo(r.raftLog.committed) - r.raftLog.stableTo(r.raftLog.lastIndex()) + r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm()) } func acceptAndReply(m pb.Message) pb.Message { diff --git a/raft/raft_test.go b/raft/raft_test.go index 13d14f34a..81ccfccb4 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -31,7 +31,7 @@ import ( func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) { // Transfer all unstable entries to "stable" storage. s.Append(r.raftLog.unstableEntries()) - r.raftLog.stableTo(r.raftLog.lastIndex()) + r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm()) ents = r.raftLog.nextEnts() r.raftLog.appliedTo(r.raftLog.committed)