Merge pull request #14721 from nvanbenschoten/nvanbenschoten/noCommittedOnSnap

raft: don't apply entries when applying snapshot
This commit is contained in:
Benjamin Wang 2022-11-15 06:41:12 +08:00 committed by GitHub
commit 970ecfcddb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 18 deletions

View File

@ -181,9 +181,13 @@ func (l *raftLog) unstableEntries() []pb.Entry {
// If applied is smaller than the index of snapshot, it returns all committed // If applied is smaller than the index of snapshot, it returns all committed
// entries after the index of snapshot. // entries after the index of snapshot.
func (l *raftLog) nextCommittedEnts() (ents []pb.Entry) { func (l *raftLog) nextCommittedEnts() (ents []pb.Entry) {
off := max(l.applied+1, l.firstIndex()) if l.hasPendingSnapshot() {
if l.committed+1 > off { // See comment in hasNextCommittedEnts.
ents, err := l.slice(off, l.committed+1, l.maxNextCommittedEntsSize) return nil
}
if l.committed > l.applied {
lo, hi := l.applied+1, l.committed+1 // [lo, hi)
ents, err := l.slice(lo, hi, l.maxNextCommittedEntsSize)
if err != nil { if err != nil {
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err) l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
} }
@ -195,13 +199,18 @@ func (l *raftLog) nextCommittedEnts() (ents []pb.Entry) {
// hasNextCommittedEnts returns if there is any available entries for execution. // hasNextCommittedEnts returns if there is any available entries for execution.
// This is a fast check without heavy raftLog.slice() in nextCommittedEnts(). // This is a fast check without heavy raftLog.slice() in nextCommittedEnts().
func (l *raftLog) hasNextCommittedEnts() bool { func (l *raftLog) hasNextCommittedEnts() bool {
off := max(l.applied+1, l.firstIndex()) if l.hasPendingSnapshot() {
return l.committed+1 > off // If we have a snapshot to apply, don't also return any committed
// entries. Doing so raises questions about what should be applied
// first.
return false
}
return l.committed > l.applied
} }
// hasPendingSnapshot returns if there is pending snapshot waiting for applying. // hasPendingSnapshot returns if there is pending snapshot waiting for applying.
func (l *raftLog) hasPendingSnapshot() bool { func (l *raftLog) hasPendingSnapshot() bool {
return l.unstable.snapshot != nil && !IsEmptySnap(*l.unstable.snapshot) return l.unstable.snapshot != nil
} }
func (l *raftLog) snapshot() (pb.Snapshot, error) { func (l *raftLog) snapshot() (pb.Snapshot, error) {

View File

@ -350,12 +350,15 @@ func TestHasNextCommittedEnts(t *testing.T) {
} }
tests := []struct { tests := []struct {
applied uint64 applied uint64
hasNext bool snap bool
whasNext bool
}{ }{
{0, true}, {applied: 0, snap: false, whasNext: true},
{3, true}, {applied: 3, snap: false, whasNext: true},
{4, true}, {applied: 4, snap: false, whasNext: true},
{5, false}, {applied: 5, snap: false, whasNext: false},
// With snapshot.
{applied: 3, snap: true, whasNext: false},
} }
for i, tt := range tests { for i, tt := range tests {
storage := NewMemoryStorage() storage := NewMemoryStorage()
@ -364,10 +367,15 @@ func TestHasNextCommittedEnts(t *testing.T) {
raftLog.append(ents...) raftLog.append(ents...)
raftLog.maybeCommit(5, 1) raftLog.maybeCommit(5, 1)
raftLog.appliedTo(tt.applied) raftLog.appliedTo(tt.applied)
if tt.snap {
newSnap := snap
newSnap.Metadata.Index++
raftLog.restore(newSnap)
}
hasNext := raftLog.hasNextCommittedEnts() hasNext := raftLog.hasNextCommittedEnts()
if hasNext != tt.hasNext { if hasNext != tt.whasNext {
t.Errorf("#%d: hasNext = %v, want %v", i, hasNext, tt.hasNext) t.Errorf("#%d: hasNext = %v, want %v", i, hasNext, tt.whasNext)
} }
} }
} }
@ -383,12 +391,15 @@ func TestNextCommittedEnts(t *testing.T) {
} }
tests := []struct { tests := []struct {
applied uint64 applied uint64
snap bool
wents []pb.Entry wents []pb.Entry
}{ }{
{0, ents[:2]}, {applied: 0, snap: false, wents: ents[:2]},
{3, ents[:2]}, {applied: 3, snap: false, wents: ents[:2]},
{4, ents[1:2]}, {applied: 4, snap: false, wents: ents[1:2]},
{5, nil}, {applied: 5, snap: false, wents: nil},
// With snapshot.
{applied: 3, snap: true, wents: nil},
} }
for i, tt := range tests { for i, tt := range tests {
storage := NewMemoryStorage() storage := NewMemoryStorage()
@ -397,6 +408,11 @@ func TestNextCommittedEnts(t *testing.T) {
raftLog.append(ents...) raftLog.append(ents...)
raftLog.maybeCommit(5, 1) raftLog.maybeCommit(5, 1)
raftLog.appliedTo(tt.applied) raftLog.appliedTo(tt.applied)
if tt.snap {
newSnap := snap
newSnap.Metadata.Index++
raftLog.restore(newSnap)
}
nents := raftLog.nextCommittedEnts() nents := raftLog.nextCommittedEnts()
if !reflect.DeepEqual(nents, tt.wents) { if !reflect.DeepEqual(nents, tt.wents) {