mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: fix correctness bug in CommittedEntries pagination
In #9982, a mechanism to limit the size of `CommittedEntries` was introduced. The way this mechanism worked was that it would load applicable entries (passing the max size hint) and would emit a `HardState` whose commit index was truncated to match the limitation applied to the entries. Unfortunately, this was subtly incorrect when the user-provided `Entries` implementation didn't exactly match what Raft uses internally. Depending on whether a `Node` or a `RawNode` was used, this would either lead to regressing the HardState's commit index or outright forgetting to apply entries, respectively. Asking implementers to precisely match the Raft size limitation semantics was considered but looks like a bad idea as it puts correctness squarely in the hands of downstream users. Instead, this PR removes the truncation of `HardState` when limiting is active and tracks the applied index separately. This removes the old paradigm (that the previous code tried to work around) that the client will always apply all the way to the commit index, which isn't true when committed entries are paginated. See [1] for more on the discovery of this bug (CockroachDB's implementation of `Entries` returns one more entry than Raft's when the size limit hits). [1]: https://github.com/cockroachdb/cockroach/issues/28918#issuecomment-418174448
This commit is contained in:
31
raft/node.go
31
raft/node.go
@@ -109,6 +109,19 @@ func (rd Ready) containsUpdates() bool {
|
||||
len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0
|
||||
}
|
||||
|
||||
// appliedCursor extracts from the Ready the highest index the client has
|
||||
// applied (once the Ready is confirmed via Advance). If no information is
|
||||
// contained in the Ready, returns zero.
|
||||
func (rd Ready) appliedCursor() uint64 {
|
||||
if n := len(rd.CommittedEntries); n > 0 {
|
||||
return rd.CommittedEntries[n-1].Index
|
||||
}
|
||||
if index := rd.Snapshot.Metadata.Index; index > 0 {
|
||||
return index
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// Node represents a node in a raft cluster.
|
||||
type Node interface {
|
||||
// Tick increments the internal logical clock for the Node by a single tick. Election
|
||||
@@ -282,6 +295,7 @@ func (n *node) run(r *raft) {
|
||||
var prevLastUnstablei, prevLastUnstablet uint64
|
||||
var havePrevLastUnstablei bool
|
||||
var prevSnapi uint64
|
||||
var applyingToI uint64
|
||||
var rd Ready
|
||||
|
||||
lead := None
|
||||
@@ -381,13 +395,17 @@ func (n *node) run(r *raft) {
|
||||
if !IsEmptySnap(rd.Snapshot) {
|
||||
prevSnapi = rd.Snapshot.Metadata.Index
|
||||
}
|
||||
if index := rd.appliedCursor(); index != 0 {
|
||||
applyingToI = index
|
||||
}
|
||||
|
||||
r.msgs = nil
|
||||
r.readStates = nil
|
||||
advancec = n.advancec
|
||||
case <-advancec:
|
||||
if prevHardSt.Commit != 0 {
|
||||
r.raftLog.appliedTo(prevHardSt.Commit)
|
||||
if applyingToI != 0 {
|
||||
r.raftLog.appliedTo(applyingToI)
|
||||
applyingToI = 0
|
||||
}
|
||||
if havePrevLastUnstablei {
|
||||
r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
|
||||
@@ -559,15 +577,6 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
|
||||
}
|
||||
if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
|
||||
rd.HardState = hardSt
|
||||
// If we hit a size limit when loading CommittedEntries, clamp
|
||||
// our HardState.Commit to what we're actually returning. This is
|
||||
// also used as our cursor to resume for the next Ready batch.
|
||||
if len(rd.CommittedEntries) > 0 {
|
||||
lastCommit := rd.CommittedEntries[len(rd.CommittedEntries)-1]
|
||||
if rd.HardState.Commit > lastCommit.Index {
|
||||
rd.HardState.Commit = lastCommit.Index
|
||||
}
|
||||
}
|
||||
}
|
||||
if r.raftLog.unstable.snapshot != nil {
|
||||
rd.Snapshot = *r.raftLog.unstable.snapshot
|
||||
|
||||
Reference in New Issue
Block a user