raft: use index in entry

This commit is contained in:
Xiang Li 2014-12-02 08:50:15 -08:00
parent 6692a8060e
commit 788d1e59a2
7 changed files with 66 additions and 67 deletions

View File

@ -75,14 +75,14 @@ func (l *raftLog) String() string {
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
lastnewi = index + uint64(len(ents))
if l.matchTerm(index, logTerm) {
from := index + 1
ci := l.findConflict(from, ents)
ci := l.findConflict(ents)
switch {
case ci == 0:
case ci <= l.committed:
log.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
l.append(ci-1, ents[ci-from:]...)
offset := index + 1
l.append(ents[ci-offset:]...)
}
l.commitTo(min(committed, lastnewi))
return lastnewi, true
@ -90,11 +90,14 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry
return 0, false
}
func (l *raftLog) append(after uint64, ents ...pb.Entry) uint64 {
if after < l.committed {
func (l *raftLog) append(ents ...pb.Entry) uint64 {
if len(ents) == 0 {
return l.lastIndex()
}
if after := ents[0].Index - 1; after < l.committed {
log.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
}
l.unstable.truncateAndAppend(after, ents)
l.unstable.truncateAndAppend(ents)
return l.lastIndex()
}
@ -109,15 +112,14 @@ func (l *raftLog) append(after uint64, ents ...pb.Entry) uint64 {
// a different term.
// The first entry MUST have an index equal to the argument 'from'.
// The index of the given entries MUST be continuously increasing.
func (l *raftLog) findConflict(from uint64, ents []pb.Entry) uint64 {
// TODO(xiangli): validate the index of ents
for offset, ne := range ents {
if i := from + uint64(offset); !l.matchTerm(ne.Index, ne.Term) {
if i <= l.lastIndex() {
func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
for _, ne := range ents {
if !l.matchTerm(ne.Index, ne.Term) {
if ne.Index <= l.lastIndex() {
log.Printf("raftlog: found conflict at index %d [existing term: %d, conflicting term: %d]",
i, l.term(i), ne.Term)
ne.Index, l.term(ne.Index), ne.Term)
}
return i
return ne.Index
}
}
return 0

View File

@ -26,33 +26,32 @@ import (
func TestFindConflict(t *testing.T) {
previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}
tests := []struct {
from uint64
ents []pb.Entry
wconflict uint64
}{
// no conflict, empty ent
{1, []pb.Entry{}, 0},
{3, []pb.Entry{}, 0},
{[]pb.Entry{}, 0},
{[]pb.Entry{}, 0},
// no conflict
{1, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}, 0},
{2, []pb.Entry{{Index: 2, Term: 2}, {Index: 3, Term: 3}}, 0},
{3, []pb.Entry{{Index: 3, Term: 3}}, 0},
{[]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}, 0},
{[]pb.Entry{{Index: 2, Term: 2}, {Index: 3, Term: 3}}, 0},
{[]pb.Entry{{Index: 3, Term: 3}}, 0},
// no conflict, but has new entries
{1, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4},
{2, []pb.Entry{{Index: 2, Term: 2}, {Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4},
{3, []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4},
{4, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4},
{[]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4},
{[]pb.Entry{{Index: 2, Term: 2}, {Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4},
{[]pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4},
{[]pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4},
// conflicts with existing entries
{1, []pb.Entry{{Index: 1, Term: 4}, {Index: 2, Term: 4}}, 1},
{2, []pb.Entry{{Index: 2, Term: 1}, {Index: 3, Term: 4}, {Index: 4, Term: 4}}, 2},
{3, []pb.Entry{{Index: 3, Term: 1}, {Index: 4, Term: 2}, {Index: 5, Term: 4}, {Index: 6, Term: 4}}, 3},
{[]pb.Entry{{Index: 1, Term: 4}, {Index: 2, Term: 4}}, 1},
{[]pb.Entry{{Index: 2, Term: 1}, {Index: 3, Term: 4}, {Index: 4, Term: 4}}, 2},
{[]pb.Entry{{Index: 3, Term: 1}, {Index: 4, Term: 2}, {Index: 5, Term: 4}, {Index: 6, Term: 4}}, 3},
}
for i, tt := range tests {
raftLog := newLog(NewMemoryStorage())
raftLog.append(raftLog.lastIndex(), previousEnts...)
raftLog.append(previousEnts...)
gconflict := raftLog.findConflict(tt.from, tt.ents)
gconflict := raftLog.findConflict(tt.ents)
if gconflict != tt.wconflict {
t.Errorf("#%d: conflict = %d, want %d", i, gconflict, tt.wconflict)
}
@ -62,7 +61,7 @@ func TestFindConflict(t *testing.T) {
func TestIsUpToDate(t *testing.T) {
previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}
raftLog := newLog(NewMemoryStorage())
raftLog.append(raftLog.lastIndex(), previousEnts...)
raftLog.append(previousEnts...)
tests := []struct {
lastIndex uint64
term uint64
@ -93,21 +92,18 @@ func TestIsUpToDate(t *testing.T) {
func TestAppend(t *testing.T) {
previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}
tests := []struct {
after uint64
ents []pb.Entry
windex uint64
wents []pb.Entry
wunstable uint64
}{
{
2,
[]pb.Entry{},
2,
[]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}},
3,
},
{
2,
[]pb.Entry{{Index: 3, Term: 2}},
3,
[]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 2}},
@ -115,7 +111,6 @@ func TestAppend(t *testing.T) {
},
// conflicts with index 1
{
0,
[]pb.Entry{{Index: 1, Term: 2}},
1,
[]pb.Entry{{Index: 1, Term: 2}},
@ -123,7 +118,6 @@ func TestAppend(t *testing.T) {
},
// conflicts with index 2
{
1,
[]pb.Entry{{Index: 2, Term: 3}, {Index: 3, Term: 3}},
3,
[]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 3}, {Index: 3, Term: 3}},
@ -136,7 +130,7 @@ func TestAppend(t *testing.T) {
storage.Append(previousEnts)
raftLog := newLog(storage)
index := raftLog.append(tt.after, tt.ents...)
index := raftLog.append(tt.ents...)
if index != tt.windex {
t.Errorf("#%d: lastIndex = %d, want %d", i, index, tt.windex)
}
@ -242,7 +236,7 @@ func TestLogMaybeAppend(t *testing.T) {
for i, tt := range tests {
raftLog := newLog(NewMemoryStorage())
raftLog.append(raftLog.lastIndex(), previousEnts...)
raftLog.append(previousEnts...)
raftLog.committed = commit
func() {
defer func() {
@ -288,7 +282,7 @@ func TestCompactionSideEffects(t *testing.T) {
}
raftLog := newLog(storage)
for i = unstableIndex; i < lastIndex; i++ {
raftLog.append(i, pb.Entry{Term: uint64(i + 1), Index: uint64(i + 1)})
raftLog.append(pb.Entry{Term: uint64(i + 1), Index: uint64(i + 1)})
}
ok := raftLog.maybeCommit(lastIndex, lastTerm)
@ -325,7 +319,7 @@ func TestCompactionSideEffects(t *testing.T) {
}
prev := raftLog.lastIndex()
raftLog.append(raftLog.lastIndex(), pb.Entry{Index: raftLog.lastIndex() + 1, Term: raftLog.lastIndex() + 1})
raftLog.append(pb.Entry{Index: raftLog.lastIndex() + 1, Term: raftLog.lastIndex() + 1})
if raftLog.lastIndex() != prev+1 {
t.Errorf("lastIndex = %d, want = %d", raftLog.lastIndex(), prev+1)
}
@ -358,7 +352,7 @@ func TestNextEnts(t *testing.T) {
storage := NewMemoryStorage()
storage.ApplySnapshot(snap)
raftLog := newLog(storage)
raftLog.append(snap.Metadata.Index, ents...)
raftLog.append(ents...)
raftLog.maybeCommit(5, 1)
raftLog.appliedTo(tt.applied)
@ -388,7 +382,7 @@ func TestUnstableEnts(t *testing.T) {
// append unstable entries to raftlog
raftLog := newLog(storage)
raftLog.append(raftLog.lastIndex(), previousEnts[tt.unstable-1:]...)
raftLog.append(previousEnts[tt.unstable-1:]...)
ents := raftLog.unstableEntries()
if l := len(ents); l > 0 {
@ -426,7 +420,7 @@ func TestCommitTo(t *testing.T) {
}
}()
raftLog := newLog(NewMemoryStorage())
raftLog.append(0, previousEnts...)
raftLog.append(previousEnts...)
raftLog.committed = commit
raftLog.commitTo(tt.commit)
if raftLog.committed != tt.wcommit {
@ -449,7 +443,7 @@ func TestStableTo(t *testing.T) {
}
for i, tt := range tests {
raftLog := newLog(NewMemoryStorage())
raftLog.append(0, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...)
raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...)
raftLog.stableTo(tt.stablei, tt.stablet)
if raftLog.unstable.offset != tt.wunstable {
t.Errorf("#%d: unstable = %d, want %d", i, raftLog.unstable.offset, tt.wunstable)
@ -486,7 +480,7 @@ func TestStableToWithSnap(t *testing.T) {
s := NewMemoryStorage()
s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}})
raftLog := newLog(s)
raftLog.append(raftLog.lastIndex(), tt.newEnts...)
raftLog.append(tt.newEnts...)
raftLog.stableTo(tt.stablei, tt.stablet)
if raftLog.unstable.offset != tt.wunstable {
t.Errorf("#%d: unstable = %d, want %d", i, raftLog.unstable.offset, tt.wunstable)
@ -574,7 +568,9 @@ func TestIsOutOfBounds(t *testing.T) {
storage := NewMemoryStorage()
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}})
l := newLog(storage)
l.append(offset, make([]pb.Entry, num)...)
for i := uint64(1); i <= num; i++ {
l.append(pb.Entry{Index: i + offset})
}
tests := []struct {
index uint64
@ -604,7 +600,7 @@ func TestTerm(t *testing.T) {
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}})
l := newLog(storage)
for i = 1; i < num; i++ {
l.append(offset+i-1, pb.Entry{Index: i, Term: i})
l.append(pb.Entry{Index: offset + i, Term: i})
}
tests := []struct {
@ -635,7 +631,7 @@ func TestSlice(t *testing.T) {
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}})
l := newLog(storage)
for i = 1; i < num; i++ {
l.append(offset+i-1, pb.Entry{Index: i, Term: i})
l.append(pb.Entry{Index: offset + i, Term: offset + i})
}
tests := []struct {
@ -645,8 +641,8 @@ func TestSlice(t *testing.T) {
}{
{offset - 1, offset + 1, nil},
{offset, offset + 1, nil},
{offset + num/2, offset + num/2 + 1, []pb.Entry{{Index: num / 2, Term: num / 2}}},
{offset + num - 1, offset + num, []pb.Entry{{Index: num - 1, Term: num - 1}}},
{offset + num/2, offset + num/2 + 1, []pb.Entry{{Index: offset + num/2, Term: offset + num/2}}},
{offset + num - 1, offset + num, []pb.Entry{{Index: offset + num - 1, Term: offset + num - 1}}},
{offset + num, offset + num + 1, nil},
{offset + num/2, offset + num/2, nil},

View File

@ -103,7 +103,8 @@ func (u *unstable) restore(s pb.Snapshot) {
u.snapshot = &s
}
func (u *unstable) truncateAndAppend(after uint64, ents []pb.Entry) {
func (u *unstable) truncateAndAppend(ents []pb.Entry) {
after := ents[0].Index - 1
switch {
case after < u.offset:
log.Printf("raftlog: replace the unstable entries from index %d", after+1)

View File

@ -144,7 +144,7 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage
panic("unexpected marshal error")
}
e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
r.raftLog.append(r.raftLog.lastIndex(), e)
r.raftLog.append(e)
}
// Mark these initial entries as committed.
// TODO(bdarnell): These entries are still unstable; do we need to preserve

View File

@ -320,7 +320,7 @@ func (r *raft) q() int {
func (r *raft) appendEntry(e pb.Entry) {
e.Term = r.Term
e.Index = r.raftLog.lastIndex() + 1
r.raftLog.append(r.raftLog.lastIndex(), e)
r.raftLog.append(e)
r.prs[r.id].update(r.raftLog.lastIndex())
r.maybeCommit()
}

View File

@ -654,9 +654,9 @@ func TestFollowerAppendEntries(t *testing.T) {
},
{
1, 1,
[]pb.Entry{{Term: 3, Index: 3}, {Term: 4, Index: 4}},
[]pb.Entry{{Term: 1, Index: 1}, {Term: 3, Index: 3}, {Term: 4, Index: 4}},
[]pb.Entry{{Term: 3, Index: 3}, {Term: 4, Index: 4}},
[]pb.Entry{{Term: 3, Index: 2}, {Term: 4, Index: 3}},
[]pb.Entry{{Term: 1, Index: 1}, {Term: 3, Index: 2}, {Term: 4, Index: 3}},
[]pb.Entry{{Term: 3, Index: 2}, {Term: 4, Index: 3}},
},
{
0, 0,
@ -666,9 +666,9 @@ func TestFollowerAppendEntries(t *testing.T) {
},
{
0, 0,
[]pb.Entry{{Term: 3, Index: 3}},
[]pb.Entry{{Term: 3, Index: 3}},
[]pb.Entry{{Term: 3, Index: 3}},
[]pb.Entry{{Term: 3, Index: 1}},
[]pb.Entry{{Term: 3, Index: 1}},
[]pb.Entry{{Term: 3, Index: 1}},
},
}
for i, tt := range tests {

View File

@ -662,16 +662,16 @@ func TestHandleMsgApp(t *testing.T) {
// Ensure 2
{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, false},
{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Term: 2}}}, 1, 1, false},
{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Term: 2}, {Term: 2}}}, 4, 3, false},
{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 3, 3, false},
{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false},
{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Index: 1, Term: 2}}}, 1, 1, false},
{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Index: 3, Term: 2}, {Index: 4, Term: 2}}}, 4, 3, false},
{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Index: 3, Term: 2}}}, 3, 3, false},
{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false},
// Ensure 3
{pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit upto last new entry 1
{pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false}, // match entry 1, commit upto last new entry 2
{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false}, // match entry 2, commit upto last new entry 2
{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false}, // commit upto log.last()
{pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit upto last new entry 1
{pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false}, // match entry 1, commit upto last new entry 2
{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false}, // match entry 2, commit upto last new entry 2
{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false}, // commit upto log.last()
}
for i, tt := range tests {
@ -1077,7 +1077,7 @@ func TestLeaderIncreaseNext(t *testing.T) {
for i, tt := range tests {
sm := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
sm.raftLog.append(0, previousEnts...)
sm.raftLog.append(previousEnts...)
sm.becomeCandidate()
sm.becomeLeader()
sm.prs[2].match, sm.prs[2].next = tt.match, tt.next
@ -1343,7 +1343,7 @@ func TestRaftNodes(t *testing.T) {
}
func ents(terms ...uint64) *raft {
ents := []pb.Entry{{}}
ents := []pb.Entry{}
for i, term := range terms {
ents = append(ents, pb.Entry{Index: uint64(i), Term: term})
}