From b4022899eb46018fe908263cbac8f87268c373c5 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 12 Jun 2015 22:24:40 -0700 Subject: [PATCH] raft: fix panic in send app sendApp accesses the storage several times. Previously, we assumed that the storage would not be modified during the read operations. The assumption is not true since the storage can be compacted between the read operations. If a compaction causes a read entries error, we should not panic. Instead, we can simply retry the sendApp logic until it succeeds. --- raft/log.go | 94 +++++++++++++++++++++++++++++++++++------------ raft/log_test.go | 76 ++++++++++++++++++++++++++++---------- raft/raft.go | 23 +++++++----- raft/raft_test.go | 10 +++-- 4 files changed, 148 insertions(+), 55 deletions(-) diff --git a/raft/log.go b/raft/log.go index 98a3c5f15..280d03112 100644 --- a/raft/log.go +++ b/raft/log.go @@ -114,7 +114,7 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 { if !l.matchTerm(ne.Index, ne.Term) { if ne.Index <= l.lastIndex() { raftLogger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]", - ne.Index, l.term(ne.Index), ne.Term) + ne.Index, zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term) } return ne.Index } @@ -135,7 +135,11 @@ func (l *raftLog) unstableEntries() []pb.Entry { func (l *raftLog) nextEnts() (ents []pb.Entry) { off := max(l.applied+1, l.firstIndex()) if l.committed+1 > off { - return l.slice(off, l.committed+1, noLimit) + ents, err := l.slice(off, l.committed+1, noLimit) + if err != nil { + raftLogger.Panicf("unexpected error when getting unapplied entries (%v)", err) + } + return ents } return nil } @@ -193,38 +197,55 @@ func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) } func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) } -func (l *raftLog) lastTerm() uint64 { return l.term(l.lastIndex()) } +func (l *raftLog) lastTerm() uint64 { + t, err := l.term(l.lastIndex()) + if err != nil { + raftLogger.Panicf("unexpected 
error when getting the last term (%v)", err) + } + return t +} -func (l *raftLog) term(i uint64) uint64 { +func (l *raftLog) term(i uint64) (uint64, error) { // the valid term range is [index of dummy entry, last index] dummyIndex := l.firstIndex() - 1 if i < dummyIndex || i > l.lastIndex() { - return 0 + // TODO: return an error instead? + return 0, nil } if t, ok := l.unstable.maybeTerm(i); ok { - return t + return t, nil } t, err := l.storage.Term(i) if err == nil { - return t + return t, nil } if err == ErrCompacted { - return 0 + return 0, err } panic(err) // TODO(bdarnell) } -func (l *raftLog) entries(i, maxsize uint64) []pb.Entry { +func (l *raftLog) entries(i, maxsize uint64) ([]pb.Entry, error) { if i > l.lastIndex() { - return nil + return nil, nil } return l.slice(i, l.lastIndex()+1, maxsize) } // allEntries returns all entries in the log. -func (l *raftLog) allEntries() []pb.Entry { return l.entries(l.firstIndex(), noLimit) } +func (l *raftLog) allEntries() []pb.Entry { + ents, err := l.entries(l.firstIndex(), noLimit) + if err == nil { + return ents + } + if err == ErrCompacted { // try again if there was a racing compaction + return l.allEntries() + } + // TODO (xiangli): handle error? + panic(err) +} // isUpToDate determines if the given (lastIndex,term) log is more up-to-date // by comparing the index and term of the last entries in the existing logs. 
@@ -236,10 +257,16 @@ func (l *raftLog) isUpToDate(lasti, term uint64) bool { return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex()) } -func (l *raftLog) matchTerm(i, term uint64) bool { return l.term(i) == term } +func (l *raftLog) matchTerm(i, term uint64) bool { + t, err := l.term(i) + if err != nil { + return false + } + return t == term +} func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { - if maxIndex > l.committed && l.term(maxIndex) == term { + if maxIndex > l.committed && zeroTermOnErrCompacted(l.term(maxIndex)) == term { l.commitTo(maxIndex) return true } @@ -253,17 +280,19 @@ func (l *raftLog) restore(s pb.Snapshot) { } // slice returns a slice of log entries from lo through hi-1, inclusive. -func (l *raftLog) slice(lo, hi, maxSize uint64) []pb.Entry { - l.mustCheckOutOfBounds(lo, hi) +func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) { + err := l.mustCheckOutOfBounds(lo, hi) + if err != nil { + return nil, err + } if lo == hi { - return nil + return nil, nil } var ents []pb.Entry if lo < l.unstable.offset { storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), maxSize) if err == ErrCompacted { - // This should never fail because it has been checked before. 
- raftLogger.Panicf("entries[%d:%d) from storage is out of bound", lo, min(hi, l.unstable.offset)) + return nil, err } else if err == ErrUnavailable { raftLogger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset)) } else if err != nil { @@ -272,7 +301,7 @@ func (l *raftLog) slice(lo, hi, maxSize uint64) []pb.Entry { // check if ents has reached the size limitation if uint64(len(storedEnts)) < min(hi, l.unstable.offset)-lo { - return storedEnts + return storedEnts, nil } ents = storedEnts @@ -286,16 +315,33 @@ func (l *raftLog) slice(lo, hi, maxSize uint64) []pb.Entry { ents = unstable } } - return limitSize(ents, maxSize) + return limitSize(ents, maxSize), nil } // l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries) -func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) { +func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error { if lo > hi { raftLogger.Panicf("invalid slice %d > %d", lo, hi) } - length := l.lastIndex() - l.firstIndex() + 1 - if lo < l.firstIndex() || hi > l.firstIndex()+length { - raftLogger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, l.firstIndex(), l.lastIndex()) + fi := l.firstIndex() + if lo < fi { + return ErrCompacted } + + length := l.lastIndex() - fi + 1 + if lo < fi || hi > fi+length { + raftLogger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex()) + } + return nil +} + +func zeroTermOnErrCompacted(t uint64, err error) uint64 { + if err == nil { + return t + } + if err == ErrCompacted { + return 0 + } + raftLogger.Panicf("unexpected error (%v)", err) + return 0 } diff --git a/raft/log_test.go b/raft/log_test.go index 88fe6ae61..bd9d928aa 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -132,11 +132,15 @@ func TestAppend(t *testing.T) { if index != tt.windex { t.Errorf("#%d: lastIndex = %d, want %d", i, index, tt.windex) } - if g := raftLog.entries(1, noLimit); !reflect.DeepEqual(g, tt.wents) { + g, err := raftLog.entries(1, noLimit) + if err != nil { + 
t.Fatalf("#%d: unexpected error %v", i, err) + } + if !reflect.DeepEqual(g, tt.wents) { t.Errorf("#%d: logEnts = %+v, want %+v", i, g, tt.wents) } - if g := raftLog.unstable.offset; g != tt.wunstable { - t.Errorf("#%d: unstable = %d, want %d", i, g, tt.wunstable) + if goff := raftLog.unstable.offset; goff != tt.wunstable { + t.Errorf("#%d: unstable = %d, want %d", i, goff, tt.wunstable) } } } @@ -257,7 +261,10 @@ func TestLogMaybeAppend(t *testing.T) { t.Errorf("#%d: committed = %d, want %d", i, gcommit, tt.wcommit) } if gappend && len(tt.ents) != 0 { - gents := raftLog.slice(raftLog.lastIndex()-uint64(len(tt.ents))+1, raftLog.lastIndex()+1, noLimit) + gents, err := raftLog.slice(raftLog.lastIndex()-uint64(len(tt.ents))+1, raftLog.lastIndex()+1, noLimit) + if err != nil { + t.Fatalf("unexpected error %v", err) + } if !reflect.DeepEqual(tt.ents, gents) { t.Errorf("%d: appended entries = %v, want %v", i, gents, tt.ents) } @@ -297,8 +304,8 @@ func TestCompactionSideEffects(t *testing.T) { } for j := offset; j <= raftLog.lastIndex(); j++ { - if raftLog.term(j) != j { - t.Errorf("term(%d) = %d, want %d", j, raftLog.term(j), j) + if mustTerm(raftLog.term(j)) != j { + t.Errorf("term(%d) = %d, want %d", j, mustTerm(raftLog.term(j)), j) } } @@ -322,7 +329,10 @@ func TestCompactionSideEffects(t *testing.T) { t.Errorf("lastIndex = %d, want = %d", raftLog.lastIndex(), prev+1) } - ents := raftLog.entries(raftLog.lastIndex(), noLimit) + ents, err := raftLog.entries(raftLog.lastIndex(), noLimit) + if err != nil { + t.Fatalf("unexpected error %v", err) + } if len(ents) != 1 { t.Errorf("len(entries) = %d, want = %d", len(ents), 1) } @@ -555,8 +565,8 @@ func TestLogRestore(t *testing.T) { if raftLog.unstable.offset != index+1 { t.Errorf("unstable = %d, want %d", raftLog.unstable, index+1) } - if raftLog.term(index) != term { - t.Errorf("term = %d, want %d", raftLog.term(index), term) + if mustTerm(raftLog.term(index)) != term { + t.Errorf("term = %d, want %d", 
mustTerm(raftLog.term(index)), term) } } @@ -572,40 +582,49 @@ func TestIsOutOfBounds(t *testing.T) { first := offset + 1 tests := []struct { - lo, hi uint64 - wpainc bool + lo, hi uint64 + wpanic bool + wErrCompacted bool }{ { first - 2, first + 1, + false, true, }, { first - 1, first + 1, + false, true, }, { first, first, false, + false, }, { first + num/2, first + num/2, false, + false, }, { first + num - 1, first + num - 1, false, + false, }, { first + num, first + num, false, + false, }, { first + num, first + num + 1, true, + false, }, { first + num + 1, first + num + 1, true, + false, }, } @@ -613,15 +632,21 @@ func TestIsOutOfBounds(t *testing.T) { func() { defer func() { if r := recover(); r != nil { - if !tt.wpainc { + if !tt.wpanic { t.Errorf("%d: panic = %v, want %v: %v", i, true, false, r) } } }() - l.mustCheckOutOfBounds(tt.lo, tt.hi) - if tt.wpainc { + err := l.mustCheckOutOfBounds(tt.lo, tt.hi) + if tt.wpanic { t.Errorf("%d: panic = %v, want %v", i, false, true) } + if tt.wErrCompacted && err != ErrCompacted { + t.Errorf("%d: err = %v, want %v", i, err, ErrCompacted) + } + if !tt.wErrCompacted && err != nil { + t.Errorf("%d: unexpected err %v", i, err) + } }() } } @@ -650,7 +675,7 @@ func TestTerm(t *testing.T) { } for j, tt := range tests { - term := l.term(tt.index) + term := mustTerm(l.term(tt.index)) if !reflect.DeepEqual(term, tt.w) { t.Errorf("#%d: at = %d, want %d", j, term, tt.w) } @@ -680,7 +705,7 @@ func TestTermWithUnstableSnapshot(t *testing.T) { } for i, tt := range tests { - term := l.term(tt.index) + term := mustTerm(l.term(tt.index)) if !reflect.DeepEqual(term, tt.w) { t.Errorf("#%d: at = %d, want %d", i, term, tt.w) } @@ -714,8 +739,8 @@ func TestSlice(t *testing.T) { wpanic bool }{ // test no limit - {offset - 1, offset + 1, noLimit, nil, true}, - {offset, offset + 1, noLimit, nil, true}, + {offset - 1, offset + 1, noLimit, nil, false}, + {offset, offset + 1, noLimit, nil, false}, {half - 1, half + 1, noLimit, []pb.Entry{{Index: 
half - 1, Term: half - 1}, {Index: half, Term: half}}, false}, {half, half + 1, noLimit, []pb.Entry{{Index: half, Term: half}}, false}, {last - 1, last, noLimit, []pb.Entry{{Index: last - 1, Term: last - 1}}, false}, @@ -739,10 +764,23 @@ func TestSlice(t *testing.T) { } } }() - g := l.slice(tt.from, tt.to, tt.limit) + g, err := l.slice(tt.from, tt.to, tt.limit) + if tt.from <= offset && err != ErrCompacted { + t.Fatalf("#%d: err = %v, want %v", j, err, ErrCompacted) + } + if tt.from > offset && err != nil { + t.Fatalf("#%d: unexpected error %v", j, err) + } if !reflect.DeepEqual(g, tt.w) { t.Errorf("#%d: from %d to %d = %v, want %v", j, tt.from, tt.to, g, tt.w) } }() } } + +func mustTerm(term uint64, err error) uint64 { + if err != nil { + panic(err) + } + return term +} diff --git a/raft/raft.go b/raft/raft.go index fda500efa..0485182b5 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -243,7 +243,11 @@ func (r *raft) sendAppend(to uint64) { } m := pb.Message{} m.To = to - if r.needSnapshot(pr.Next) { + + term, errt := r.raftLog.term(pr.Next - 1) + ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize) + + if errt != nil || erre != nil { // send snapshot if we failed to get term or entries m.Type = pb.MsgSnap snapshot, err := r.raftLog.snapshot() if err != nil { @@ -261,8 +265,8 @@ func (r *raft) sendAppend(to uint64) { } else { m.Type = pb.MsgApp m.Index = pr.Next - 1 - m.LogTerm = r.raftLog.term(pr.Next - 1) - m.Entries = r.raftLog.entries(pr.Next, r.maxMsgSize) + m.LogTerm = term + m.Entries = ents m.Commit = r.raftLog.committed if n := len(m.Entries); n != 0 { switch pr.State { @@ -413,7 +417,12 @@ func (r *raft) becomeLeader() { r.tick = r.tickHeartbeat r.lead = r.id r.state = StateLeader - for _, e := range r.raftLog.entries(r.raftLog.committed+1, noLimit) { + ents, err := r.raftLog.entries(r.raftLog.committed+1, noLimit) + if err != nil { + raftLogger.Panicf("unexpected error getting uncommitted entries (%v)", err) + } + + for _, e := range ents { if e.Type 
!= pb.EntryConfChange { continue } @@ -658,7 +667,7 @@ func (r *raft) handleAppendEntries(m pb.Message) { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) } else { raftLogger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x", - r.id, r.raftLog.term(m.Index), m.Index, m.LogTerm, m.Index, m.From) + r.id, zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()}) } } @@ -712,10 +721,6 @@ func (r *raft) restore(s pb.Snapshot) bool { return true } -func (r *raft) needSnapshot(i uint64) bool { - return i < r.raftLog.firstIndex() -} - // promotable indicates whether state machine can be promoted to leader, // which is true when its own id is in progress list. func (r *raft) promotable() bool { diff --git a/raft/raft_test.go b/raft/raft_test.go index b377649bf..3115d2cfa 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1480,8 +1480,8 @@ func TestRestore(t *testing.T) { if sm.raftLog.lastIndex() != s.Metadata.Index { t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index) } - if sm.raftLog.term(s.Metadata.Index) != s.Metadata.Term { - t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Metadata.Index), s.Metadata.Term) + if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term { + t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term) } sg := sm.nodes() if !reflect.DeepEqual(sg, s.Metadata.ConfState.Nodes) { @@ -1630,7 +1630,11 @@ func TestStepIgnoreConfig(t *testing.T) { pendingConf := r.pendingConf r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}} - if ents := r.raftLog.entries(index+1, noLimit); !reflect.DeepEqual(ents, wents) { + ents, err := 
r.raftLog.entries(index+1, noLimit) + if err != nil { + t.Fatalf("unexpected error %v", err) + } + if !reflect.DeepEqual(ents, wents) { t.Errorf("ents = %+v, want %+v", ents, wents) } if r.pendingConf != pendingConf {