raft: fix panic in send app

sendApp accesses the storage several times. Perviously, we
assume that the storage will not be modified during the read
opeartions. The assumption is not true since the storage can
be compacted between the read operations. If a compaction
causes a read entries error, we should not painc. Instead, we
can simply retry the sendApp logic until succeed.
This commit is contained in:
Xiang Li 2015-06-12 22:24:40 -07:00
parent 219d304291
commit b4022899eb
4 changed files with 148 additions and 55 deletions

View File

@ -114,7 +114,7 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
if !l.matchTerm(ne.Index, ne.Term) {
if ne.Index <= l.lastIndex() {
raftLogger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
ne.Index, l.term(ne.Index), ne.Term)
ne.Index, zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term)
}
return ne.Index
}
@ -135,7 +135,11 @@ func (l *raftLog) unstableEntries() []pb.Entry {
func (l *raftLog) nextEnts() (ents []pb.Entry) {
off := max(l.applied+1, l.firstIndex())
if l.committed+1 > off {
return l.slice(off, l.committed+1, noLimit)
ents, err := l.slice(off, l.committed+1, noLimit)
if err != nil {
raftLogger.Panicf("unexpected error when getting unapplied entries (%v)", err)
}
return ents
}
return nil
}
@ -193,38 +197,55 @@ func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) }
func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
func (l *raftLog) lastTerm() uint64 { return l.term(l.lastIndex()) }
func (l *raftLog) lastTerm() uint64 {
t, err := l.term(l.lastIndex())
if err != nil {
raftLogger.Panicf("unexpected error when getting the last term (%v)", err)
}
return t
}
func (l *raftLog) term(i uint64) uint64 {
func (l *raftLog) term(i uint64) (uint64, error) {
// the valid term range is [index of dummy entry, last index]
dummyIndex := l.firstIndex() - 1
if i < dummyIndex || i > l.lastIndex() {
return 0
// TODO: return an error instead?
return 0, nil
}
if t, ok := l.unstable.maybeTerm(i); ok {
return t
return t, nil
}
t, err := l.storage.Term(i)
if err == nil {
return t
return t, nil
}
if err == ErrCompacted {
return 0
return 0, err
}
panic(err) // TODO(bdarnell)
}
func (l *raftLog) entries(i, maxsize uint64) []pb.Entry {
func (l *raftLog) entries(i, maxsize uint64) ([]pb.Entry, error) {
if i > l.lastIndex() {
return nil
return nil, nil
}
return l.slice(i, l.lastIndex()+1, maxsize)
}
// allEntries returns all entries in the log.
func (l *raftLog) allEntries() []pb.Entry { return l.entries(l.firstIndex(), noLimit) }
func (l *raftLog) allEntries() []pb.Entry {
ents, err := l.entries(l.firstIndex(), noLimit)
if err == nil {
return ents
}
if err == ErrCompacted { // try again if there was a racing compaction
return l.allEntries()
}
// TODO (xiangli): handle error?
panic(err)
}
// isUpToDate determines if the given (lastIndex,term) log is more up-to-date
// by comparing the index and term of the last entries in the existing logs.
@ -236,10 +257,16 @@ func (l *raftLog) isUpToDate(lasti, term uint64) bool {
return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())
}
func (l *raftLog) matchTerm(i, term uint64) bool { return l.term(i) == term }
func (l *raftLog) matchTerm(i, term uint64) bool {
t, err := l.term(i)
if err != nil {
return false
}
return t == term
}
func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
if maxIndex > l.committed && l.term(maxIndex) == term {
if maxIndex > l.committed && zeroTermOnErrCompacted(l.term(maxIndex)) == term {
l.commitTo(maxIndex)
return true
}
@ -253,17 +280,19 @@ func (l *raftLog) restore(s pb.Snapshot) {
}
// slice returns a slice of log entries from lo through hi-1, inclusive.
func (l *raftLog) slice(lo, hi, maxSize uint64) []pb.Entry {
l.mustCheckOutOfBounds(lo, hi)
func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) {
err := l.mustCheckOutOfBounds(lo, hi)
if err != nil {
return nil, err
}
if lo == hi {
return nil
return nil, nil
}
var ents []pb.Entry
if lo < l.unstable.offset {
storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), maxSize)
if err == ErrCompacted {
// This should never fail because it has been checked before.
raftLogger.Panicf("entries[%d:%d) from storage is out of bound", lo, min(hi, l.unstable.offset))
return nil, err
} else if err == ErrUnavailable {
raftLogger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset))
} else if err != nil {
@ -272,7 +301,7 @@ func (l *raftLog) slice(lo, hi, maxSize uint64) []pb.Entry {
// check if ents has reached the size limitation
if uint64(len(storedEnts)) < min(hi, l.unstable.offset)-lo {
return storedEnts
return storedEnts, nil
}
ents = storedEnts
@ -286,16 +315,33 @@ func (l *raftLog) slice(lo, hi, maxSize uint64) []pb.Entry {
ents = unstable
}
}
return limitSize(ents, maxSize)
return limitSize(ents, maxSize), nil
}
// l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries)
func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) {
func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
if lo > hi {
raftLogger.Panicf("invalid slice %d > %d", lo, hi)
}
length := l.lastIndex() - l.firstIndex() + 1
if lo < l.firstIndex() || hi > l.firstIndex()+length {
raftLogger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, l.firstIndex(), l.lastIndex())
fi := l.firstIndex()
if lo < fi {
return ErrCompacted
}
length := l.lastIndex() - fi + 1
if lo < fi || hi > fi+length {
raftLogger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex())
}
return nil
}
func zeroTermOnErrCompacted(t uint64, err error) uint64 {
if err == nil {
return t
}
if err == ErrCompacted {
return 0
}
raftLogger.Panicf("unexpected error (%v)", err)
return 0
}

View File

@ -132,11 +132,15 @@ func TestAppend(t *testing.T) {
if index != tt.windex {
t.Errorf("#%d: lastIndex = %d, want %d", i, index, tt.windex)
}
if g := raftLog.entries(1, noLimit); !reflect.DeepEqual(g, tt.wents) {
g, err := raftLog.entries(1, noLimit)
if err != nil {
t.Fatalf("#%d: unexpected error %v", i, err)
}
if !reflect.DeepEqual(g, tt.wents) {
t.Errorf("#%d: logEnts = %+v, want %+v", i, g, tt.wents)
}
if g := raftLog.unstable.offset; g != tt.wunstable {
t.Errorf("#%d: unstable = %d, want %d", i, g, tt.wunstable)
if goff := raftLog.unstable.offset; goff != tt.wunstable {
t.Errorf("#%d: unstable = %d, want %d", i, goff, tt.wunstable)
}
}
}
@ -257,7 +261,10 @@ func TestLogMaybeAppend(t *testing.T) {
t.Errorf("#%d: committed = %d, want %d", i, gcommit, tt.wcommit)
}
if gappend && len(tt.ents) != 0 {
gents := raftLog.slice(raftLog.lastIndex()-uint64(len(tt.ents))+1, raftLog.lastIndex()+1, noLimit)
gents, err := raftLog.slice(raftLog.lastIndex()-uint64(len(tt.ents))+1, raftLog.lastIndex()+1, noLimit)
if err != nil {
t.Fatalf("unexpected error %v", err)
}
if !reflect.DeepEqual(tt.ents, gents) {
t.Errorf("%d: appended entries = %v, want %v", i, gents, tt.ents)
}
@ -297,8 +304,8 @@ func TestCompactionSideEffects(t *testing.T) {
}
for j := offset; j <= raftLog.lastIndex(); j++ {
if raftLog.term(j) != j {
t.Errorf("term(%d) = %d, want %d", j, raftLog.term(j), j)
if mustTerm(raftLog.term(j)) != j {
t.Errorf("term(%d) = %d, want %d", j, mustTerm(raftLog.term(j)), j)
}
}
@ -322,7 +329,10 @@ func TestCompactionSideEffects(t *testing.T) {
t.Errorf("lastIndex = %d, want = %d", raftLog.lastIndex(), prev+1)
}
ents := raftLog.entries(raftLog.lastIndex(), noLimit)
ents, err := raftLog.entries(raftLog.lastIndex(), noLimit)
if err != nil {
t.Fatalf("unexpected error %v", err)
}
if len(ents) != 1 {
t.Errorf("len(entries) = %d, want = %d", len(ents), 1)
}
@ -555,8 +565,8 @@ func TestLogRestore(t *testing.T) {
if raftLog.unstable.offset != index+1 {
t.Errorf("unstable = %d, want %d", raftLog.unstable, index+1)
}
if raftLog.term(index) != term {
t.Errorf("term = %d, want %d", raftLog.term(index), term)
if mustTerm(raftLog.term(index)) != term {
t.Errorf("term = %d, want %d", mustTerm(raftLog.term(index)), term)
}
}
@ -572,40 +582,49 @@ func TestIsOutOfBounds(t *testing.T) {
first := offset + 1
tests := []struct {
lo, hi uint64
wpainc bool
lo, hi uint64
wpanic bool
wErrCompacted bool
}{
{
first - 2, first + 1,
false,
true,
},
{
first - 1, first + 1,
false,
true,
},
{
first, first,
false,
false,
},
{
first + num/2, first + num/2,
false,
false,
},
{
first + num - 1, first + num - 1,
false,
false,
},
{
first + num, first + num,
false,
false,
},
{
first + num, first + num + 1,
true,
false,
},
{
first + num + 1, first + num + 1,
true,
false,
},
}
@ -613,15 +632,21 @@ func TestIsOutOfBounds(t *testing.T) {
func() {
defer func() {
if r := recover(); r != nil {
if !tt.wpainc {
if !tt.wpanic {
t.Errorf("%d: panic = %v, want %v: %v", i, true, false, r)
}
}
}()
l.mustCheckOutOfBounds(tt.lo, tt.hi)
if tt.wpainc {
err := l.mustCheckOutOfBounds(tt.lo, tt.hi)
if tt.wpanic {
t.Errorf("%d: panic = %v, want %v", i, false, true)
}
if tt.wErrCompacted && err != ErrCompacted {
t.Errorf("%d: err = %v, want %v", i, err, ErrCompacted)
}
if !tt.wErrCompacted && err != nil {
t.Errorf("%d: unexpected err %v", i, err)
}
}()
}
}
@ -650,7 +675,7 @@ func TestTerm(t *testing.T) {
}
for j, tt := range tests {
term := l.term(tt.index)
term := mustTerm(l.term(tt.index))
if !reflect.DeepEqual(term, tt.w) {
t.Errorf("#%d: at = %d, want %d", j, term, tt.w)
}
@ -680,7 +705,7 @@ func TestTermWithUnstableSnapshot(t *testing.T) {
}
for i, tt := range tests {
term := l.term(tt.index)
term := mustTerm(l.term(tt.index))
if !reflect.DeepEqual(term, tt.w) {
t.Errorf("#%d: at = %d, want %d", i, term, tt.w)
}
@ -714,8 +739,8 @@ func TestSlice(t *testing.T) {
wpanic bool
}{
// test no limit
{offset - 1, offset + 1, noLimit, nil, true},
{offset, offset + 1, noLimit, nil, true},
{offset - 1, offset + 1, noLimit, nil, false},
{offset, offset + 1, noLimit, nil, false},
{half - 1, half + 1, noLimit, []pb.Entry{{Index: half - 1, Term: half - 1}, {Index: half, Term: half}}, false},
{half, half + 1, noLimit, []pb.Entry{{Index: half, Term: half}}, false},
{last - 1, last, noLimit, []pb.Entry{{Index: last - 1, Term: last - 1}}, false},
@ -739,10 +764,23 @@ func TestSlice(t *testing.T) {
}
}
}()
g := l.slice(tt.from, tt.to, tt.limit)
g, err := l.slice(tt.from, tt.to, tt.limit)
if tt.from <= offset && err != ErrCompacted {
t.Fatalf("#%d: err = %v, want %v", j, err, ErrCompacted)
}
if tt.from > offset && err != nil {
t.Fatalf("#%d: unexpected error %v", j, err)
}
if !reflect.DeepEqual(g, tt.w) {
t.Errorf("#%d: from %d to %d = %v, want %v", j, tt.from, tt.to, g, tt.w)
}
}()
}
}
func mustTerm(term uint64, err error) uint64 {
if err != nil {
panic(err)
}
return term
}

View File

@ -243,7 +243,11 @@ func (r *raft) sendAppend(to uint64) {
}
m := pb.Message{}
m.To = to
if r.needSnapshot(pr.Next) {
term, errt := r.raftLog.term(pr.Next - 1)
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
m.Type = pb.MsgSnap
snapshot, err := r.raftLog.snapshot()
if err != nil {
@ -261,8 +265,8 @@ func (r *raft) sendAppend(to uint64) {
} else {
m.Type = pb.MsgApp
m.Index = pr.Next - 1
m.LogTerm = r.raftLog.term(pr.Next - 1)
m.Entries = r.raftLog.entries(pr.Next, r.maxMsgSize)
m.LogTerm = term
m.Entries = ents
m.Commit = r.raftLog.committed
if n := len(m.Entries); n != 0 {
switch pr.State {
@ -413,7 +417,12 @@ func (r *raft) becomeLeader() {
r.tick = r.tickHeartbeat
r.lead = r.id
r.state = StateLeader
for _, e := range r.raftLog.entries(r.raftLog.committed+1, noLimit) {
ents, err := r.raftLog.entries(r.raftLog.committed+1, noLimit)
if err != nil {
raftLogger.Panicf("unexpected error getting uncommitted entries (%v)", err)
}
for _, e := range ents {
if e.Type != pb.EntryConfChange {
continue
}
@ -658,7 +667,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
raftLogger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.term(m.Index), m.Index, m.LogTerm, m.Index, m.From)
r.id, zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
}
}
@ -712,10 +721,6 @@ func (r *raft) restore(s pb.Snapshot) bool {
return true
}
func (r *raft) needSnapshot(i uint64) bool {
return i < r.raftLog.firstIndex()
}
// promotable indicates whether state machine can be promoted to leader,
// which is true when its own id is in progress list.
func (r *raft) promotable() bool {

View File

@ -1480,8 +1480,8 @@ func TestRestore(t *testing.T) {
if sm.raftLog.lastIndex() != s.Metadata.Index {
t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index)
}
if sm.raftLog.term(s.Metadata.Index) != s.Metadata.Term {
t.Errorf("log.lastTerm = %d, want %d", sm.raftLog.term(s.Metadata.Index), s.Metadata.Term)
if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
}
sg := sm.nodes()
if !reflect.DeepEqual(sg, s.Metadata.ConfState.Nodes) {
@ -1630,7 +1630,11 @@ func TestStepIgnoreConfig(t *testing.T) {
pendingConf := r.pendingConf
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}}
if ents := r.raftLog.entries(index+1, noLimit); !reflect.DeepEqual(ents, wents) {
ents, err := r.raftLog.entries(index+1, noLimit)
if err != nil {
t.Fatalf("unexpected error %v", err)
}
if !reflect.DeepEqual(ents, wents) {
t.Errorf("ents = %+v, want %+v", ents, wents)
}
if r.pendingConf != pendingConf {