etcd/raft/log.go
Nathan VanBenschoten 539a8410f4 raft: don't apply entries when applying snapshot
This commit removes the ability to apply log entries at the same time as
applying a snapshot. Doing so it possible, but it leads to complex code and
raises questions about what should be applied first. It also raises additional
complexity when we start allowing concurrent, asynchronous log appends and log
application. It's easiest to just disallow this.

Signed-off-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
2022-11-11 13:58:09 -05:00

419 lines
12 KiB
Go

// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raft
import (
"fmt"
"log"
pb "go.etcd.io/etcd/raft/v3/raftpb"
)
type raftLog struct {
// storage contains all stable entries since the last snapshot.
storage Storage
// unstable contains all unstable entries and snapshot.
// they will be saved into storage.
unstable unstable
// committed is the highest log position that is known to be in
// stable storage on a quorum of nodes.
committed uint64
// applied is the highest log position that the application has
// been instructed to apply to its state machine.
// Invariant: applied <= committed
applied uint64
logger Logger
// maxNextCommittedEntsSize is the maximum number aggregate byte size of the
// messages returned from calls to nextCommittedEnts.
maxNextCommittedEntsSize uint64
}
// newLog returns log using the given storage and default options. It
// recovers the log to the state that it just commits and applies the
// latest snapshot.
func newLog(storage Storage, logger Logger) *raftLog {
return newLogWithSize(storage, logger, noLimit)
}
// newLogWithSize returns a log using the given storage and max
// message size.
func newLogWithSize(storage Storage, logger Logger, maxNextCommittedEntsSize uint64) *raftLog {
if storage == nil {
log.Panic("storage must not be nil")
}
log := &raftLog{
storage: storage,
logger: logger,
maxNextCommittedEntsSize: maxNextCommittedEntsSize,
}
firstIndex, err := storage.FirstIndex()
if err != nil {
panic(err) // TODO(bdarnell)
}
lastIndex, err := storage.LastIndex()
if err != nil {
panic(err) // TODO(bdarnell)
}
log.unstable.offset = lastIndex + 1
log.unstable.logger = logger
// Initialize our committed and applied pointers to the time of the last compaction.
log.committed = firstIndex - 1
log.applied = firstIndex - 1
return log
}
func (l *raftLog) String() string {
return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.committed, l.applied, l.unstable.offset, len(l.unstable.entries))
}
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
if l.matchTerm(index, logTerm) {
lastnewi = index + uint64(len(ents))
ci := l.findConflict(ents)
switch {
case ci == 0:
case ci <= l.committed:
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
offset := index + 1
if ci-offset > uint64(len(ents)) {
l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents))
}
l.append(ents[ci-offset:]...)
}
l.commitTo(min(committed, lastnewi))
return lastnewi, true
}
return 0, false
}
func (l *raftLog) append(ents ...pb.Entry) uint64 {
if len(ents) == 0 {
return l.lastIndex()
}
if after := ents[0].Index - 1; after < l.committed {
l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
}
l.unstable.truncateAndAppend(ents)
return l.lastIndex()
}
// findConflict finds the index of the conflict.
// It returns the first pair of conflicting entries between the existing
// entries and the given entries, if there are any.
// If there is no conflicting entries, and the existing entries contains
// all the given entries, zero will be returned.
// If there is no conflicting entries, but the given entries contains new
// entries, the index of the first new entry will be returned.
// An entry is considered to be conflicting if it has the same index but
// a different term.
// The index of the given entries MUST be continuously increasing.
func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
for _, ne := range ents {
if !l.matchTerm(ne.Index, ne.Term) {
if ne.Index <= l.lastIndex() {
l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
ne.Index, l.zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term)
}
return ne.Index
}
}
return 0
}
// findConflictByTerm takes an (index, term) pair (indicating a conflicting log
// entry on a leader/follower during an append) and finds the largest index in
// log l with a term <= `term` and an index <= `index`. If no such index exists
// in the log, the log's first index is returned.
//
// The index provided MUST be equal to or less than l.lastIndex(). Invalid
// inputs log a warning and the input index is returned.
func (l *raftLog) findConflictByTerm(index uint64, term uint64) uint64 {
if li := l.lastIndex(); index > li {
// NB: such calls should not exist, but since there is a straightfoward
// way to recover, do it.
//
// It is tempting to also check something about the first index, but
// there is odd behavior with peers that have no log, in which case
// lastIndex will return zero and firstIndex will return one, which
// leads to calls with an index of zero into this method.
l.logger.Warningf("index(%d) is out of range [0, lastIndex(%d)] in findConflictByTerm",
index, li)
return index
}
for {
logTerm, err := l.term(index)
if logTerm <= term || err != nil {
break
}
index--
}
return index
}
func (l *raftLog) unstableEntries() []pb.Entry {
if len(l.unstable.entries) == 0 {
return nil
}
return l.unstable.entries
}
// nextCommittedEnts returns all the available entries for execution.
// If applied is smaller than the index of snapshot, it returns all committed
// entries after the index of snapshot.
func (l *raftLog) nextCommittedEnts() (ents []pb.Entry) {
if l.hasPendingSnapshot() {
// See comment in hasNextCommittedEnts.
return nil
}
if l.committed > l.applied {
lo, hi := l.applied+1, l.committed+1 // [lo, hi)
ents, err := l.slice(lo, hi, l.maxNextCommittedEntsSize)
if err != nil {
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
}
return ents
}
return nil
}
// hasNextCommittedEnts returns if there is any available entries for execution.
// This is a fast check without heavy raftLog.slice() in nextCommittedEnts().
func (l *raftLog) hasNextCommittedEnts() bool {
if l.hasPendingSnapshot() {
// If we have a snapshot to apply, don't also return any committed
// entries. Doing so raises questions about what should be applied
// first.
return false
}
return l.committed > l.applied
}
// hasPendingSnapshot returns if there is pending snapshot waiting for applying.
func (l *raftLog) hasPendingSnapshot() bool {
return l.unstable.snapshot != nil
}
func (l *raftLog) snapshot() (pb.Snapshot, error) {
if l.unstable.snapshot != nil {
return *l.unstable.snapshot, nil
}
return l.storage.Snapshot()
}
func (l *raftLog) firstIndex() uint64 {
if i, ok := l.unstable.maybeFirstIndex(); ok {
return i
}
index, err := l.storage.FirstIndex()
if err != nil {
panic(err) // TODO(bdarnell)
}
return index
}
func (l *raftLog) lastIndex() uint64 {
if i, ok := l.unstable.maybeLastIndex(); ok {
return i
}
i, err := l.storage.LastIndex()
if err != nil {
panic(err) // TODO(bdarnell)
}
return i
}
func (l *raftLog) commitTo(tocommit uint64) {
// never decrease commit
if l.committed < tocommit {
if l.lastIndex() < tocommit {
l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex())
}
l.committed = tocommit
}
}
func (l *raftLog) appliedTo(i uint64) {
if i == 0 {
return
}
if l.committed < i || i < l.applied {
l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
}
l.applied = i
}
func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) }
func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
func (l *raftLog) lastTerm() uint64 {
t, err := l.term(l.lastIndex())
if err != nil {
l.logger.Panicf("unexpected error when getting the last term (%v)", err)
}
return t
}
func (l *raftLog) term(i uint64) (uint64, error) {
// the valid term range is [index of dummy entry, last index]
dummyIndex := l.firstIndex() - 1
if i < dummyIndex || i > l.lastIndex() {
// TODO: return an error instead?
return 0, nil
}
if t, ok := l.unstable.maybeTerm(i); ok {
return t, nil
}
t, err := l.storage.Term(i)
if err == nil {
return t, nil
}
if err == ErrCompacted || err == ErrUnavailable {
return 0, err
}
panic(err) // TODO(bdarnell)
}
func (l *raftLog) entries(i, maxsize uint64) ([]pb.Entry, error) {
if i > l.lastIndex() {
return nil, nil
}
return l.slice(i, l.lastIndex()+1, maxsize)
}
// allEntries returns all entries in the log.
func (l *raftLog) allEntries() []pb.Entry {
ents, err := l.entries(l.firstIndex(), noLimit)
if err == nil {
return ents
}
if err == ErrCompacted { // try again if there was a racing compaction
return l.allEntries()
}
// TODO (xiangli): handle error?
panic(err)
}
// isUpToDate determines if the given (lastIndex,term) log is more up-to-date
// by comparing the index and term of the last entries in the existing logs.
// If the logs have last entries with different terms, then the log with the
// later term is more up-to-date. If the logs end with the same term, then
// whichever log has the larger lastIndex is more up-to-date. If the logs are
// the same, the given log is up-to-date.
func (l *raftLog) isUpToDate(lasti, term uint64) bool {
return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())
}
func (l *raftLog) matchTerm(i, term uint64) bool {
t, err := l.term(i)
if err != nil {
return false
}
return t == term
}
func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {
l.commitTo(maxIndex)
return true
}
return false
}
func (l *raftLog) restore(s pb.Snapshot) {
l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term)
l.committed = s.Metadata.Index
l.unstable.restore(s)
}
// slice returns a slice of log entries from lo through hi-1, inclusive.
func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) {
err := l.mustCheckOutOfBounds(lo, hi)
if err != nil {
return nil, err
}
if lo == hi {
return nil, nil
}
var ents []pb.Entry
if lo < l.unstable.offset {
storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), maxSize)
if err == ErrCompacted {
return nil, err
} else if err == ErrUnavailable {
l.logger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset))
} else if err != nil {
panic(err) // TODO(bdarnell)
}
// check if ents has reached the size limitation
if uint64(len(storedEnts)) < min(hi, l.unstable.offset)-lo {
return storedEnts, nil
}
ents = storedEnts
}
if hi > l.unstable.offset {
unstable := l.unstable.slice(max(lo, l.unstable.offset), hi)
if len(ents) > 0 {
combined := make([]pb.Entry, len(ents)+len(unstable))
n := copy(combined, ents)
copy(combined[n:], unstable)
ents = combined
} else {
ents = unstable
}
}
return limitSize(ents, maxSize), nil
}
// l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries)
func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
if lo > hi {
l.logger.Panicf("invalid slice %d > %d", lo, hi)
}
fi := l.firstIndex()
if lo < fi {
return ErrCompacted
}
length := l.lastIndex() + 1 - fi
if hi > fi+length {
l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex())
}
return nil
}
func (l *raftLog) zeroTermOnErrCompacted(t uint64, err error) uint64 {
if err == nil {
return t
}
if err == ErrCompacted {
return 0
}
l.logger.Panicf("unexpected error (%v)", err)
return 0
}