From 788d1e59a21545e4c5631f1d5dede345a2dd2e56 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 2 Dec 2014 08:50:15 -0800 Subject: [PATCH] raft: use index in entry --- raft/log.go | 28 +++++++++-------- raft/log_test.go | 66 +++++++++++++++++++---------------------- raft/log_unstable.go | 3 +- raft/node.go | 2 +- raft/raft.go | 2 +- raft/raft_paper_test.go | 12 ++++---- raft/raft_test.go | 20 ++++++------- 7 files changed, 66 insertions(+), 67 deletions(-) diff --git a/raft/log.go b/raft/log.go index 65ed6a0a8..31ef7b22e 100644 --- a/raft/log.go +++ b/raft/log.go @@ -75,14 +75,14 @@ func (l *raftLog) String() string { func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) { lastnewi = index + uint64(len(ents)) if l.matchTerm(index, logTerm) { - from := index + 1 - ci := l.findConflict(from, ents) + ci := l.findConflict(ents) switch { case ci == 0: case ci <= l.committed: log.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) default: - l.append(ci-1, ents[ci-from:]...) + offset := index + 1 + l.append(ents[ci-offset:]...) } l.commitTo(min(committed, lastnewi)) return lastnewi, true @@ -90,11 +90,14 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry return 0, false } -func (l *raftLog) append(after uint64, ents ...pb.Entry) uint64 { - if after < l.committed { +func (l *raftLog) append(ents ...pb.Entry) uint64 { + if len(ents) == 0 { + return l.lastIndex() + } + if after := ents[0].Index - 1; after < l.committed { log.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed) } - l.unstable.truncateAndAppend(after, ents) + l.unstable.truncateAndAppend(ents) return l.lastIndex() } @@ -109,15 +112,14 @@ func (l *raftLog) append(after uint64, ents ...pb.Entry) uint64 { // a different term. // The first entry MUST have an index equal to the argument 'from'. // The index of the given entries MUST be continuously increasing. -func (l *raftLog) findConflict(from uint64, ents []pb.Entry) uint64 { - // TODO(xiangli): validate the index of ents - for offset, ne := range ents { - if i := from + uint64(offset); !l.matchTerm(ne.Index, ne.Term) { - if i <= l.lastIndex() { +func (l *raftLog) findConflict(ents []pb.Entry) uint64 { + for _, ne := range ents { + if !l.matchTerm(ne.Index, ne.Term) { + if ne.Index <= l.lastIndex() { log.Printf("raftlog: found conflict at index %d [existing term: %d, conflicting term: %d]", - i, l.term(i), ne.Term) + ne.Index, l.term(ne.Index), ne.Term) } - return i + return ne.Index } } return 0 diff --git a/raft/log_test.go b/raft/log_test.go index 58ad414b1..9a5829b59 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -26,33 +26,32 @@ import ( func TestFindConflict(t *testing.T) { previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}} tests := []struct { - from uint64 ents []pb.Entry wconflict uint64 }{ // no conflict, empty ent - {1, []pb.Entry{}, 0}, - {3, []pb.Entry{}, 0}, + {[]pb.Entry{}, 0}, + {[]pb.Entry{}, 0}, // no conflict - {1, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}, 0}, - {2, []pb.Entry{{Index: 2, Term: 2}, {Index: 3, Term: 3}}, 0}, - {3, []pb.Entry{{Index: 3, Term: 3}}, 0}, + {[]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}, 0}, + {[]pb.Entry{{Index: 2, Term: 2}, {Index: 3, Term: 3}}, 0}, + {[]pb.Entry{{Index: 3, Term: 3}}, 0}, // no conflict, but has new entries - {1, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4}, - {2, []pb.Entry{{Index: 2, Term: 2}, {Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4}, - {3, []pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4}, - {4, []pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4}, + {[]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4}, + {[]pb.Entry{{Index: 2, Term: 2}, {Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4}, + {[]pb.Entry{{Index: 3, Term: 3}, {Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4}, + {[]pb.Entry{{Index: 4, Term: 4}, {Index: 5, Term: 4}}, 4}, // conflicts with existing entries - {1, []pb.Entry{{Index: 1, Term: 4}, {Index: 2, Term: 4}}, 1}, - {2, []pb.Entry{{Index: 2, Term: 1}, {Index: 3, Term: 4}, {Index: 4, Term: 4}}, 2}, - {3, []pb.Entry{{Index: 3, Term: 1}, {Index: 4, Term: 2}, {Index: 5, Term: 4}, {Index: 6, Term: 4}}, 3}, + {[]pb.Entry{{Index: 1, Term: 4}, {Index: 2, Term: 4}}, 1}, + {[]pb.Entry{{Index: 2, Term: 1}, {Index: 3, Term: 4}, {Index: 4, Term: 4}}, 2}, + {[]pb.Entry{{Index: 3, Term: 1}, {Index: 4, Term: 2}, {Index: 5, Term: 4}, {Index: 6, Term: 4}}, 3}, } for i, tt := range tests { raftLog := newLog(NewMemoryStorage()) - raftLog.append(raftLog.lastIndex(), previousEnts...) + raftLog.append(previousEnts...) - gconflict := raftLog.findConflict(tt.from, tt.ents) + gconflict := raftLog.findConflict(tt.ents) if gconflict != tt.wconflict { t.Errorf("#%d: conflict = %d, want %d", i, gconflict, tt.wconflict) } @@ -62,7 +61,7 @@ func TestFindConflict(t *testing.T) { func TestIsUpToDate(t *testing.T) { previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}} raftLog := newLog(NewMemoryStorage()) - raftLog.append(raftLog.lastIndex(), previousEnts...) + raftLog.append(previousEnts...) tests := []struct { lastIndex uint64 term uint64 @@ -93,21 +92,18 @@ func TestIsUpToDate(t *testing.T) { func TestAppend(t *testing.T) { previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}} tests := []struct { - after uint64 ents []pb.Entry windex uint64 wents []pb.Entry wunstable uint64 }{ { - 2, []pb.Entry{}, 2, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 3, }, { - 2, []pb.Entry{{Index: 3, Term: 2}}, 3, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 2}}, @@ -115,7 +111,6 @@ func TestAppend(t *testing.T) { }, // conflicts with index 1 { - 0, []pb.Entry{{Index: 1, Term: 2}}, 1, []pb.Entry{{Index: 1, Term: 2}}, @@ -123,7 +118,6 @@ func TestAppend(t *testing.T) { }, // conflicts with index 2 { - 1, []pb.Entry{{Index: 2, Term: 3}, {Index: 3, Term: 3}}, 3, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 3}, {Index: 3, Term: 3}}, @@ -136,7 +130,7 @@ func TestAppend(t *testing.T) { storage.Append(previousEnts) raftLog := newLog(storage) - index := raftLog.append(tt.after, tt.ents...) + index := raftLog.append(tt.ents...) if index != tt.windex { t.Errorf("#%d: lastIndex = %d, want %d", i, index, tt.windex) } @@ -242,7 +236,7 @@ func TestLogMaybeAppend(t *testing.T) { for i, tt := range tests { raftLog := newLog(NewMemoryStorage()) - raftLog.append(raftLog.lastIndex(), previousEnts...) + raftLog.append(previousEnts...) raftLog.committed = commit func() { defer func() { @@ -288,7 +282,7 @@ func TestCompactionSideEffects(t *testing.T) { } raftLog := newLog(storage) for i = unstableIndex; i < lastIndex; i++ { - raftLog.append(i, pb.Entry{Term: uint64(i + 1), Index: uint64(i + 1)}) + raftLog.append(pb.Entry{Term: uint64(i + 1), Index: uint64(i + 1)}) } ok := raftLog.maybeCommit(lastIndex, lastTerm) @@ -325,7 +319,7 @@ func TestCompactionSideEffects(t *testing.T) { } prev := raftLog.lastIndex() - raftLog.append(raftLog.lastIndex(), pb.Entry{Index: raftLog.lastIndex() + 1, Term: raftLog.lastIndex() + 1}) + raftLog.append(pb.Entry{Index: raftLog.lastIndex() + 1, Term: raftLog.lastIndex() + 1}) if raftLog.lastIndex() != prev+1 { t.Errorf("lastIndex = %d, want = %d", raftLog.lastIndex(), prev+1) } @@ -358,7 +352,7 @@ func TestNextEnts(t *testing.T) { storage := NewMemoryStorage() storage.ApplySnapshot(snap) raftLog := newLog(storage) - raftLog.append(snap.Metadata.Index, ents...) + raftLog.append(ents...) raftLog.maybeCommit(5, 1) raftLog.appliedTo(tt.applied) @@ -388,7 +382,7 @@ func TestUnstableEnts(t *testing.T) { // append unstable entries to raftlog raftLog := newLog(storage) - raftLog.append(raftLog.lastIndex(), previousEnts[tt.unstable-1:]...) + raftLog.append(previousEnts[tt.unstable-1:]...) ents := raftLog.unstableEntries() if l := len(ents); l > 0 { @@ -426,7 +420,7 @@ func TestCommitTo(t *testing.T) { } }() raftLog := newLog(NewMemoryStorage()) - raftLog.append(0, previousEnts...) + raftLog.append(previousEnts...) raftLog.committed = commit raftLog.commitTo(tt.commit) if raftLog.committed != tt.wcommit { @@ -449,7 +443,7 @@ func TestStableTo(t *testing.T) { } for i, tt := range tests { raftLog := newLog(NewMemoryStorage()) - raftLog.append(0, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...) + raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...) raftLog.stableTo(tt.stablei, tt.stablet) if raftLog.unstable.offset != tt.wunstable { t.Errorf("#%d: unstable = %d, want %d", i, raftLog.unstable.offset, tt.wunstable) @@ -486,7 +480,7 @@ func TestStableToWithSnap(t *testing.T) { s := NewMemoryStorage() s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}}) raftLog := newLog(s) - raftLog.append(raftLog.lastIndex(), tt.newEnts...) + raftLog.append(tt.newEnts...) raftLog.stableTo(tt.stablei, tt.stablet) if raftLog.unstable.offset != tt.wunstable { t.Errorf("#%d: unstable = %d, want %d", i, raftLog.unstable.offset, tt.wunstable) @@ -574,7 +568,9 @@ func TestIsOutOfBounds(t *testing.T) { storage := NewMemoryStorage() storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}}) l := newLog(storage) - l.append(offset, make([]pb.Entry, num)...) + for i := uint64(1); i <= num; i++ { + l.append(pb.Entry{Index: i + offset}) + } tests := []struct { index uint64 @@ -604,7 +600,7 @@ func TestTerm(t *testing.T) { storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}}) l := newLog(storage) for i = 1; i < num; i++ { - l.append(offset+i-1, pb.Entry{Index: i, Term: i}) + l.append(pb.Entry{Index: offset + i, Term: i}) } tests := []struct { @@ -635,7 +631,7 @@ func TestSlice(t *testing.T) { storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}}) l := newLog(storage) for i = 1; i < num; i++ { - l.append(offset+i-1, pb.Entry{Index: i, Term: i}) + l.append(pb.Entry{Index: offset + i, Term: offset + i}) } tests := []struct { @@ -645,8 +641,8 @@ func TestSlice(t *testing.T) { }{ {offset - 1, offset + 1, nil}, {offset, offset + 1, nil}, - {offset + num/2, offset + num/2 + 1, []pb.Entry{{Index: num / 2, Term: num / 2}}}, - {offset + num - 1, offset + num, []pb.Entry{{Index: num - 1, Term: num - 1}}}, + {offset + num/2, offset + num/2 + 1, []pb.Entry{{Index: offset + num/2, Term: offset + num/2}}}, + {offset + num - 1, offset + num, []pb.Entry{{Index: offset + num - 1, Term: offset + num - 1}}}, {offset + num, offset + num + 1, nil}, {offset + num/2, offset + num/2, nil}, diff --git a/raft/log_unstable.go b/raft/log_unstable.go index a43a50740..51fca6af0 100644 --- a/raft/log_unstable.go +++ b/raft/log_unstable.go @@ -103,7 +103,8 @@ func (u *unstable) restore(s pb.Snapshot) { u.snapshot = &s } -func (u *unstable) truncateAndAppend(after uint64, ents []pb.Entry) { +func (u *unstable) truncateAndAppend(ents []pb.Entry) { + after := ents[0].Index - 1 switch { case after < u.offset: log.Printf("raftlog: replace the unstable entries from index %d", after+1) diff --git a/raft/node.go b/raft/node.go index 81d1fdad5..95e01bd89 100644 --- a/raft/node.go +++ b/raft/node.go @@ -144,7 +144,7 @@ func StartNode(id uint64, peers []Peer, election, heartbeat int, storage Storage panic("unexpected marshal error") } e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d} - r.raftLog.append(r.raftLog.lastIndex(), e) + r.raftLog.append(e) } // Mark these initial entries as committed. // TODO(bdarnell): These entries are still unstable; do we need to preserve diff --git a/raft/raft.go b/raft/raft.go index 084e960a5..1db519518 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -320,7 +320,7 @@ func (r *raft) q() int { func (r *raft) appendEntry(e pb.Entry) { e.Term = r.Term e.Index = r.raftLog.lastIndex() + 1 - r.raftLog.append(r.raftLog.lastIndex(), e) + r.raftLog.append(e) r.prs[r.id].update(r.raftLog.lastIndex()) r.maybeCommit() } diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index b08456a47..8f8421c10 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -654,9 +654,9 @@ func TestFollowerAppendEntries(t *testing.T) { }, { 1, 1, - []pb.Entry{{Term: 3, Index: 3}, {Term: 4, Index: 4}}, - []pb.Entry{{Term: 1, Index: 1}, {Term: 3, Index: 3}, {Term: 4, Index: 4}}, - []pb.Entry{{Term: 3, Index: 3}, {Term: 4, Index: 4}}, + []pb.Entry{{Term: 3, Index: 2}, {Term: 4, Index: 3}}, + []pb.Entry{{Term: 1, Index: 1}, {Term: 3, Index: 2}, {Term: 4, Index: 3}}, + []pb.Entry{{Term: 3, Index: 2}, {Term: 4, Index: 3}}, }, { 0, 0, @@ -666,9 +666,9 @@ func TestFollowerAppendEntries(t *testing.T) { }, { 0, 0, - []pb.Entry{{Term: 3, Index: 3}}, - []pb.Entry{{Term: 3, Index: 3}}, - []pb.Entry{{Term: 3, Index: 3}}, + []pb.Entry{{Term: 3, Index: 1}}, + []pb.Entry{{Term: 3, Index: 1}}, + []pb.Entry{{Term: 3, Index: 1}}, }, } for i, tt := range tests { diff --git a/raft/raft_test.go b/raft/raft_test.go index 81ccfccb4..d4ed2ea5d 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -662,16 +662,16 @@ func TestHandleMsgApp(t *testing.T) { // Ensure 2 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, false}, - {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Term: 2}}}, 1, 1, false}, - {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Term: 2}, {Term: 2}}}, 4, 3, false}, - {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 3, 3, false}, - {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false}, + {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Index: 1, Term: 2}}}, 1, 1, false}, + {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Index: 3, Term: 2}, {Index: 4, Term: 2}}}, 4, 3, false}, + {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Index: 3, Term: 2}}}, 3, 3, false}, + {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false}, // Ensure 3 - {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit upto last new entry 1 - {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Term: 2}}}, 2, 2, false}, // match entry 1, commit upto last new entry 2 - {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false}, // match entry 2, commit upto last new entry 2 - {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false}, // commit upto log.last() + {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false}, // match entry 1, commit upto last new entry 1 + {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false}, // match entry 1, commit upto last new entry 2 + {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false}, // match entry 2, commit upto last new entry 2 + {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false}, // commit upto log.last() } for i, tt := range tests { @@ -1077,7 +1077,7 @@ func TestLeaderIncreaseNext(t *testing.T) { for i, tt := range tests { sm := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) - sm.raftLog.append(0, previousEnts...) + sm.raftLog.append(previousEnts...) sm.becomeCandidate() sm.becomeLeader() sm.prs[2].match, sm.prs[2].next = tt.match, tt.next @@ -1343,7 +1343,7 @@ func TestRaftNodes(t *testing.T) { } func ents(terms ...uint64) *raft { - ents := []pb.Entry{{}} + ents := []pb.Entry{} for i, term := range terms { ents = append(ents, pb.Entry{Index: uint64(i), Term: term}) }