diff --git a/raft/log.go b/raft/log.go index f591e1b35..94258a149 100644 --- a/raft/log.go +++ b/raft/log.go @@ -79,7 +79,19 @@ func (l *raftLog) append(after uint64, ents ...pb.Entry) uint64 { return l.lastIndex() } +// findConflict finds the index of the conflict. +// It returns the first pair of conflicting entries between the existing +// entries and the given entries, if there are any. +// If there is no conflicting entries, and the existing entries contains +// all the given entries, zero will be returned. +// If there is no conflicting entries, but the given entries contains new +// entries, the index of the first new entry will be returned. +// An entry is considered to be conflicting if it has the same index but +// a different term. +// The first entry MUST have an index equal to the argument 'from'. +// The index of the given entries MUST be continuously increasing. func (l *raftLog) findConflict(from uint64, ents []pb.Entry) uint64 { + // TODO(xiangli): validate the index of ents for i, ne := range ents { if oe := l.at(from + uint64(i)); oe == nil || oe.Term != ne.Term { return from + uint64(i) @@ -117,9 +129,9 @@ func (l *raftLog) resetNextEnts() { } } -func (l *raftLog) lastIndex() uint64 { - return uint64(len(l.ents)) - 1 + l.offset -} +func (l *raftLog) lastIndex() uint64 { return uint64(len(l.ents)) - 1 + l.offset } + +func (l *raftLog) lastTerm() uint64 { return l.term(l.lastIndex()) } func (l *raftLog) term(i uint64) uint64 { if e := l.at(i); e != nil { @@ -138,9 +150,14 @@ func (l *raftLog) entries(i uint64) []pb.Entry { return l.slice(i, l.lastIndex()+1) } -func (l *raftLog) isUpToDate(i, term uint64) bool { - e := l.at(l.lastIndex()) - return term > e.Term || (term == e.Term && i >= l.lastIndex()) +// isUpToDate determines if the given (lastIndex,term) log is more up-to-date +// by comparing the index and term of the last entries in the existing logs. +// If the logs have last entries with different terms, then the log with the +// later term is more up-to-date. If the logs end with the same term, then +// whichever log has the larger lastIndex is more up-to-date. If the logs are +// the same, the given log is up-to-date. +func (l *raftLog) isUpToDate(lasti, term uint64) bool { + return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex()) } func (l *raftLog) matchTerm(i, term uint64) bool { diff --git a/raft/log_test.go b/raft/log_test.go index 56ab2b639..3cfa41a31 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -23,6 +23,73 @@ import ( pb "github.com/coreos/etcd/raft/raftpb" ) +func TestFindConflict(t *testing.T) { + previousEnts := []pb.Entry{{Term: 1}, {Term: 2}, {Term: 3}} + tests := []struct { + from uint64 + ents []pb.Entry + wconflict uint64 + }{ + // no conflict, empty ent + {1, []pb.Entry{}, 0}, + {3, []pb.Entry{}, 0}, + // no conflict + {1, []pb.Entry{{Term: 1}, {Term: 2}, {Term: 3}}, 0}, + {2, []pb.Entry{{Term: 2}, {Term: 3}}, 0}, + {3, []pb.Entry{{Term: 3}}, 0}, + // no conflict, but has new entries + {1, []pb.Entry{{Term: 1}, {Term: 2}, {Term: 3}, {Term: 4}, {Term: 4}}, 4}, + {2, []pb.Entry{{Term: 2}, {Term: 3}, {Term: 4}, {Term: 4}}, 4}, + {3, []pb.Entry{{Term: 3}, {Term: 4}, {Term: 4}}, 4}, + {4, []pb.Entry{{Term: 4}, {Term: 4}}, 4}, + // conflicts with existing entries + {1, []pb.Entry{{Term: 4}, {Term: 4}}, 1}, + {2, []pb.Entry{{Term: 1}, {Term: 4}, {Term: 4}}, 2}, + {3, []pb.Entry{{Term: 1}, {Term: 2}, {Term: 4}, {Term: 4}}, 3}, + } + + for i, tt := range tests { + raftLog := newLog() + raftLog.append(raftLog.lastIndex(), previousEnts...) + + gconflict := raftLog.findConflict(tt.from, tt.ents) + if gconflict != tt.wconflict { + t.Errorf("#%d: conflict = %d, want %d", i, gconflict, tt.wconflict) + } + } +} + +func TestIsUpToDate(t *testing.T) { + previousEnts := []pb.Entry{{Term: 1}, {Term: 2}, {Term: 3}} + raftLog := newLog() + raftLog.append(raftLog.lastIndex(), previousEnts...) + tests := []struct { + lastIndex uint64 + term uint64 + wUpToDate bool + }{ + // greater term, ignore lastIndex + {raftLog.lastIndex() - 1, 4, true}, + {raftLog.lastIndex(), 4, true}, + {raftLog.lastIndex() + 1, 4, true}, + // smaller term, ignore lastIndex + {raftLog.lastIndex() - 1, 2, false}, + {raftLog.lastIndex(), 2, false}, + {raftLog.lastIndex() + 1, 2, false}, + // equal term, lager lastIndex wins + {raftLog.lastIndex() - 1, 3, false}, + {raftLog.lastIndex(), 3, true}, + {raftLog.lastIndex() + 1, 3, true}, + } + + for i, tt := range tests { + gUpToDate := raftLog.isUpToDate(tt.lastIndex, tt.term) + if gUpToDate != tt.wUpToDate { + t.Errorf("#%d: uptodate = %v, want %v", i, gUpToDate, tt.wUpToDate) + } + } +} + // TestAppend ensures: // 1. If an existing entry conflicts with a new one (same index // but different terms), delete the existing entry and all that @@ -72,7 +139,7 @@ func TestAppend(t *testing.T) { for i, tt := range tests { raftLog := newLog() - raftLog.ents = append(raftLog.ents, previousEnts...) + raftLog.append(raftLog.lastIndex(), previousEnts...) raftLog.unstable = previousUnstable index := raftLog.append(tt.after, tt.ents...) if index != tt.windex { @@ -152,7 +219,7 @@ func TestUnstableEnts(t *testing.T) { for i, tt := range tests { raftLog := newLog() - raftLog.ents = append(raftLog.ents, previousEnts...) + raftLog.append(0, previousEnts...) raftLog.unstable = tt.unstable ents := raftLog.unstableEnts() raftLog.resetUnstable() diff --git a/raft/raft.go b/raft/raft.go index 89138bc87..fe3bf5f4f 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -349,8 +349,7 @@ func (r *raft) campaign() { if i == r.id { continue } - lasti := r.raftLog.lastIndex() - r.send(pb.Message{To: i, Type: pb.MsgVote, Index: lasti, LogTerm: r.raftLog.term(lasti)}) + r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()}) } }