Merge pull request #1809 from xiang90/unstable

raft: stableTo checks term matching
This commit is contained in:
Xiang Li 2014-12-01 11:09:40 -08:00
commit 92d4112feb
6 changed files with 65 additions and 22 deletions

View File

@ -187,7 +187,7 @@ func (l *raftLog) appliedTo(i uint64) {
l.applied = i
}
func (l *raftLog) stableTo(i uint64) { l.unstable.stableTo(i) }
func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) }
func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }

View File

@ -392,7 +392,7 @@ func TestUnstableEnts(t *testing.T) {
ents := raftLog.unstableEntries()
if l := len(ents); l > 0 {
raftLog.stableTo(ents[l-1].Index)
raftLog.stableTo(ents[l-1].Index, ents[l-i].Term)
}
if !reflect.DeepEqual(ents, tt.wents) {
t.Errorf("#%d: unstableEnts = %+v, want %+v", i, ents, tt.wents)
@ -438,18 +438,58 @@ func TestCommitTo(t *testing.T) {
func TestStableTo(t *testing.T) {
tests := []struct {
stable uint64
stablei uint64
stablet uint64
wunstable uint64
}{
{1, 2},
{2, 3},
{1, 1, 2},
{2, 2, 3},
{2, 1, 1}, // bad term
{3, 1, 1}, // bad index
}
for i, tt := range tests {
raftLog := newLog(NewMemoryStorage())
raftLog.append(0, []pb.Entry{{}, {}}...)
raftLog.stableTo(tt.stable)
raftLog.append(0, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...)
raftLog.stableTo(tt.stablei, tt.stablet)
if raftLog.unstable.offset != tt.wunstable {
t.Errorf("#%d: unstable = %d, want %d", i, raftLog.unstable, tt.wunstable)
t.Errorf("#%d: unstable = %d, want %d", i, raftLog.unstable.offset, tt.wunstable)
}
}
}
func TestStableToWithSnap(t *testing.T) {
snapi, snapt := uint64(5), uint64(2)
tests := []struct {
stablei uint64
stablet uint64
newEnts []pb.Entry
wunstable uint64
}{
{snapi + 1, snapt, nil, snapi + 1},
{snapi, snapt, nil, snapi + 1},
{snapi - 1, snapt, nil, snapi + 1},
{snapi + 1, snapt + 1, nil, snapi + 1},
{snapi, snapt + 1, nil, snapi + 1},
{snapi - 1, snapt + 1, nil, snapi + 1},
{snapi + 1, snapt, []pb.Entry{{Index: snapi + 1, Term: snapt}}, snapi + 2},
{snapi, snapt, []pb.Entry{{Index: snapi + 1, Term: snapt}}, snapi + 1},
{snapi - 1, snapt, []pb.Entry{{Index: snapi + 1, Term: snapt}}, snapi + 1},
{snapi + 1, snapt + 1, []pb.Entry{{Index: snapi + 1, Term: snapt}}, snapi + 1},
{snapi, snapt + 1, []pb.Entry{{Index: snapi + 1, Term: snapt}}, snapi + 1},
{snapi - 1, snapt + 1, []pb.Entry{{Index: snapi + 1, Term: snapt}}, snapi + 1},
}
for i, tt := range tests {
s := NewMemoryStorage()
s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}})
raftLog := newLog(s)
raftLog.append(raftLog.lastIndex(), tt.newEnts...)
raftLog.stableTo(tt.stablei, tt.stablet)
if raftLog.unstable.offset != tt.wunstable {
t.Errorf("#%d: unstable = %d, want %d", i, raftLog.unstable.offset, tt.wunstable)
}
}
}

View File

@ -16,11 +16,7 @@
package raft
import (
"log"
pb "github.com/coreos/etcd/raft/raftpb"
)
import pb "github.com/coreos/etcd/raft/raftpb"
// unstable.entris[i] has raft log position i+unstable.offset.
// Note that unstable.offset may be less than the highest log
@ -77,13 +73,18 @@ func (u *unstable) maybeTerm(i uint64) (uint64, bool) {
return u.entries[i-u.offset].Term, true
}
func (u *unstable) stableTo(i uint64) {
if i < u.offset || i+1-u.offset > uint64(len(u.entries)) {
log.Panicf("stableTo(%d) is out of range [unstable(%d), len(unstableEnts)(%d)]",
i, u.offset, len(u.entries))
func (u *unstable) stableTo(i, t uint64) {
gt, ok := u.maybeTerm(i)
if !ok {
return
}
// if i < offest, term is matched with the snapshot
// only update the unstalbe entries if term is matched with
// an unstable entry.
if gt == t && i >= u.offset {
u.entries = u.entries[i+1-u.offset:]
u.offset = i + 1
}
u.entries = u.entries[i+1-u.offset:]
u.offset = i + 1
}
func (u *unstable) stableSnapTo(i uint64) {

View File

@ -209,6 +209,7 @@ func (n *node) run(r *raft) {
var readyc chan Ready
var advancec chan struct{}
var prevLastUnstablei uint64
var prevLastUnstablet uint64
var havePrevLastUnstablei bool
var prevSnapi uint64
var rd Ready
@ -284,6 +285,7 @@ func (n *node) run(r *raft) {
}
if len(rd.Entries) > 0 {
prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
havePrevLastUnstablei = true
}
if !IsEmptyHardState(rd.HardState) {
@ -303,7 +305,7 @@ func (n *node) run(r *raft) {
r.raftLog.appliedTo(prevHardSt.Commit)
}
if havePrevLastUnstablei {
r.raftLog.stableTo(prevLastUnstablei)
r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
havePrevLastUnstablei = false
}
r.raftLog.stableSnapTo(prevSnapi)

View File

@ -902,7 +902,7 @@ func commitNoopEntry(r *raft, s *MemoryStorage) {
r.readMessages()
s.Append(r.raftLog.unstableEntries())
r.raftLog.appliedTo(r.raftLog.committed)
r.raftLog.stableTo(r.raftLog.lastIndex())
r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm())
}
func acceptAndReply(m pb.Message) pb.Message {

View File

@ -31,7 +31,7 @@ import (
func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
// Transfer all unstable entries to "stable" storage.
s.Append(r.raftLog.unstableEntries())
r.raftLog.stableTo(r.raftLog.lastIndex())
r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm())
ents = r.raftLog.nextEnts()
r.raftLog.appliedTo(r.raftLog.committed)