mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00

To ease a future transition into joint quorums, this commit removes the previous "ad-hoc" majority-based quorum and vote computations with that introduced in the `raft/quorum` package. More specifically, the progressTracker now uses a quorum.JointConfig for which the "second" majority quorum is always empty; in this case the quorum behaves like the one quorum.MajorityConfig that is actually present. Or, more briefly, this change is a no-op, but it will take the busywork out of actually starting to make use of joint quorums in the future. On a side node, I suspect that this might've fixed a bug regarding the read index though I haven't been able to explicitly come up with a counter-example. The problem was that the acks collected for the read index weren't taking into account membership changes, so they'd run the danger of using acks from nodes since removed to claim that a quorum of acks had been received. There's a chance that there isn't a counter-example (the only guarantee extracted from the "quorum" is that there isn't another leader, but even if there's another leader all that matters is that that leader doesn't have a divergent history from the stale leader in the hypothetical counter-example), but either way there is morally a bug here that is now fixed because VoteCommitted doesn't care about votes from members that are not voters known to the currently active configuration.
458 lines
13 KiB
Go
458 lines
13 KiB
Go
// Copyright 2015 The etcd Authors
|
||
//
|
||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
// you may not use this file except in compliance with the License.
|
||
// You may obtain a copy of the License at
|
||
//
|
||
// http://www.apache.org/licenses/LICENSE-2.0
|
||
//
|
||
// Unless required by applicable law or agreed to in writing, software
|
||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
// See the License for the specific language governing permissions and
|
||
// limitations under the License.
|
||
|
||
package raft
|
||
|
||
import (
|
||
"fmt"
|
||
"sort"
|
||
|
||
"go.etcd.io/etcd/raft/quorum"
|
||
)
|
||
|
||
const (
|
||
ProgressStateProbe ProgressStateType = iota
|
||
ProgressStateReplicate
|
||
ProgressStateSnapshot
|
||
)
|
||
|
||
type ProgressStateType uint64
|
||
|
||
var prstmap = [...]string{
|
||
"ProgressStateProbe",
|
||
"ProgressStateReplicate",
|
||
"ProgressStateSnapshot",
|
||
}
|
||
|
||
func (st ProgressStateType) String() string { return prstmap[uint64(st)] }
|
||
|
||
// Progress represents a follower’s progress in the view of the leader. Leader maintains
|
||
// progresses of all followers, and sends entries to the follower based on its progress.
|
||
type Progress struct {
|
||
Match, Next uint64
|
||
// State defines how the leader should interact with the follower.
|
||
//
|
||
// When in ProgressStateProbe, leader sends at most one replication message
|
||
// per heartbeat interval. It also probes actual progress of the follower.
|
||
//
|
||
// When in ProgressStateReplicate, leader optimistically increases next
|
||
// to the latest entry sent after sending replication message. This is
|
||
// an optimized state for fast replicating log entries to the follower.
|
||
//
|
||
// When in ProgressStateSnapshot, leader should have sent out snapshot
|
||
// before and stops sending any replication message.
|
||
State ProgressStateType
|
||
|
||
// Paused is used in ProgressStateProbe.
|
||
// When Paused is true, raft should pause sending replication message to this peer.
|
||
Paused bool
|
||
// PendingSnapshot is used in ProgressStateSnapshot.
|
||
// If there is a pending snapshot, the pendingSnapshot will be set to the
|
||
// index of the snapshot. If pendingSnapshot is set, the replication process of
|
||
// this Progress will be paused. raft will not resend snapshot until the pending one
|
||
// is reported to be failed.
|
||
PendingSnapshot uint64
|
||
|
||
// RecentActive is true if the progress is recently active. Receiving any messages
|
||
// from the corresponding follower indicates the progress is active.
|
||
// RecentActive can be reset to false after an election timeout.
|
||
RecentActive bool
|
||
|
||
// inflights is a sliding window for the inflight messages.
|
||
// Each inflight message contains one or more log entries.
|
||
// The max number of entries per message is defined in raft config as MaxSizePerMsg.
|
||
// Thus inflight effectively limits both the number of inflight messages
|
||
// and the bandwidth each Progress can use.
|
||
// When inflights is full, no more message should be sent.
|
||
// When a leader sends out a message, the index of the last
|
||
// entry should be added to inflights. The index MUST be added
|
||
// into inflights in order.
|
||
// When a leader receives a reply, the previous inflights should
|
||
// be freed by calling inflights.freeTo with the index of the last
|
||
// received entry.
|
||
ins *inflights
|
||
|
||
// IsLearner is true if this progress is tracked for a learner.
|
||
IsLearner bool
|
||
}
|
||
|
||
func (pr *Progress) resetState(state ProgressStateType) {
|
||
pr.Paused = false
|
||
pr.PendingSnapshot = 0
|
||
pr.State = state
|
||
pr.ins.reset()
|
||
}
|
||
|
||
func (pr *Progress) becomeProbe() {
|
||
// If the original state is ProgressStateSnapshot, progress knows that
|
||
// the pending snapshot has been sent to this peer successfully, then
|
||
// probes from pendingSnapshot + 1.
|
||
if pr.State == ProgressStateSnapshot {
|
||
pendingSnapshot := pr.PendingSnapshot
|
||
pr.resetState(ProgressStateProbe)
|
||
pr.Next = max(pr.Match+1, pendingSnapshot+1)
|
||
} else {
|
||
pr.resetState(ProgressStateProbe)
|
||
pr.Next = pr.Match + 1
|
||
}
|
||
}
|
||
|
||
func (pr *Progress) becomeReplicate() {
|
||
pr.resetState(ProgressStateReplicate)
|
||
pr.Next = pr.Match + 1
|
||
}
|
||
|
||
func (pr *Progress) becomeSnapshot(snapshoti uint64) {
|
||
pr.resetState(ProgressStateSnapshot)
|
||
pr.PendingSnapshot = snapshoti
|
||
}
|
||
|
||
// maybeUpdate returns false if the given n index comes from an outdated message.
|
||
// Otherwise it updates the progress and returns true.
|
||
func (pr *Progress) maybeUpdate(n uint64) bool {
|
||
var updated bool
|
||
if pr.Match < n {
|
||
pr.Match = n
|
||
updated = true
|
||
pr.resume()
|
||
}
|
||
if pr.Next < n+1 {
|
||
pr.Next = n + 1
|
||
}
|
||
return updated
|
||
}
|
||
|
||
func (pr *Progress) optimisticUpdate(n uint64) { pr.Next = n + 1 }
|
||
|
||
// maybeDecrTo returns false if the given to index comes from an out of order message.
|
||
// Otherwise it decreases the progress next index to min(rejected, last) and returns true.
|
||
func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
|
||
if pr.State == ProgressStateReplicate {
|
||
// the rejection must be stale if the progress has matched and "rejected"
|
||
// is smaller than "match".
|
||
if rejected <= pr.Match {
|
||
return false
|
||
}
|
||
// directly decrease next to match + 1
|
||
pr.Next = pr.Match + 1
|
||
return true
|
||
}
|
||
|
||
// the rejection must be stale if "rejected" does not match next - 1
|
||
if pr.Next-1 != rejected {
|
||
return false
|
||
}
|
||
|
||
if pr.Next = min(rejected, last+1); pr.Next < 1 {
|
||
pr.Next = 1
|
||
}
|
||
pr.resume()
|
||
return true
|
||
}
|
||
|
||
func (pr *Progress) pause() { pr.Paused = true }
|
||
func (pr *Progress) resume() { pr.Paused = false }
|
||
|
||
// IsPaused returns whether sending log entries to this node has been
|
||
// paused. A node may be paused because it has rejected recent
|
||
// MsgApps, is currently waiting for a snapshot, or has reached the
|
||
// MaxInflightMsgs limit.
|
||
func (pr *Progress) IsPaused() bool {
|
||
switch pr.State {
|
||
case ProgressStateProbe:
|
||
return pr.Paused
|
||
case ProgressStateReplicate:
|
||
return pr.ins.full()
|
||
case ProgressStateSnapshot:
|
||
return true
|
||
default:
|
||
panic("unexpected state")
|
||
}
|
||
}
|
||
|
||
func (pr *Progress) snapshotFailure() { pr.PendingSnapshot = 0 }
|
||
|
||
// needSnapshotAbort returns true if snapshot progress's Match
|
||
// is equal or higher than the pendingSnapshot.
|
||
func (pr *Progress) needSnapshotAbort() bool {
|
||
return pr.State == ProgressStateSnapshot && pr.Match >= pr.PendingSnapshot
|
||
}
|
||
|
||
func (pr *Progress) String() string {
|
||
return fmt.Sprintf("next = %d, match = %d, state = %s, waiting = %v, pendingSnapshot = %d, recentActive = %v, isLearner = %v",
|
||
pr.Next, pr.Match, pr.State, pr.IsPaused(), pr.PendingSnapshot, pr.RecentActive, pr.IsLearner)
|
||
}
|
||
|
||
type inflights struct {
|
||
// the starting index in the buffer
|
||
start int
|
||
// number of inflights in the buffer
|
||
count int
|
||
|
||
// the size of the buffer
|
||
size int
|
||
|
||
// buffer contains the index of the last entry
|
||
// inside one message.
|
||
buffer []uint64
|
||
}
|
||
|
||
func newInflights(size int) *inflights {
|
||
return &inflights{
|
||
size: size,
|
||
}
|
||
}
|
||
|
||
// add adds an inflight into inflights
|
||
func (in *inflights) add(inflight uint64) {
|
||
if in.full() {
|
||
panic("cannot add into a full inflights")
|
||
}
|
||
next := in.start + in.count
|
||
size := in.size
|
||
if next >= size {
|
||
next -= size
|
||
}
|
||
if next >= len(in.buffer) {
|
||
in.growBuf()
|
||
}
|
||
in.buffer[next] = inflight
|
||
in.count++
|
||
}
|
||
|
||
// grow the inflight buffer by doubling up to inflights.size. We grow on demand
|
||
// instead of preallocating to inflights.size to handle systems which have
|
||
// thousands of Raft groups per process.
|
||
func (in *inflights) growBuf() {
|
||
newSize := len(in.buffer) * 2
|
||
if newSize == 0 {
|
||
newSize = 1
|
||
} else if newSize > in.size {
|
||
newSize = in.size
|
||
}
|
||
newBuffer := make([]uint64, newSize)
|
||
copy(newBuffer, in.buffer)
|
||
in.buffer = newBuffer
|
||
}
|
||
|
||
// freeTo frees the inflights smaller or equal to the given `to` flight.
|
||
func (in *inflights) freeTo(to uint64) {
|
||
if in.count == 0 || to < in.buffer[in.start] {
|
||
// out of the left side of the window
|
||
return
|
||
}
|
||
|
||
idx := in.start
|
||
var i int
|
||
for i = 0; i < in.count; i++ {
|
||
if to < in.buffer[idx] { // found the first large inflight
|
||
break
|
||
}
|
||
|
||
// increase index and maybe rotate
|
||
size := in.size
|
||
if idx++; idx >= size {
|
||
idx -= size
|
||
}
|
||
}
|
||
// free i inflights and set new start index
|
||
in.count -= i
|
||
in.start = idx
|
||
if in.count == 0 {
|
||
// inflights is empty, reset the start index so that we don't grow the
|
||
// buffer unnecessarily.
|
||
in.start = 0
|
||
}
|
||
}
|
||
|
||
func (in *inflights) freeFirstOne() { in.freeTo(in.buffer[in.start]) }
|
||
|
||
// full returns true if the inflights is full.
|
||
func (in *inflights) full() bool {
|
||
return in.count == in.size
|
||
}
|
||
|
||
// resets frees all inflights.
|
||
func (in *inflights) reset() {
|
||
in.count = 0
|
||
in.start = 0
|
||
}
|
||
|
||
// progressTracker tracks the currently active configuration and the information
|
||
// known about the nodes and learners in it. In particular, it tracks the match
|
||
// index for each peer which in turn allows reasoning about the committed index.
|
||
type progressTracker struct {
|
||
voters quorum.JointConfig
|
||
learners map[uint64]struct{}
|
||
prs map[uint64]*Progress
|
||
|
||
votes map[uint64]bool
|
||
|
||
maxInflight int
|
||
}
|
||
|
||
func makeProgressTracker(maxInflight int) progressTracker {
|
||
p := progressTracker{
|
||
maxInflight: maxInflight,
|
||
voters: quorum.JointConfig{
|
||
quorum.MajorityConfig{},
|
||
quorum.MajorityConfig{},
|
||
},
|
||
learners: map[uint64]struct{}{},
|
||
votes: map[uint64]bool{},
|
||
prs: map[uint64]*Progress{},
|
||
}
|
||
return p
|
||
}
|
||
|
||
// isSingleton returns true if (and only if) there is only one voting member
|
||
// (i.e. the leader) in the current configuration.
|
||
func (p *progressTracker) isSingleton() bool {
|
||
return len(p.voters[0]) == 1 && len(p.voters[1]) == 0
|
||
}
|
||
|
||
type progressAckIndexer map[uint64]*Progress
|
||
|
||
var _ quorum.AckedIndexer = progressAckIndexer(nil)
|
||
|
||
func (l progressAckIndexer) AckedIndex(id uint64) (quorum.Index, bool) {
|
||
pr, ok := l[id]
|
||
if !ok {
|
||
return 0, false
|
||
}
|
||
return quorum.Index(pr.Match), true
|
||
}
|
||
|
||
// committed returns the largest log index known to be committed based on what
|
||
// the voting members of the group have acknowledged.
|
||
func (p *progressTracker) committed() uint64 {
|
||
return uint64(p.voters.CommittedIndex(progressAckIndexer(p.prs)))
|
||
}
|
||
|
||
func (p *progressTracker) removeAny(id uint64) {
|
||
_, okPR := p.prs[id]
|
||
_, okV1 := p.voters[0][id]
|
||
_, okV2 := p.voters[1][id]
|
||
_, okL := p.learners[id]
|
||
|
||
okV := okV1 || okV2
|
||
|
||
if !okPR {
|
||
panic("attempting to remove unknown peer %x")
|
||
} else if !okV && !okL {
|
||
panic("attempting to remove unknown peer %x")
|
||
} else if okV && okL {
|
||
panic(fmt.Sprintf("peer %x is both voter and learner", id))
|
||
}
|
||
|
||
delete(p.voters[0], id)
|
||
delete(p.voters[1], id)
|
||
delete(p.learners, id)
|
||
delete(p.prs, id)
|
||
}
|
||
|
||
// initProgress initializes a new progress for the given node or learner. The
|
||
// node may not exist yet in either form or a panic will ensue.
|
||
func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) {
|
||
if pr := p.prs[id]; pr != nil {
|
||
panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr))
|
||
}
|
||
if !isLearner {
|
||
p.voters[0][id] = struct{}{}
|
||
} else {
|
||
p.learners[id] = struct{}{}
|
||
}
|
||
p.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: isLearner}
|
||
}
|
||
|
||
func (p *progressTracker) getProgress(id uint64) *Progress {
|
||
return p.prs[id]
|
||
}
|
||
|
||
// visit invokes the supplied closure for all tracked progresses.
|
||
func (p *progressTracker) visit(f func(id uint64, pr *Progress)) {
|
||
for id, pr := range p.prs {
|
||
f(id, pr)
|
||
}
|
||
}
|
||
|
||
// checkQuorumActive returns true if the quorum is active from
|
||
// the view of the local raft state machine. Otherwise, it returns
|
||
// false.
|
||
func (p *progressTracker) quorumActive() bool {
|
||
votes := map[uint64]bool{}
|
||
p.visit(func(id uint64, pr *Progress) {
|
||
if pr.IsLearner {
|
||
return
|
||
}
|
||
votes[id] = pr.RecentActive
|
||
})
|
||
|
||
return p.voters.VoteResult(votes) == quorum.VoteWon
|
||
}
|
||
|
||
func (p *progressTracker) voterNodes() []uint64 {
|
||
m := p.voters.IDs()
|
||
nodes := make([]uint64, 0, len(m))
|
||
for id := range m {
|
||
nodes = append(nodes, id)
|
||
}
|
||
sort.Sort(uint64Slice(nodes))
|
||
return nodes
|
||
}
|
||
|
||
func (p *progressTracker) learnerNodes() []uint64 {
|
||
nodes := make([]uint64, 0, len(p.learners))
|
||
for id := range p.learners {
|
||
nodes = append(nodes, id)
|
||
}
|
||
sort.Sort(uint64Slice(nodes))
|
||
return nodes
|
||
}
|
||
|
||
// resetVotes prepares for a new round of vote counting via recordVote.
|
||
func (p *progressTracker) resetVotes() {
|
||
p.votes = map[uint64]bool{}
|
||
}
|
||
|
||
// recordVote records that the node with the given id voted for this Raft
|
||
// instance if v == true (and declined it otherwise).
|
||
func (p *progressTracker) recordVote(id uint64, v bool) {
|
||
_, ok := p.votes[id]
|
||
if !ok {
|
||
p.votes[id] = v
|
||
}
|
||
}
|
||
|
||
// tallyVotes returns the number of granted and rejected votes, and whether the
|
||
// election outcome is known.
|
||
func (p *progressTracker) tallyVotes() (granted int, rejected int, _ quorum.VoteResult) {
|
||
// Make sure to populate granted/rejected correctly even if the votes slice
|
||
// contains members no longer part of the configuration. This doesn't really
|
||
// matter in the way the numbers are used (they're informational), but might
|
||
// as well get it right.
|
||
for id, pr := range p.prs {
|
||
if pr.IsLearner {
|
||
continue
|
||
}
|
||
if p.votes[id] {
|
||
granted++
|
||
} else {
|
||
rejected++
|
||
}
|
||
}
|
||
result := p.voters.VoteResult(p.votes)
|
||
return granted, rejected, result
|
||
}
|