diff --git a/raft/log.go b/raft/log.go index 98a3c5f15..280d03112 100644 --- a/raft/log.go +++ b/raft/log.go @@ -114,7 +114,7 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 { if !l.matchTerm(ne.Index, ne.Term) { if ne.Index <= l.lastIndex() { raftLogger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]", - ne.Index, l.term(ne.Index), ne.Term) + ne.Index, zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term) } return ne.Index } @@ -135,7 +135,11 @@ func (l *raftLog) unstableEntries() []pb.Entry { func (l *raftLog) nextEnts() (ents []pb.Entry) { off := max(l.applied+1, l.firstIndex()) if l.committed+1 > off { - return l.slice(off, l.committed+1, noLimit) + ents, err := l.slice(off, l.committed+1, noLimit) + if err != nil { + raftLogger.Panicf("unexpected error when getting unapplied entries (%v)", err) + } + return ents } return nil } @@ -193,38 +197,55 @@ func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) } func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) } -func (l *raftLog) lastTerm() uint64 { return l.term(l.lastIndex()) } +func (l *raftLog) lastTerm() uint64 { + t, err := l.term(l.lastIndex()) + if err != nil { + raftLogger.Panicf("unexpected error when getting the last term (%v)", err) + } + return t +} -func (l *raftLog) term(i uint64) uint64 { +func (l *raftLog) term(i uint64) (uint64, error) { // the valid term range is [index of dummy entry, last index] dummyIndex := l.firstIndex() - 1 if i < dummyIndex || i > l.lastIndex() { - return 0 + // TODO: return an error instead? + return 0, nil } if t, ok := l.unstable.maybeTerm(i); ok { - return t + return t, nil } t, err := l.storage.Term(i) if err == nil { - return t + return t, nil } if err == ErrCompacted { - return 0 + return 0, err } panic(err) // TODO(bdarnell) } -func (l *raftLog) entries(i, maxsize uint64) []pb.Entry { +func (l *raftLog) entries(i, maxsize uint64) ([]pb.Entry, error) { if i > l.lastIndex() { - return nil + return nil, nil } return l.slice(i, l.lastIndex()+1, maxsize) } // allEntries returns all entries in the log. -func (l *raftLog) allEntries() []pb.Entry { return l.entries(l.firstIndex(), noLimit) } +func (l *raftLog) allEntries() []pb.Entry { + ents, err := l.entries(l.firstIndex(), noLimit) + if err == nil { + return ents + } + if err == ErrCompacted { // try again if there was a racing compaction + return l.allEntries() + } + // TODO (xiangli): handle error? + panic(err) +} // isUpToDate determines if the given (lastIndex,term) log is more up-to-date // by comparing the index and term of the last entries in the existing logs. @@ -236,10 +257,16 @@ func (l *raftLog) isUpToDate(lasti, term uint64) bool { return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex()) } -func (l *raftLog) matchTerm(i, term uint64) bool { return l.term(i) == term } +func (l *raftLog) matchTerm(i, term uint64) bool { + t, err := l.term(i) + if err != nil { + return false + } + return t == term +} func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { - if maxIndex > l.committed && l.term(maxIndex) == term { + if maxIndex > l.committed && zeroTermOnErrCompacted(l.term(maxIndex)) == term { l.commitTo(maxIndex) return true } @@ -253,17 +280,19 @@ func (l *raftLog) restore(s pb.Snapshot) { } // slice returns a slice of log entries from lo through hi-1, inclusive. -func (l *raftLog) slice(lo, hi, maxSize uint64) []pb.Entry { - l.mustCheckOutOfBounds(lo, hi) +func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) { + err := l.mustCheckOutOfBounds(lo, hi) + if err != nil { + return nil, err + } if lo == hi { - return nil + return nil, nil } var ents []pb.Entry if lo < l.unstable.offset { storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), maxSize) if err == ErrCompacted { - // This should never fail because it has been checked before. - raftLogger.Panicf("entries[%d:%d) from storage is out of bound", lo, min(hi, l.unstable.offset)) + return nil, err } else if err == ErrUnavailable { raftLogger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset)) } else if err != nil { @@ -272,7 +301,7 @@ func (l *raftLog) slice(lo, hi, maxSize uint64) []pb.Entry { // check if ents has reached the size limitation if uint64(len(storedEnts)) < min(hi, l.unstable.offset)-lo { - return storedEnts + return storedEnts, nil } ents = storedEnts @@ -286,16 +315,33 @@ func (l *raftLog) slice(lo, hi, maxSize uint64) []pb.Entry { ents = unstable } } - return limitSize(ents, maxSize) + return limitSize(ents, maxSize), nil } // l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries) -func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) { +func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error { if lo > hi { raftLogger.Panicf("invalid slice %d > %d", lo, hi) } - length := l.lastIndex() - l.firstIndex() + 1 - if lo < l.firstIndex() || hi > l.firstIndex()+length { - raftLogger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, l.firstIndex(), l.lastIndex()) + fi := l.firstIndex() + if lo < fi { + return ErrCompacted } + + length := l.lastIndex() - fi + 1 + if lo < fi || hi > fi+length { + raftLogger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex()) + } + return nil +} + +func zeroTermOnErrCompacted(t uint64, err error) uint64 { + if err == nil { + return t + } + if err == ErrCompacted { + return 0 + } + raftLogger.Panicf("unexpected error (%v)", err) + return 0 } diff --git a/raft/log_test.go b/raft/log_test.go index 88fe6ae61..bd9d928aa 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -132,11 +132,15 @@ func TestAppend(t *testing.T) { if index != tt.windex { t.Errorf("#%d: lastIndex = %d, want %d", i, index, tt.windex) } - if g := raftLog.entries(1, noLimit); !reflect.DeepEqual(g, tt.wents) { + g, err := raftLog.entries(1, noLimit) + if err != nil { + t.Fatalf("#%d: unexpected error %v", i, err) + } + if !reflect.DeepEqual(g, tt.wents) { t.Errorf("#%d: logEnts = %+v, want %+v", i, g, tt.wents) } - if g := raftLog.unstable.offset; g != tt.wunstable { - t.Errorf("#%d: unstable = %d, want %d", i, g, tt.wunstable) + if goff := raftLog.unstable.offset; goff != tt.wunstable { + t.Errorf("#%d: unstable = %d, want %d", i, goff, tt.wunstable) } } } @@ -257,7 +261,10 @@ func TestLogMaybeAppend(t *testing.T) { t.Errorf("#%d: committed = %d, want %d", i, gcommit, tt.wcommit) } if gappend && len(tt.ents) != 0 { - gents := raftLog.slice(raftLog.lastIndex()-uint64(len(tt.ents))+1, raftLog.lastIndex()+1, noLimit) + gents, err := raftLog.slice(raftLog.lastIndex()-uint64(len(tt.ents))+1, raftLog.lastIndex()+1, noLimit) + if err != nil { + t.Fatalf("unexpected error %v", err) + } if !reflect.DeepEqual(tt.ents, gents) { t.Errorf("%d: appended entries = %v, want %v", i, gents, tt.ents) } @@ -297,8 +304,8 @@ func TestCompactionSideEffects(t *testing.T) { } for j := offset; j <= raftLog.lastIndex(); j++ { - if raftLog.term(j) != j { - t.Errorf("term(%d) = %d, want %d", j, raftLog.term(j), j) + if mustTerm(raftLog.term(j)) != j { + t.Errorf("term(%d) = %d, want %d", j, mustTerm(raftLog.term(j)), j) } } @@ -322,7 +329,10 @@ func TestCompactionSideEffects(t *testing.T) { t.Errorf("lastIndex = %d, want = %d", raftLog.lastIndex(), prev+1) } - ents := raftLog.entries(raftLog.lastIndex(), noLimit) + ents, err := raftLog.entries(raftLog.lastIndex(), noLimit) + if err != nil { + t.Fatalf("unexpected error %v", err) + } if len(ents) != 1 { t.Errorf("len(entries) = %d, want = %d", len(ents), 1) } @@ -555,8 +565,8 @@ func TestLogRestore(t *testing.T) { if raftLog.unstable.offset != index+1 { t.Errorf("unstable = %d, want %d", raftLog.unstable, index+1) } - if raftLog.term(index) != term { - t.Errorf("term = %d, want %d", raftLog.term(index), term) + if mustTerm(raftLog.term(index)) != term { + t.Errorf("term = %d, want %d", mustTerm(raftLog.term(index)), term) } } @@ -572,40 +582,49 @@ func TestIsOutOfBounds(t *testing.T) { first := offset + 1 tests := []struct { - lo, hi uint64 - wpainc bool + lo, hi uint64 + wpanic bool + wErrCompacted bool }{ { first - 2, first + 1, + false, true, }, { first - 1, first + 1, + false, true, }, { first, first, false, + false, }, { first + num/2, first + num/2, false, + false, }, { first + num - 1, first + num - 1, false, + false, }, { first + num, first + num, false, + false, }, { first + num, first + num + 1, true, + false, }, { first + num + 1, first + num + 1, true, + false, }, } @@ -613,15 +632,21 @@ func TestIsOutOfBounds(t *testing.T) { func() { defer func() { if r := recover(); r != nil { - if !tt.wpainc { + if !tt.wpanic { t.Errorf("%d: panic = %v, want %v: %v", i, true, false, r) } } }() - l.mustCheckOutOfBounds(tt.lo, tt.hi) - if tt.wpainc { + err := l.mustCheckOutOfBounds(tt.lo, tt.hi) + if tt.wpanic { t.Errorf("%d: panic = %v, want %v", i, false, true) } + if tt.wErrCompacted && err != ErrCompacted { + t.Errorf("%d: err = %v, want %v", i, err, ErrCompacted) + } + if !tt.wErrCompacted && err != nil { + t.Errorf("%d: unexpected err %v", i, err) + } }() } } @@ -650,7 +675,7 @@ func TestTerm(t *testing.T) { } for j, tt := range tests { - term := l.term(tt.index) + term := mustTerm(l.term(tt.index)) if !reflect.DeepEqual(term, tt.w) { t.Errorf("#%d: at = %d, want %d", j, term, tt.w) } @@ -680,7 +705,7 @@ func TestTermWithUnstableSnapshot(t *testing.T) { } for i, tt := range tests { - term := l.term(tt.index) + term := mustTerm(l.term(tt.index)) if !reflect.DeepEqual(term, tt.w) { t.Errorf("#%d: at = %d, want %d", i, term, tt.w) } @@ -714,8 +739,8 @@ func TestSlice(t *testing.T) { wpanic bool }{ // test no limit - {offset - 1, offset + 1, noLimit, nil, true}, - {offset, offset + 1, noLimit, nil, true}, + {offset - 1, offset + 1, noLimit, nil, false}, + {offset, offset + 1, noLimit, nil, false}, {half - 1, half + 1, noLimit, []pb.Entry{{Index: half - 1, Term: half - 1}, {Index: half, Term: half}}, false}, {half, half + 1, noLimit, []pb.Entry{{Index: half, Term: half}}, false}, {last - 1, last, noLimit, []pb.Entry{{Index: last - 1, Term: last - 1}}, false}, @@ -739,10 +764,23 @@ func TestSlice(t *testing.T) { } } }() - g := l.slice(tt.from, tt.to, tt.limit) + g, err := l.slice(tt.from, tt.to, tt.limit) + if tt.from <= offset && err != ErrCompacted { + t.Fatalf("#%d: err = %v, want %v", j, err, ErrCompacted) + } + if tt.from > offset && err != nil { + t.Fatalf("#%d: unexpected error %v", j, err) + } if !reflect.DeepEqual(g, tt.w) { t.Errorf("#%d: from %d to %d = %v, want %v", j, tt.from, tt.to, g, tt.w) } }() } } + +func mustTerm(term uint64, err error) uint64 { + if err != nil { + panic(err) + } + return term +} diff --git a/raft/raft.go b/raft/raft.go index fda500efa..0485182b5 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -243,7 +243,11 @@ func (r *raft) sendAppend(to uint64) { } m := pb.Message{} m.To = to - if r.needSnapshot(pr.Next) { + + term, errt := r.raftLog.term(pr.Next - 1) + ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize) + + if errt != nil || erre != nil { // send snapshot if we failed to get term or entries m.Type = pb.MsgSnap snapshot, err := r.raftLog.snapshot() if err != nil { @@ -261,8 +265,8 @@ func (r *raft) sendAppend(to uint64) { } else { m.Type = pb.MsgApp m.Index = pr.Next - 1 - m.LogTerm = r.raftLog.term(pr.Next - 1) - m.Entries = r.raftLog.entries(pr.Next, r.maxMsgSize) + m.LogTerm = term + m.Entries = ents m.Commit = r.raftLog.committed if n := len(m.Entries); n != 0 { switch pr.State { @@ -413,7 +417,12 @@ func (r *raft) becomeLeader() { r.tick = r.tickHeartbeat r.lead = r.id r.state = StateLeader - for _, e := range r.raftLog.entries(r.raftLog.committed+1, noLimit) { + ents, err := r.raftLog.entries(r.raftLog.committed+1, noLimit) + if err != nil { + raftLogger.Panicf("unexpected error getting uncommitted entries (%v)", err) + } + + for _, e := range ents { if e.Type != pb.EntryConfChange { continue } @@ -658,7 +667,7 @@ func (r *raft) handleAppendEntries(m pb.Message) { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) } else { raftLogger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x", - r.id, r.raftLog.term(m.Index), m.Index, m.LogTerm, m.Index, m.From) + r.id, zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()}) } } @@ -712,10 +721,6 @@ func (r *raft) restore(s pb.Snapshot) bool { return true } -func (r *raft) needSnapshot(i uint64) bool { - return i < r.raftLog.firstIndex() -} - // promotable indicates whether state machine can be promoted to leader, // which is true when its own id is in progress list. func (r *raft) promotable() bool { diff --git a/raft/raft_test.go b/raft/raft_test.go index b377649bf..3115d2cfa 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1480,8 +1480,8 @@ func TestRestore(t *testing.T) { if sm.raftLog.lastIndex() != s.Metadata.Index { t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index) } - if sm.raftLog.term(s.Metadata.Index) != s.Metadata.Term { - t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Metadata.Index), s.Metadata.Term) + if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term { + t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term) } sg := sm.nodes() if !reflect.DeepEqual(sg, s.Metadata.ConfState.Nodes) { @@ -1630,7 +1630,11 @@ func TestStepIgnoreConfig(t *testing.T) { pendingConf := r.pendingConf r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}} - if ents := r.raftLog.entries(index+1, noLimit); !reflect.DeepEqual(ents, wents) { + ents, err := r.raftLog.entries(index+1, noLimit) + if err != nil { + t.Fatalf("unexpected error %v", err) + } + if !reflect.DeepEqual(ents, wents) { t.Errorf("ents = %+v, want %+v", ents, wents) } if r.pendingConf != pendingConf {