mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: don't apply entries when applying snapshot
This commit removes the ability to apply log entries at the same time as applying a snapshot. Doing so it possible, but it leads to complex code and raises questions about what should be applied first. It also raises additional complexity when we start allowing concurrent, asynchronous log appends and log application. It's easiest to just disallow this. Signed-off-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
This commit is contained in:
parent
95c5fed3cf
commit
539a8410f4
19
raft/log.go
19
raft/log.go
@ -181,9 +181,13 @@ func (l *raftLog) unstableEntries() []pb.Entry {
|
||||
// If applied is smaller than the index of snapshot, it returns all committed
|
||||
// entries after the index of snapshot.
|
||||
func (l *raftLog) nextCommittedEnts() (ents []pb.Entry) {
|
||||
off := max(l.applied+1, l.firstIndex())
|
||||
if l.committed+1 > off {
|
||||
ents, err := l.slice(off, l.committed+1, l.maxNextCommittedEntsSize)
|
||||
if l.hasPendingSnapshot() {
|
||||
// See comment in hasNextCommittedEnts.
|
||||
return nil
|
||||
}
|
||||
if l.committed > l.applied {
|
||||
lo, hi := l.applied+1, l.committed+1 // [lo, hi)
|
||||
ents, err := l.slice(lo, hi, l.maxNextCommittedEntsSize)
|
||||
if err != nil {
|
||||
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
|
||||
}
|
||||
@ -195,8 +199,13 @@ func (l *raftLog) nextCommittedEnts() (ents []pb.Entry) {
|
||||
// hasNextCommittedEnts returns if there is any available entries for execution.
|
||||
// This is a fast check without heavy raftLog.slice() in nextCommittedEnts().
|
||||
func (l *raftLog) hasNextCommittedEnts() bool {
|
||||
off := max(l.applied+1, l.firstIndex())
|
||||
return l.committed+1 > off
|
||||
if l.hasPendingSnapshot() {
|
||||
// If we have a snapshot to apply, don't also return any committed
|
||||
// entries. Doing so raises questions about what should be applied
|
||||
// first.
|
||||
return false
|
||||
}
|
||||
return l.committed > l.applied
|
||||
}
|
||||
|
||||
// hasPendingSnapshot returns if there is pending snapshot waiting for applying.
|
||||
|
@ -349,13 +349,16 @@ func TestHasNextCommittedEnts(t *testing.T) {
|
||||
{Term: 1, Index: 6},
|
||||
}
|
||||
tests := []struct {
|
||||
applied uint64
|
||||
hasNext bool
|
||||
applied uint64
|
||||
snap bool
|
||||
whasNext bool
|
||||
}{
|
||||
{0, true},
|
||||
{3, true},
|
||||
{4, true},
|
||||
{5, false},
|
||||
{applied: 0, snap: false, whasNext: true},
|
||||
{applied: 3, snap: false, whasNext: true},
|
||||
{applied: 4, snap: false, whasNext: true},
|
||||
{applied: 5, snap: false, whasNext: false},
|
||||
// With snapshot.
|
||||
{applied: 3, snap: true, whasNext: false},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
storage := NewMemoryStorage()
|
||||
@ -364,10 +367,15 @@ func TestHasNextCommittedEnts(t *testing.T) {
|
||||
raftLog.append(ents...)
|
||||
raftLog.maybeCommit(5, 1)
|
||||
raftLog.appliedTo(tt.applied)
|
||||
if tt.snap {
|
||||
newSnap := snap
|
||||
newSnap.Metadata.Index++
|
||||
raftLog.restore(newSnap)
|
||||
}
|
||||
|
||||
hasNext := raftLog.hasNextCommittedEnts()
|
||||
if hasNext != tt.hasNext {
|
||||
t.Errorf("#%d: hasNext = %v, want %v", i, hasNext, tt.hasNext)
|
||||
if hasNext != tt.whasNext {
|
||||
t.Errorf("#%d: hasNext = %v, want %v", i, hasNext, tt.whasNext)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -383,12 +391,15 @@ func TestNextCommittedEnts(t *testing.T) {
|
||||
}
|
||||
tests := []struct {
|
||||
applied uint64
|
||||
snap bool
|
||||
wents []pb.Entry
|
||||
}{
|
||||
{0, ents[:2]},
|
||||
{3, ents[:2]},
|
||||
{4, ents[1:2]},
|
||||
{5, nil},
|
||||
{applied: 0, snap: false, wents: ents[:2]},
|
||||
{applied: 3, snap: false, wents: ents[:2]},
|
||||
{applied: 4, snap: false, wents: ents[1:2]},
|
||||
{applied: 5, snap: false, wents: nil},
|
||||
// With snapshot.
|
||||
{applied: 3, snap: true, wents: nil},
|
||||
}
|
||||
for i, tt := range tests {
|
||||
storage := NewMemoryStorage()
|
||||
@ -397,6 +408,11 @@ func TestNextCommittedEnts(t *testing.T) {
|
||||
raftLog.append(ents...)
|
||||
raftLog.maybeCommit(5, 1)
|
||||
raftLog.appliedTo(tt.applied)
|
||||
if tt.snap {
|
||||
newSnap := snap
|
||||
newSnap.Metadata.Index++
|
||||
raftLog.restore(newSnap)
|
||||
}
|
||||
|
||||
nents := raftLog.nextCommittedEnts()
|
||||
if !reflect.DeepEqual(nents, tt.wents) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user