mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #10683 from tbg/prs
raft: extract progress tracking into own component
This commit is contained in:
commit
c38e965a65
10
raft/node.go
10
raft/node.go
@ -353,15 +353,15 @@ func (n *node) run(r *raft) {
|
|||||||
}
|
}
|
||||||
case m := <-n.recvc:
|
case m := <-n.recvc:
|
||||||
// filter out response message from unknown From.
|
// filter out response message from unknown From.
|
||||||
if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
|
if pr := r.prs.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
|
||||||
r.Step(m)
|
r.Step(m)
|
||||||
}
|
}
|
||||||
case cc := <-n.confc:
|
case cc := <-n.confc:
|
||||||
if cc.NodeID == None {
|
if cc.NodeID == None {
|
||||||
select {
|
select {
|
||||||
case n.confstatec <- pb.ConfState{
|
case n.confstatec <- pb.ConfState{
|
||||||
Nodes: r.nodes(),
|
Nodes: r.prs.voterNodes(),
|
||||||
Learners: r.learnerNodes()}:
|
Learners: r.prs.learnerNodes()}:
|
||||||
case <-n.done:
|
case <-n.done:
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
@ -384,8 +384,8 @@ func (n *node) run(r *raft) {
|
|||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case n.confstatec <- pb.ConfState{
|
case n.confstatec <- pb.ConfState{
|
||||||
Nodes: r.nodes(),
|
Nodes: r.prs.voterNodes(),
|
||||||
Learners: r.learnerNodes()}:
|
Learners: r.prs.learnerNodes()}:
|
||||||
case <-n.done:
|
case <-n.done:
|
||||||
}
|
}
|
||||||
case <-n.tickc:
|
case <-n.tickc:
|
||||||
|
177
raft/progress.go
177
raft/progress.go
@ -14,7 +14,10 @@
|
|||||||
|
|
||||||
package raft
|
package raft
|
||||||
|
|
||||||
import "fmt"
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sort"
|
||||||
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
ProgressStateProbe ProgressStateType = iota
|
ProgressStateProbe ProgressStateType = iota
|
||||||
@ -283,3 +286,175 @@ func (in *inflights) reset() {
|
|||||||
in.count = 0
|
in.count = 0
|
||||||
in.start = 0
|
in.start = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// progressTracker tracks the currently active configuration and the information
|
||||||
|
// known about the nodes and learners in it. In particular, it tracks the match
|
||||||
|
// index for each peer which in turn allows reasoning about the committed index.
|
||||||
|
type progressTracker struct {
|
||||||
|
nodes map[uint64]*Progress
|
||||||
|
learners map[uint64]*Progress
|
||||||
|
|
||||||
|
votes map[uint64]bool
|
||||||
|
|
||||||
|
maxInflight int
|
||||||
|
matchBuf uint64Slice
|
||||||
|
}
|
||||||
|
|
||||||
|
func makePRS(maxInflight int) progressTracker {
|
||||||
|
p := progressTracker{
|
||||||
|
maxInflight: maxInflight,
|
||||||
|
nodes: map[uint64]*Progress{},
|
||||||
|
learners: map[uint64]*Progress{},
|
||||||
|
votes: map[uint64]bool{},
|
||||||
|
}
|
||||||
|
return p
|
||||||
|
}
|
||||||
|
|
||||||
|
// isSingleton returns true if (and only if) there is only one voting member
|
||||||
|
// (i.e. the leader) in the current configuration.
|
||||||
|
func (p *progressTracker) isSingleton() bool {
|
||||||
|
return len(p.nodes) == 1
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *progressTracker) quorum() int {
|
||||||
|
return len(p.nodes)/2 + 1
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *progressTracker) hasQuorum(m map[uint64]struct{}) bool {
|
||||||
|
return len(m) >= p.quorum()
|
||||||
|
}
|
||||||
|
|
||||||
|
// committed returns the largest log index known to be committed based on what
|
||||||
|
// the voting members of the group have acknowledged.
|
||||||
|
func (p *progressTracker) committed() uint64 {
|
||||||
|
// Preserving matchBuf across calls is an optimization
|
||||||
|
// used to avoid allocating a new slice on each call.
|
||||||
|
if cap(p.matchBuf) < len(p.nodes) {
|
||||||
|
p.matchBuf = make(uint64Slice, len(p.nodes))
|
||||||
|
}
|
||||||
|
p.matchBuf = p.matchBuf[:len(p.nodes)]
|
||||||
|
idx := 0
|
||||||
|
for _, pr := range p.nodes {
|
||||||
|
p.matchBuf[idx] = pr.Match
|
||||||
|
idx++
|
||||||
|
}
|
||||||
|
sort.Sort(&p.matchBuf)
|
||||||
|
return p.matchBuf[len(p.matchBuf)-p.quorum()]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *progressTracker) removeAny(id uint64) {
|
||||||
|
pN := p.nodes[id]
|
||||||
|
pL := p.learners[id]
|
||||||
|
|
||||||
|
if pN == nil && pL == nil {
|
||||||
|
panic("attempting to remove unknown peer %x")
|
||||||
|
} else if pN != nil && pL != nil {
|
||||||
|
panic(fmt.Sprintf("peer %x is both voter and learner", id))
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(p.nodes, id)
|
||||||
|
delete(p.learners, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
// initProgress initializes a new progress for the given node or learner. The
|
||||||
|
// node may not exist yet in either form or a panic will ensue.
|
||||||
|
func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) {
|
||||||
|
if pr := p.nodes[id]; pr != nil {
|
||||||
|
panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr))
|
||||||
|
}
|
||||||
|
if pr := p.learners[id]; pr != nil {
|
||||||
|
panic(fmt.Sprintf("peer %x already tracked as learner %v", id, pr))
|
||||||
|
}
|
||||||
|
if !isLearner {
|
||||||
|
p.nodes[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight)}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: true}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *progressTracker) getProgress(id uint64) *Progress {
|
||||||
|
if pr, ok := p.nodes[id]; ok {
|
||||||
|
return pr
|
||||||
|
}
|
||||||
|
|
||||||
|
return p.learners[id]
|
||||||
|
}
|
||||||
|
|
||||||
|
// visit invokes the supplied closure for all tracked progresses.
|
||||||
|
func (p *progressTracker) visit(f func(id uint64, pr *Progress)) {
|
||||||
|
for id, pr := range p.nodes {
|
||||||
|
f(id, pr)
|
||||||
|
}
|
||||||
|
|
||||||
|
for id, pr := range p.learners {
|
||||||
|
f(id, pr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkQuorumActive returns true if the quorum is active from
|
||||||
|
// the view of the local raft state machine. Otherwise, it returns
|
||||||
|
// false.
|
||||||
|
func (p *progressTracker) quorumActive() bool {
|
||||||
|
var act int
|
||||||
|
p.visit(func(id uint64, pr *Progress) {
|
||||||
|
if pr.RecentActive && !pr.IsLearner {
|
||||||
|
act++
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
return act >= p.quorum()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *progressTracker) voterNodes() []uint64 {
|
||||||
|
nodes := make([]uint64, 0, len(p.nodes))
|
||||||
|
for id := range p.nodes {
|
||||||
|
nodes = append(nodes, id)
|
||||||
|
}
|
||||||
|
sort.Sort(uint64Slice(nodes))
|
||||||
|
return nodes
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *progressTracker) learnerNodes() []uint64 {
|
||||||
|
nodes := make([]uint64, 0, len(p.learners))
|
||||||
|
for id := range p.learners {
|
||||||
|
nodes = append(nodes, id)
|
||||||
|
}
|
||||||
|
sort.Sort(uint64Slice(nodes))
|
||||||
|
return nodes
|
||||||
|
}
|
||||||
|
|
||||||
|
// resetVotes prepares for a new round of vote counting via recordVote.
|
||||||
|
func (p *progressTracker) resetVotes() {
|
||||||
|
p.votes = map[uint64]bool{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// recordVote records that the node with the given id voted for this Raft
|
||||||
|
// instance if v == true (and declined it otherwise).
|
||||||
|
func (p *progressTracker) recordVote(id uint64, v bool) {
|
||||||
|
_, ok := p.votes[id]
|
||||||
|
if !ok {
|
||||||
|
p.votes[id] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// tallyVotes returns the number of granted and rejected votes, and whether the
|
||||||
|
// election outcome is known.
|
||||||
|
func (p *progressTracker) tallyVotes() (granted int, rejected int, result electionResult) {
|
||||||
|
for _, v := range p.votes {
|
||||||
|
if v {
|
||||||
|
granted++
|
||||||
|
} else {
|
||||||
|
rejected++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
q := p.quorum()
|
||||||
|
|
||||||
|
result = electionIndeterminate
|
||||||
|
if granted >= q {
|
||||||
|
result = electionWon
|
||||||
|
} else if rejected >= q {
|
||||||
|
result = electionLost
|
||||||
|
}
|
||||||
|
return granted, rejected, result
|
||||||
|
}
|
||||||
|
244
raft/raft.go
244
raft/raft.go
@ -20,7 +20,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sort"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -261,18 +260,13 @@ type raft struct {
|
|||||||
|
|
||||||
maxMsgSize uint64
|
maxMsgSize uint64
|
||||||
maxUncommittedSize uint64
|
maxUncommittedSize uint64
|
||||||
maxInflight int
|
prs progressTracker
|
||||||
prs map[uint64]*Progress
|
|
||||||
learnerPrs map[uint64]*Progress
|
|
||||||
matchBuf uint64Slice
|
|
||||||
|
|
||||||
state StateType
|
state StateType
|
||||||
|
|
||||||
// isLearner is true if the local raft node is a learner.
|
// isLearner is true if the local raft node is a learner.
|
||||||
isLearner bool
|
isLearner bool
|
||||||
|
|
||||||
votes map[uint64]bool
|
|
||||||
|
|
||||||
msgs []pb.Message
|
msgs []pb.Message
|
||||||
|
|
||||||
// the leader id
|
// the leader id
|
||||||
@ -348,10 +342,8 @@ func newRaft(c *Config) *raft {
|
|||||||
isLearner: false,
|
isLearner: false,
|
||||||
raftLog: raftlog,
|
raftLog: raftlog,
|
||||||
maxMsgSize: c.MaxSizePerMsg,
|
maxMsgSize: c.MaxSizePerMsg,
|
||||||
maxInflight: c.MaxInflightMsgs,
|
|
||||||
maxUncommittedSize: c.MaxUncommittedEntriesSize,
|
maxUncommittedSize: c.MaxUncommittedEntriesSize,
|
||||||
prs: make(map[uint64]*Progress),
|
prs: makePRS(c.MaxInflightMsgs),
|
||||||
learnerPrs: make(map[uint64]*Progress),
|
|
||||||
electionTimeout: c.ElectionTick,
|
electionTimeout: c.ElectionTick,
|
||||||
heartbeatTimeout: c.HeartbeatTick,
|
heartbeatTimeout: c.HeartbeatTick,
|
||||||
logger: c.Logger,
|
logger: c.Logger,
|
||||||
@ -361,13 +353,13 @@ func newRaft(c *Config) *raft {
|
|||||||
disableProposalForwarding: c.DisableProposalForwarding,
|
disableProposalForwarding: c.DisableProposalForwarding,
|
||||||
}
|
}
|
||||||
for _, p := range peers {
|
for _, p := range peers {
|
||||||
r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
|
// Add node to active config.
|
||||||
|
r.prs.initProgress(p, 0 /* match */, 1 /* next */, false /* isLearner */)
|
||||||
}
|
}
|
||||||
for _, p := range learners {
|
for _, p := range learners {
|
||||||
if _, ok := r.prs[p]; ok {
|
// Add learner to active config.
|
||||||
panic(fmt.Sprintf("node %x is in both learner and peer list", p))
|
r.prs.initProgress(p, 0 /* match */, 1 /* next */, true /* isLearner */)
|
||||||
}
|
|
||||||
r.learnerPrs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight), IsLearner: true}
|
|
||||||
if r.id == p {
|
if r.id == p {
|
||||||
r.isLearner = true
|
r.isLearner = true
|
||||||
}
|
}
|
||||||
@ -382,7 +374,7 @@ func newRaft(c *Config) *raft {
|
|||||||
r.becomeFollower(r.Term, None)
|
r.becomeFollower(r.Term, None)
|
||||||
|
|
||||||
var nodesStrs []string
|
var nodesStrs []string
|
||||||
for _, n := range r.nodes() {
|
for _, n := range r.prs.voterNodes() {
|
||||||
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
|
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -403,26 +395,6 @@ func (r *raft) hardState() pb.HardState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) quorum() int { return len(r.prs)/2 + 1 }
|
|
||||||
|
|
||||||
func (r *raft) nodes() []uint64 {
|
|
||||||
nodes := make([]uint64, 0, len(r.prs))
|
|
||||||
for id := range r.prs {
|
|
||||||
nodes = append(nodes, id)
|
|
||||||
}
|
|
||||||
sort.Sort(uint64Slice(nodes))
|
|
||||||
return nodes
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *raft) learnerNodes() []uint64 {
|
|
||||||
nodes := make([]uint64, 0, len(r.learnerPrs))
|
|
||||||
for id := range r.learnerPrs {
|
|
||||||
nodes = append(nodes, id)
|
|
||||||
}
|
|
||||||
sort.Sort(uint64Slice(nodes))
|
|
||||||
return nodes
|
|
||||||
}
|
|
||||||
|
|
||||||
// send persists state to stable storage and then sends to its mailbox.
|
// send persists state to stable storage and then sends to its mailbox.
|
||||||
func (r *raft) send(m pb.Message) {
|
func (r *raft) send(m pb.Message) {
|
||||||
m.From = r.id
|
m.From = r.id
|
||||||
@ -457,14 +429,6 @@ func (r *raft) send(m pb.Message) {
|
|||||||
r.msgs = append(r.msgs, m)
|
r.msgs = append(r.msgs, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) getProgress(id uint64) *Progress {
|
|
||||||
if pr, ok := r.prs[id]; ok {
|
|
||||||
return pr
|
|
||||||
}
|
|
||||||
|
|
||||||
return r.learnerPrs[id]
|
|
||||||
}
|
|
||||||
|
|
||||||
// sendAppend sends an append RPC with new entries (if any) and the
|
// sendAppend sends an append RPC with new entries (if any) and the
|
||||||
// current commit index to the given peer.
|
// current commit index to the given peer.
|
||||||
func (r *raft) sendAppend(to uint64) {
|
func (r *raft) sendAppend(to uint64) {
|
||||||
@ -477,7 +441,7 @@ func (r *raft) sendAppend(to uint64) {
|
|||||||
// ("empty" messages are useful to convey updated Commit indexes, but
|
// ("empty" messages are useful to convey updated Commit indexes, but
|
||||||
// are undesirable when we're sending multiple messages in a batch).
|
// are undesirable when we're sending multiple messages in a batch).
|
||||||
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
|
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
|
||||||
pr := r.getProgress(to)
|
pr := r.prs.getProgress(to)
|
||||||
if pr.IsPaused() {
|
if pr.IsPaused() {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -546,7 +510,7 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
|
|||||||
// or it might not have all the committed entries.
|
// or it might not have all the committed entries.
|
||||||
// The leader MUST NOT forward the follower's commit to
|
// The leader MUST NOT forward the follower's commit to
|
||||||
// an unmatched index.
|
// an unmatched index.
|
||||||
commit := min(r.getProgress(to).Match, r.raftLog.committed)
|
commit := min(r.prs.getProgress(to).Match, r.raftLog.committed)
|
||||||
m := pb.Message{
|
m := pb.Message{
|
||||||
To: to,
|
To: to,
|
||||||
Type: pb.MsgHeartbeat,
|
Type: pb.MsgHeartbeat,
|
||||||
@ -557,20 +521,10 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
|
|||||||
r.send(m)
|
r.send(m)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) forEachProgress(f func(id uint64, pr *Progress)) {
|
|
||||||
for id, pr := range r.prs {
|
|
||||||
f(id, pr)
|
|
||||||
}
|
|
||||||
|
|
||||||
for id, pr := range r.learnerPrs {
|
|
||||||
f(id, pr)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
|
// bcastAppend sends RPC, with entries to all peers that are not up-to-date
|
||||||
// according to the progress recorded in r.prs.
|
// according to the progress recorded in r.prs.
|
||||||
func (r *raft) bcastAppend() {
|
func (r *raft) bcastAppend() {
|
||||||
r.forEachProgress(func(id uint64, _ *Progress) {
|
r.prs.visit(func(id uint64, _ *Progress) {
|
||||||
if id == r.id {
|
if id == r.id {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -590,7 +544,7 @@ func (r *raft) bcastHeartbeat() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
|
func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
|
||||||
r.forEachProgress(func(id uint64, _ *Progress) {
|
r.prs.visit(func(id uint64, _ *Progress) {
|
||||||
if id == r.id {
|
if id == r.id {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -602,19 +556,7 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
|
|||||||
// the commit index changed (in which case the caller should call
|
// the commit index changed (in which case the caller should call
|
||||||
// r.bcastAppend).
|
// r.bcastAppend).
|
||||||
func (r *raft) maybeCommit() bool {
|
func (r *raft) maybeCommit() bool {
|
||||||
// Preserving matchBuf across calls is an optimization
|
mci := r.prs.committed()
|
||||||
// used to avoid allocating a new slice on each call.
|
|
||||||
if cap(r.matchBuf) < len(r.prs) {
|
|
||||||
r.matchBuf = make(uint64Slice, len(r.prs))
|
|
||||||
}
|
|
||||||
r.matchBuf = r.matchBuf[:len(r.prs)]
|
|
||||||
idx := 0
|
|
||||||
for _, p := range r.prs {
|
|
||||||
r.matchBuf[idx] = p.Match
|
|
||||||
idx++
|
|
||||||
}
|
|
||||||
sort.Sort(&r.matchBuf)
|
|
||||||
mci := r.matchBuf[len(r.matchBuf)-r.quorum()]
|
|
||||||
return r.raftLog.maybeCommit(mci, r.Term)
|
return r.raftLog.maybeCommit(mci, r.Term)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -631,9 +573,14 @@ func (r *raft) reset(term uint64) {
|
|||||||
|
|
||||||
r.abortLeaderTransfer()
|
r.abortLeaderTransfer()
|
||||||
|
|
||||||
r.votes = make(map[uint64]bool)
|
r.prs.resetVotes()
|
||||||
r.forEachProgress(func(id uint64, pr *Progress) {
|
r.prs.visit(func(id uint64, pr *Progress) {
|
||||||
*pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), IsLearner: pr.IsLearner}
|
*pr = Progress{
|
||||||
|
Match: 0,
|
||||||
|
Next: r.raftLog.lastIndex() + 1,
|
||||||
|
ins: newInflights(r.prs.maxInflight),
|
||||||
|
IsLearner: pr.IsLearner,
|
||||||
|
}
|
||||||
if id == r.id {
|
if id == r.id {
|
||||||
pr.Match = r.raftLog.lastIndex()
|
pr.Match = r.raftLog.lastIndex()
|
||||||
}
|
}
|
||||||
@ -661,7 +608,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
|
|||||||
}
|
}
|
||||||
// use latest "last" index after truncate/append
|
// use latest "last" index after truncate/append
|
||||||
li = r.raftLog.append(es...)
|
li = r.raftLog.append(es...)
|
||||||
r.getProgress(r.id).maybeUpdate(li)
|
r.prs.getProgress(r.id).maybeUpdate(li)
|
||||||
// Regardless of maybeCommit's return, our caller will call bcastAppend.
|
// Regardless of maybeCommit's return, our caller will call bcastAppend.
|
||||||
r.maybeCommit()
|
r.maybeCommit()
|
||||||
return true
|
return true
|
||||||
@ -734,7 +681,7 @@ func (r *raft) becomePreCandidate() {
|
|||||||
// but doesn't change anything else. In particular it does not increase
|
// but doesn't change anything else. In particular it does not increase
|
||||||
// r.Term or change r.Vote.
|
// r.Term or change r.Vote.
|
||||||
r.step = stepCandidate
|
r.step = stepCandidate
|
||||||
r.votes = make(map[uint64]bool)
|
r.prs.resetVotes()
|
||||||
r.tick = r.tickElection
|
r.tick = r.tickElection
|
||||||
r.lead = None
|
r.lead = None
|
||||||
r.state = StatePreCandidate
|
r.state = StatePreCandidate
|
||||||
@ -755,7 +702,7 @@ func (r *raft) becomeLeader() {
|
|||||||
// (perhaps after having received a snapshot as a result). The leader is
|
// (perhaps after having received a snapshot as a result). The leader is
|
||||||
// trivially in this state. Note that r.reset() has initialized this
|
// trivially in this state. Note that r.reset() has initialized this
|
||||||
// progress with the last index already.
|
// progress with the last index already.
|
||||||
r.prs[r.id].becomeReplicate()
|
r.prs.getProgress(r.id).becomeReplicate()
|
||||||
|
|
||||||
// Conservatively set the pendingConfIndex to the last index in the
|
// Conservatively set the pendingConfIndex to the last index in the
|
||||||
// log. There may or may not be a pending config change, but it's
|
// log. There may or may not be a pending config change, but it's
|
||||||
@ -790,7 +737,7 @@ func (r *raft) campaign(t CampaignType) {
|
|||||||
voteMsg = pb.MsgVote
|
voteMsg = pb.MsgVote
|
||||||
term = r.Term
|
term = r.Term
|
||||||
}
|
}
|
||||||
if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
|
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == electionWon {
|
||||||
// We won the election after voting for ourselves (which must mean that
|
// We won the election after voting for ourselves (which must mean that
|
||||||
// this is a single-node cluster). Advance to the next state.
|
// this is a single-node cluster). Advance to the next state.
|
||||||
if t == campaignPreElection {
|
if t == campaignPreElection {
|
||||||
@ -800,7 +747,7 @@ func (r *raft) campaign(t CampaignType) {
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for id := range r.prs {
|
for id := range r.prs.nodes {
|
||||||
if id == r.id {
|
if id == r.id {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -815,21 +762,22 @@ func (r *raft) campaign(t CampaignType) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int) {
|
type electionResult byte
|
||||||
|
|
||||||
|
const (
|
||||||
|
electionIndeterminate electionResult = iota
|
||||||
|
electionLost
|
||||||
|
electionWon
|
||||||
|
)
|
||||||
|
|
||||||
|
func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result electionResult) {
|
||||||
if v {
|
if v {
|
||||||
r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
|
r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
|
||||||
} else {
|
} else {
|
||||||
r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
|
r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
|
||||||
}
|
}
|
||||||
if _, ok := r.votes[id]; !ok {
|
r.prs.recordVote(id, v)
|
||||||
r.votes[id] = v
|
return r.prs.tallyVotes()
|
||||||
}
|
|
||||||
for _, vv := range r.votes {
|
|
||||||
if vv {
|
|
||||||
granted++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return granted
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) Step(m pb.Message) error {
|
func (r *raft) Step(m pb.Message) error {
|
||||||
@ -985,16 +933,32 @@ func stepLeader(r *raft, m pb.Message) error {
|
|||||||
r.bcastHeartbeat()
|
r.bcastHeartbeat()
|
||||||
return nil
|
return nil
|
||||||
case pb.MsgCheckQuorum:
|
case pb.MsgCheckQuorum:
|
||||||
if !r.checkQuorumActive() {
|
// The leader should always see itself as active. As a precaution, handle
|
||||||
|
// the case in which the leader isn't in the configuration any more (for
|
||||||
|
// example if it just removed itself).
|
||||||
|
//
|
||||||
|
// TODO(tbg): I added a TODO in removeNode, it doesn't seem that the
|
||||||
|
// leader steps down when removing itself. I might be missing something.
|
||||||
|
if pr := r.prs.getProgress(r.id); pr != nil {
|
||||||
|
pr.RecentActive = true
|
||||||
|
}
|
||||||
|
if !r.prs.quorumActive() {
|
||||||
r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
|
r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
|
||||||
r.becomeFollower(r.Term, None)
|
r.becomeFollower(r.Term, None)
|
||||||
}
|
}
|
||||||
|
// Mark everyone (but ourselves) as inactive in preparation for the next
|
||||||
|
// CheckQuorum.
|
||||||
|
r.prs.visit(func(id uint64, pr *Progress) {
|
||||||
|
if id != r.id {
|
||||||
|
pr.RecentActive = false
|
||||||
|
}
|
||||||
|
})
|
||||||
return nil
|
return nil
|
||||||
case pb.MsgProp:
|
case pb.MsgProp:
|
||||||
if len(m.Entries) == 0 {
|
if len(m.Entries) == 0 {
|
||||||
r.logger.Panicf("%x stepped empty MsgProp", r.id)
|
r.logger.Panicf("%x stepped empty MsgProp", r.id)
|
||||||
}
|
}
|
||||||
if _, ok := r.prs[r.id]; !ok {
|
if r.prs.getProgress(r.id) == nil {
|
||||||
// If we are not currently a member of the range (i.e. this node
|
// If we are not currently a member of the range (i.e. this node
|
||||||
// was removed from the configuration while serving as leader),
|
// was removed from the configuration while serving as leader),
|
||||||
// drop any new proposals.
|
// drop any new proposals.
|
||||||
@ -1024,7 +988,7 @@ func stepLeader(r *raft, m pb.Message) error {
|
|||||||
r.bcastAppend()
|
r.bcastAppend()
|
||||||
return nil
|
return nil
|
||||||
case pb.MsgReadIndex:
|
case pb.MsgReadIndex:
|
||||||
if r.quorum() > 1 {
|
if !r.prs.isSingleton() { // more than one voting member in cluster
|
||||||
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
|
if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
|
||||||
// Reject read only request when this leader has not committed any log entry at its term.
|
// Reject read only request when this leader has not committed any log entry at its term.
|
||||||
return nil
|
return nil
|
||||||
@ -1036,6 +1000,8 @@ func stepLeader(r *raft, m pb.Message) error {
|
|||||||
switch r.readOnly.option {
|
switch r.readOnly.option {
|
||||||
case ReadOnlySafe:
|
case ReadOnlySafe:
|
||||||
r.readOnly.addRequest(r.raftLog.committed, m)
|
r.readOnly.addRequest(r.raftLog.committed, m)
|
||||||
|
// The local node automatically acks the request.
|
||||||
|
r.readOnly.recvAck(r.id, m.Entries[0].Data)
|
||||||
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
|
r.bcastHeartbeatWithCtx(m.Entries[0].Data)
|
||||||
case ReadOnlyLeaseBased:
|
case ReadOnlyLeaseBased:
|
||||||
ri := r.raftLog.committed
|
ri := r.raftLog.committed
|
||||||
@ -1045,7 +1011,7 @@ func stepLeader(r *raft, m pb.Message) error {
|
|||||||
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
|
r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else { // there is only one voting member (the leader) in the cluster
|
} else { // only one voting member (the leader) in the cluster
|
||||||
if m.From == None || m.From == r.id { // from leader itself
|
if m.From == None || m.From == r.id { // from leader itself
|
||||||
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
|
r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
|
||||||
} else { // from learner member
|
} else { // from learner member
|
||||||
@ -1057,7 +1023,7 @@ func stepLeader(r *raft, m pb.Message) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// All other message types require a progress for m.From (pr).
|
// All other message types require a progress for m.From (pr).
|
||||||
pr := r.getProgress(m.From)
|
pr := r.prs.getProgress(m.From)
|
||||||
if pr == nil {
|
if pr == nil {
|
||||||
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
|
r.logger.Debugf("%x no progress available for %x", r.id, m.From)
|
||||||
return nil
|
return nil
|
||||||
@ -1133,8 +1099,7 @@ func stepLeader(r *raft, m pb.Message) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ackCount := r.readOnly.recvAck(m)
|
if !r.prs.hasQuorum(r.readOnly.recvAck(m.From, m.Context)) {
|
||||||
if ackCount < r.quorum() {
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1231,17 +1196,17 @@ func stepCandidate(r *raft, m pb.Message) error {
|
|||||||
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
|
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
|
||||||
r.handleSnapshot(m)
|
r.handleSnapshot(m)
|
||||||
case myVoteRespType:
|
case myVoteRespType:
|
||||||
gr := r.poll(m.From, m.Type, !m.Reject)
|
gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
|
||||||
r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
|
r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
|
||||||
switch r.quorum() {
|
switch res {
|
||||||
case gr:
|
case electionWon:
|
||||||
if r.state == StatePreCandidate {
|
if r.state == StatePreCandidate {
|
||||||
r.campaign(campaignElection)
|
r.campaign(campaignElection)
|
||||||
} else {
|
} else {
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
r.bcastAppend()
|
r.bcastAppend()
|
||||||
}
|
}
|
||||||
case len(r.votes) - gr:
|
case electionLost:
|
||||||
// pb.MsgPreVoteResp contains future term of pre-candidate
|
// pb.MsgPreVoteResp contains future term of pre-candidate
|
||||||
// m.Term > r.Term; reuse r.Term
|
// m.Term > r.Term; reuse r.Term
|
||||||
r.becomeFollower(r.Term, None)
|
r.becomeFollower(r.Term, None)
|
||||||
@ -1370,8 +1335,7 @@ func (r *raft) restore(s pb.Snapshot) bool {
|
|||||||
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
||||||
|
|
||||||
r.raftLog.restore(s)
|
r.raftLog.restore(s)
|
||||||
r.prs = make(map[uint64]*Progress)
|
r.prs = makePRS(r.prs.maxInflight)
|
||||||
r.learnerPrs = make(map[uint64]*Progress)
|
|
||||||
r.restoreNode(s.Metadata.ConfState.Nodes, false)
|
r.restoreNode(s.Metadata.ConfState.Nodes, false)
|
||||||
r.restoreNode(s.Metadata.ConfState.Learners, true)
|
r.restoreNode(s.Metadata.ConfState.Learners, true)
|
||||||
return true
|
return true
|
||||||
@ -1384,16 +1348,16 @@ func (r *raft) restoreNode(nodes []uint64, isLearner bool) {
|
|||||||
match = next - 1
|
match = next - 1
|
||||||
r.isLearner = isLearner
|
r.isLearner = isLearner
|
||||||
}
|
}
|
||||||
r.setProgress(n, match, next, isLearner)
|
r.prs.initProgress(n, match, next, isLearner)
|
||||||
r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.getProgress(n))
|
r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs.getProgress(n))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// promotable indicates whether state machine can be promoted to leader,
|
// promotable indicates whether state machine can be promoted to leader,
|
||||||
// which is true when its own id is in progress list.
|
// which is true when its own id is in progress list.
|
||||||
func (r *raft) promotable() bool {
|
func (r *raft) promotable() bool {
|
||||||
_, ok := r.prs[r.id]
|
pr := r.prs.getProgress(r.id)
|
||||||
return ok
|
return pr != nil && !pr.IsLearner
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) addNode(id uint64) {
|
func (r *raft) addNode(id uint64) {
|
||||||
@ -1405,12 +1369,12 @@ func (r *raft) addLearner(id uint64) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
|
func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
|
||||||
pr := r.getProgress(id)
|
pr := r.prs.getProgress(id)
|
||||||
if pr == nil {
|
if pr == nil {
|
||||||
r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
|
r.prs.initProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
|
||||||
} else {
|
} else {
|
||||||
if isLearner && !pr.IsLearner {
|
if isLearner && !pr.IsLearner {
|
||||||
// can only change Learner to Voter
|
// Can only change Learner to Voter.
|
||||||
r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id)
|
r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -1421,10 +1385,11 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// change Learner to Voter, use origin Learner progress
|
// Change Learner to Voter, use origin Learner progress.
|
||||||
delete(r.learnerPrs, id)
|
r.prs.removeAny(id)
|
||||||
|
r.prs.initProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */)
|
||||||
pr.IsLearner = false
|
pr.IsLearner = false
|
||||||
r.prs[id] = pr
|
*r.prs.getProgress(id) = *pr
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.id == id {
|
if r.id == id {
|
||||||
@ -1434,18 +1399,20 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
|
|||||||
// When a node is first added, we should mark it as recently active.
|
// When a node is first added, we should mark it as recently active.
|
||||||
// Otherwise, CheckQuorum may cause us to step down if it is invoked
|
// Otherwise, CheckQuorum may cause us to step down if it is invoked
|
||||||
// before the added node has a chance to communicate with us.
|
// before the added node has a chance to communicate with us.
|
||||||
pr = r.getProgress(id)
|
r.prs.getProgress(id).RecentActive = true
|
||||||
pr.RecentActive = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) removeNode(id uint64) {
|
func (r *raft) removeNode(id uint64) {
|
||||||
r.delProgress(id)
|
r.prs.removeAny(id)
|
||||||
|
|
||||||
// do not try to commit or abort transferring if there is no nodes in the cluster.
|
// Do not try to commit or abort transferring if the cluster is now empty.
|
||||||
if len(r.prs) == 0 && len(r.learnerPrs) == 0 {
|
if len(r.prs.nodes) == 0 && len(r.prs.learners) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(tbg): won't bad (or at least unfortunate) things happen if the
|
||||||
|
// leader just removed itself?
|
||||||
|
|
||||||
// The quorum size is now smaller, so see if any pending entries can
|
// The quorum size is now smaller, so see if any pending entries can
|
||||||
// be committed.
|
// be committed.
|
||||||
if r.maybeCommit() {
|
if r.maybeCommit() {
|
||||||
@ -1457,24 +1424,6 @@ func (r *raft) removeNode(id uint64) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) setProgress(id, match, next uint64, isLearner bool) {
|
|
||||||
if !isLearner {
|
|
||||||
delete(r.learnerPrs, id)
|
|
||||||
r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, ok := r.prs[id]; ok {
|
|
||||||
panic(fmt.Sprintf("%x unexpected changing from voter to learner for %x", r.id, id))
|
|
||||||
}
|
|
||||||
r.learnerPrs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight), IsLearner: true}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *raft) delProgress(id uint64) {
|
|
||||||
delete(r.prs, id)
|
|
||||||
delete(r.learnerPrs, id)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *raft) loadState(state pb.HardState) {
|
func (r *raft) loadState(state pb.HardState) {
|
||||||
if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
|
if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
|
||||||
r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
|
r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
|
||||||
@ -1495,29 +1444,6 @@ func (r *raft) resetRandomizedElectionTimeout() {
|
|||||||
r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
|
r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkQuorumActive returns true if the quorum is active from
|
|
||||||
// the view of the local raft state machine. Otherwise, it returns
|
|
||||||
// false.
|
|
||||||
// checkQuorumActive also resets all RecentActive to false.
|
|
||||||
func (r *raft) checkQuorumActive() bool {
|
|
||||||
var act int
|
|
||||||
|
|
||||||
r.forEachProgress(func(id uint64, pr *Progress) {
|
|
||||||
if id == r.id { // self is always active
|
|
||||||
act++
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if pr.RecentActive && !pr.IsLearner {
|
|
||||||
act++
|
|
||||||
}
|
|
||||||
|
|
||||||
pr.RecentActive = false
|
|
||||||
})
|
|
||||||
|
|
||||||
return act >= r.quorum()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *raft) sendTimeoutNow(to uint64) {
|
func (r *raft) sendTimeoutNow(to uint64) {
|
||||||
r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow})
|
r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow})
|
||||||
}
|
}
|
||||||
|
@ -29,11 +29,11 @@ func TestMsgAppFlowControlFull(t *testing.T) {
|
|||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
|
|
||||||
pr2 := r.prs[2]
|
pr2 := r.prs.nodes[2]
|
||||||
// force the progress to be in replicate state
|
// force the progress to be in replicate state
|
||||||
pr2.becomeReplicate()
|
pr2.becomeReplicate()
|
||||||
// fill in the inflights window
|
// fill in the inflights window
|
||||||
for i := 0; i < r.maxInflight; i++ {
|
for i := 0; i < r.prs.maxInflight; i++ {
|
||||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
||||||
ms := r.readMessages()
|
ms := r.readMessages()
|
||||||
if len(ms) != 1 {
|
if len(ms) != 1 {
|
||||||
@ -65,18 +65,18 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
|
|||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
|
|
||||||
pr2 := r.prs[2]
|
pr2 := r.prs.nodes[2]
|
||||||
// force the progress to be in replicate state
|
// force the progress to be in replicate state
|
||||||
pr2.becomeReplicate()
|
pr2.becomeReplicate()
|
||||||
// fill in the inflights window
|
// fill in the inflights window
|
||||||
for i := 0; i < r.maxInflight; i++ {
|
for i := 0; i < r.prs.maxInflight; i++ {
|
||||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
||||||
r.readMessages()
|
r.readMessages()
|
||||||
}
|
}
|
||||||
|
|
||||||
// 1 is noop, 2 is the first proposal we just sent.
|
// 1 is noop, 2 is the first proposal we just sent.
|
||||||
// so we start with 2.
|
// so we start with 2.
|
||||||
for tt := 2; tt < r.maxInflight; tt++ {
|
for tt := 2; tt < r.prs.maxInflight; tt++ {
|
||||||
// move forward the window
|
// move forward the window
|
||||||
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(tt)})
|
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(tt)})
|
||||||
r.readMessages()
|
r.readMessages()
|
||||||
@ -110,11 +110,11 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
|
|||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
|
|
||||||
pr2 := r.prs[2]
|
pr2 := r.prs.nodes[2]
|
||||||
// force the progress to be in replicate state
|
// force the progress to be in replicate state
|
||||||
pr2.becomeReplicate()
|
pr2.becomeReplicate()
|
||||||
// fill in the inflights window
|
// fill in the inflights window
|
||||||
for i := 0; i < r.maxInflight; i++ {
|
for i := 0; i < r.prs.maxInflight; i++ {
|
||||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
||||||
r.readMessages()
|
r.readMessages()
|
||||||
}
|
}
|
||||||
|
@ -169,7 +169,7 @@ func testNonleaderStartElection(t *testing.T, state StateType) {
|
|||||||
if r.state != StateCandidate {
|
if r.state != StateCandidate {
|
||||||
t.Errorf("state = %s, want %s", r.state, StateCandidate)
|
t.Errorf("state = %s, want %s", r.state, StateCandidate)
|
||||||
}
|
}
|
||||||
if !r.votes[r.id] {
|
if !r.prs.votes[r.id] {
|
||||||
t.Errorf("vote for self = false, want true")
|
t.Errorf("vote for self = false, want true")
|
||||||
}
|
}
|
||||||
msgs := r.readMessages()
|
msgs := r.readMessages()
|
||||||
|
@ -40,11 +40,11 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) {
|
|||||||
|
|
||||||
// force set the next of node 2, so that
|
// force set the next of node 2, so that
|
||||||
// node 2 needs a snapshot
|
// node 2 needs a snapshot
|
||||||
sm.prs[2].Next = sm.raftLog.firstIndex()
|
sm.prs.nodes[2].Next = sm.raftLog.firstIndex()
|
||||||
|
|
||||||
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].Next - 1, Reject: true})
|
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.nodes[2].Next - 1, Reject: true})
|
||||||
if sm.prs[2].PendingSnapshot != 11 {
|
if sm.prs.nodes[2].PendingSnapshot != 11 {
|
||||||
t.Fatalf("PendingSnapshot = %d, want 11", sm.prs[2].PendingSnapshot)
|
t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.nodes[2].PendingSnapshot)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -56,7 +56,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) {
|
|||||||
sm.becomeCandidate()
|
sm.becomeCandidate()
|
||||||
sm.becomeLeader()
|
sm.becomeLeader()
|
||||||
|
|
||||||
sm.prs[2].becomeSnapshot(11)
|
sm.prs.nodes[2].becomeSnapshot(11)
|
||||||
|
|
||||||
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
||||||
msgs := sm.readMessages()
|
msgs := sm.readMessages()
|
||||||
@ -73,18 +73,18 @@ func TestSnapshotFailure(t *testing.T) {
|
|||||||
sm.becomeCandidate()
|
sm.becomeCandidate()
|
||||||
sm.becomeLeader()
|
sm.becomeLeader()
|
||||||
|
|
||||||
sm.prs[2].Next = 1
|
sm.prs.nodes[2].Next = 1
|
||||||
sm.prs[2].becomeSnapshot(11)
|
sm.prs.nodes[2].becomeSnapshot(11)
|
||||||
|
|
||||||
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true})
|
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true})
|
||||||
if sm.prs[2].PendingSnapshot != 0 {
|
if sm.prs.nodes[2].PendingSnapshot != 0 {
|
||||||
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs[2].PendingSnapshot)
|
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot)
|
||||||
}
|
}
|
||||||
if sm.prs[2].Next != 1 {
|
if sm.prs.nodes[2].Next != 1 {
|
||||||
t.Fatalf("Next = %d, want 1", sm.prs[2].Next)
|
t.Fatalf("Next = %d, want 1", sm.prs.nodes[2].Next)
|
||||||
}
|
}
|
||||||
if !sm.prs[2].Paused {
|
if !sm.prs.nodes[2].Paused {
|
||||||
t.Errorf("Paused = %v, want true", sm.prs[2].Paused)
|
t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -96,18 +96,18 @@ func TestSnapshotSucceed(t *testing.T) {
|
|||||||
sm.becomeCandidate()
|
sm.becomeCandidate()
|
||||||
sm.becomeLeader()
|
sm.becomeLeader()
|
||||||
|
|
||||||
sm.prs[2].Next = 1
|
sm.prs.nodes[2].Next = 1
|
||||||
sm.prs[2].becomeSnapshot(11)
|
sm.prs.nodes[2].becomeSnapshot(11)
|
||||||
|
|
||||||
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false})
|
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false})
|
||||||
if sm.prs[2].PendingSnapshot != 0 {
|
if sm.prs.nodes[2].PendingSnapshot != 0 {
|
||||||
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs[2].PendingSnapshot)
|
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot)
|
||||||
}
|
}
|
||||||
if sm.prs[2].Next != 12 {
|
if sm.prs.nodes[2].Next != 12 {
|
||||||
t.Fatalf("Next = %d, want 12", sm.prs[2].Next)
|
t.Fatalf("Next = %d, want 12", sm.prs.nodes[2].Next)
|
||||||
}
|
}
|
||||||
if !sm.prs[2].Paused {
|
if !sm.prs.nodes[2].Paused {
|
||||||
t.Errorf("Paused = %v, want true", sm.prs[2].Paused)
|
t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -206,7 +206,7 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) {
|
|||||||
mustSend(n2, n1, pb.MsgAppResp)
|
mustSend(n2, n1, pb.MsgAppResp)
|
||||||
|
|
||||||
// Leader has correct state for follower.
|
// Leader has correct state for follower.
|
||||||
pr := n1.prs[2]
|
pr := n1.prs.nodes[2]
|
||||||
if pr.State != ProgressStateReplicate {
|
if pr.State != ProgressStateReplicate {
|
||||||
t.Fatalf("unexpected state %v", pr)
|
t.Fatalf("unexpected state %v", pr)
|
||||||
}
|
}
|
||||||
@ -227,23 +227,23 @@ func TestSnapshotAbort(t *testing.T) {
|
|||||||
sm.becomeCandidate()
|
sm.becomeCandidate()
|
||||||
sm.becomeLeader()
|
sm.becomeLeader()
|
||||||
|
|
||||||
sm.prs[2].Next = 1
|
sm.prs.nodes[2].Next = 1
|
||||||
sm.prs[2].becomeSnapshot(11)
|
sm.prs.nodes[2].becomeSnapshot(11)
|
||||||
|
|
||||||
// A successful msgAppResp that has a higher/equal index than the
|
// A successful msgAppResp that has a higher/equal index than the
|
||||||
// pending snapshot should abort the pending snapshot.
|
// pending snapshot should abort the pending snapshot.
|
||||||
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11})
|
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11})
|
||||||
if sm.prs[2].PendingSnapshot != 0 {
|
if sm.prs.nodes[2].PendingSnapshot != 0 {
|
||||||
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs[2].PendingSnapshot)
|
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot)
|
||||||
}
|
}
|
||||||
// The follower entered ProgressStateReplicate and the leader send an append
|
// The follower entered ProgressStateReplicate and the leader send an append
|
||||||
// and optimistically updated the progress (so we see 13 instead of 12).
|
// and optimistically updated the progress (so we see 13 instead of 12).
|
||||||
// There is something to append because the leader appended an empty entry
|
// There is something to append because the leader appended an empty entry
|
||||||
// to the log at index 12 when it assumed leadership.
|
// to the log at index 12 when it assumed leadership.
|
||||||
if sm.prs[2].Next != 13 {
|
if sm.prs.nodes[2].Next != 13 {
|
||||||
t.Fatalf("Next = %d, want 13", sm.prs[2].Next)
|
t.Fatalf("Next = %d, want 13", sm.prs.nodes[2].Next)
|
||||||
}
|
}
|
||||||
if n := sm.prs[2].ins.count; n != 1 {
|
if n := sm.prs.nodes[2].ins.count; n != 1 {
|
||||||
t.Fatalf("expected an inflight message, got %d", n)
|
t.Fatalf("expected an inflight message, got %d", n)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -271,12 +271,12 @@ func TestProgressLeader(t *testing.T) {
|
|||||||
r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
|
r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
|
||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
r.prs[2].becomeReplicate()
|
r.prs.nodes[2].becomeReplicate()
|
||||||
|
|
||||||
// Send proposals to r1. The first 5 entries should be appended to the log.
|
// Send proposals to r1. The first 5 entries should be appended to the log.
|
||||||
propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}}
|
propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}}
|
||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
if pr := r.prs[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 {
|
if pr := r.prs.nodes[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 {
|
||||||
t.Errorf("unexpected progress %v", pr)
|
t.Errorf("unexpected progress %v", pr)
|
||||||
}
|
}
|
||||||
if err := r.Step(propMsg); err != nil {
|
if err := r.Step(propMsg); err != nil {
|
||||||
@ -291,17 +291,17 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
|
|||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
|
|
||||||
r.prs[2].Paused = true
|
r.prs.nodes[2].Paused = true
|
||||||
|
|
||||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
|
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
|
||||||
if !r.prs[2].Paused {
|
if !r.prs.nodes[2].Paused {
|
||||||
t.Errorf("paused = %v, want true", r.prs[2].Paused)
|
t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused)
|
||||||
}
|
}
|
||||||
|
|
||||||
r.prs[2].becomeReplicate()
|
r.prs.nodes[2].becomeReplicate()
|
||||||
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
|
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
|
||||||
if r.prs[2].Paused {
|
if r.prs.nodes[2].Paused {
|
||||||
t.Errorf("paused = %v, want false", r.prs[2].Paused)
|
t.Errorf("paused = %v, want false", r.prs.nodes[2].Paused)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -331,7 +331,7 @@ func TestProgressFlowControl(t *testing.T) {
|
|||||||
r.readMessages()
|
r.readMessages()
|
||||||
|
|
||||||
// While node 2 is in probe state, propose a bunch of entries.
|
// While node 2 is in probe state, propose a bunch of entries.
|
||||||
r.prs[2].becomeProbe()
|
r.prs.nodes[2].becomeProbe()
|
||||||
blob := []byte(strings.Repeat("a", 1000))
|
blob := []byte(strings.Repeat("a", 1000))
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}})
|
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}})
|
||||||
@ -409,8 +409,8 @@ func TestUncommittedEntryLimit(t *testing.T) {
|
|||||||
|
|
||||||
// Set the two followers to the replicate state. Commit to tail of log.
|
// Set the two followers to the replicate state. Commit to tail of log.
|
||||||
const numFollowers = 2
|
const numFollowers = 2
|
||||||
r.prs[2].becomeReplicate()
|
r.prs.nodes[2].becomeReplicate()
|
||||||
r.prs[3].becomeReplicate()
|
r.prs.nodes[3].becomeReplicate()
|
||||||
r.uncommittedSize = 0
|
r.uncommittedSize = 0
|
||||||
|
|
||||||
// Send proposals to r1. The first 5 entries should be appended to the log.
|
// Send proposals to r1. The first 5 entries should be appended to the log.
|
||||||
@ -889,7 +889,7 @@ func TestLearnerLogReplication(t *testing.T) {
|
|||||||
t.Errorf("peer 2 wants committed to %d, but still %d", n1.raftLog.committed, n2.raftLog.committed)
|
t.Errorf("peer 2 wants committed to %d, but still %d", n1.raftLog.committed, n2.raftLog.committed)
|
||||||
}
|
}
|
||||||
|
|
||||||
match := n1.getProgress(2).Match
|
match := n1.prs.getProgress(2).Match
|
||||||
if match != n2.raftLog.committed {
|
if match != n2.raftLog.committed {
|
||||||
t.Errorf("progress 2 of leader 1 wants match %d, but got %d", n2.raftLog.committed, match)
|
t.Errorf("progress 2 of leader 1 wants match %d, but got %d", n2.raftLog.committed, match)
|
||||||
}
|
}
|
||||||
@ -1351,8 +1351,9 @@ func TestCommit(t *testing.T) {
|
|||||||
storage.hardState = pb.HardState{Term: tt.smTerm}
|
storage.hardState = pb.HardState{Term: tt.smTerm}
|
||||||
|
|
||||||
sm := newTestRaft(1, []uint64{1}, 10, 2, storage)
|
sm := newTestRaft(1, []uint64{1}, 10, 2, storage)
|
||||||
|
sm.prs.removeAny(1)
|
||||||
for j := 0; j < len(tt.matches); j++ {
|
for j := 0; j < len(tt.matches); j++ {
|
||||||
sm.setProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1, false)
|
sm.prs.initProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1, false)
|
||||||
}
|
}
|
||||||
sm.maybeCommit()
|
sm.maybeCommit()
|
||||||
if g := sm.raftLog.committed; g != tt.w {
|
if g := sm.raftLog.committed; g != tt.w {
|
||||||
@ -2137,7 +2138,7 @@ func TestNonPromotableVoterWithCheckQuorum(t *testing.T) {
|
|||||||
nt := newNetwork(a, b)
|
nt := newNetwork(a, b)
|
||||||
setRandomizedElectionTimeout(b, b.electionTimeout+1)
|
setRandomizedElectionTimeout(b, b.electionTimeout+1)
|
||||||
// Need to remove 2 again to make it a non-promotable node since newNetwork overwritten some internal states
|
// Need to remove 2 again to make it a non-promotable node since newNetwork overwritten some internal states
|
||||||
b.delProgress(2)
|
b.prs.removeAny(2)
|
||||||
|
|
||||||
if b.promotable() {
|
if b.promotable() {
|
||||||
t.Fatalf("promotable = %v, want false", b.promotable())
|
t.Fatalf("promotable = %v, want false", b.promotable())
|
||||||
@ -2631,7 +2632,7 @@ func TestLeaderAppResp(t *testing.T) {
|
|||||||
sm.readMessages()
|
sm.readMessages()
|
||||||
sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index})
|
sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index})
|
||||||
|
|
||||||
p := sm.prs[2]
|
p := sm.prs.nodes[2]
|
||||||
if p.Match != tt.wmatch {
|
if p.Match != tt.wmatch {
|
||||||
t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch)
|
t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch)
|
||||||
}
|
}
|
||||||
@ -2678,9 +2679,9 @@ func TestBcastBeat(t *testing.T) {
|
|||||||
mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1})
|
mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1})
|
||||||
}
|
}
|
||||||
// slow follower
|
// slow follower
|
||||||
sm.prs[2].Match, sm.prs[2].Next = 5, 6
|
sm.prs.nodes[2].Match, sm.prs.nodes[2].Next = 5, 6
|
||||||
// normal follower
|
// normal follower
|
||||||
sm.prs[3].Match, sm.prs[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
|
sm.prs.nodes[3].Match, sm.prs.nodes[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
|
||||||
|
|
||||||
sm.Step(pb.Message{Type: pb.MsgBeat})
|
sm.Step(pb.Message{Type: pb.MsgBeat})
|
||||||
msgs := sm.readMessages()
|
msgs := sm.readMessages()
|
||||||
@ -2688,8 +2689,8 @@ func TestBcastBeat(t *testing.T) {
|
|||||||
t.Fatalf("len(msgs) = %v, want 2", len(msgs))
|
t.Fatalf("len(msgs) = %v, want 2", len(msgs))
|
||||||
}
|
}
|
||||||
wantCommitMap := map[uint64]uint64{
|
wantCommitMap := map[uint64]uint64{
|
||||||
2: min(sm.raftLog.committed, sm.prs[2].Match),
|
2: min(sm.raftLog.committed, sm.prs.nodes[2].Match),
|
||||||
3: min(sm.raftLog.committed, sm.prs[3].Match),
|
3: min(sm.raftLog.committed, sm.prs.nodes[3].Match),
|
||||||
}
|
}
|
||||||
for i, m := range msgs {
|
for i, m := range msgs {
|
||||||
if m.Type != pb.MsgHeartbeat {
|
if m.Type != pb.MsgHeartbeat {
|
||||||
@ -2775,11 +2776,11 @@ func TestLeaderIncreaseNext(t *testing.T) {
|
|||||||
sm.raftLog.append(previousEnts...)
|
sm.raftLog.append(previousEnts...)
|
||||||
sm.becomeCandidate()
|
sm.becomeCandidate()
|
||||||
sm.becomeLeader()
|
sm.becomeLeader()
|
||||||
sm.prs[2].State = tt.state
|
sm.prs.nodes[2].State = tt.state
|
||||||
sm.prs[2].Next = tt.next
|
sm.prs.nodes[2].Next = tt.next
|
||||||
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
||||||
|
|
||||||
p := sm.prs[2]
|
p := sm.prs.nodes[2]
|
||||||
if p.Next != tt.wnext {
|
if p.Next != tt.wnext {
|
||||||
t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext)
|
t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext)
|
||||||
}
|
}
|
||||||
@ -2791,7 +2792,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
|
|||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
r.readMessages()
|
r.readMessages()
|
||||||
r.prs[2].becomeProbe()
|
r.prs.nodes[2].becomeProbe()
|
||||||
|
|
||||||
// each round is a heartbeat
|
// each round is a heartbeat
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
@ -2810,8 +2811,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !r.prs[2].Paused {
|
if !r.prs.nodes[2].Paused {
|
||||||
t.Errorf("paused = %v, want true", r.prs[2].Paused)
|
t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused)
|
||||||
}
|
}
|
||||||
for j := 0; j < 10; j++ {
|
for j := 0; j < 10; j++ {
|
||||||
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
|
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
|
||||||
@ -2825,8 +2826,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
|
|||||||
for j := 0; j < r.heartbeatTimeout; j++ {
|
for j := 0; j < r.heartbeatTimeout; j++ {
|
||||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
|
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
|
||||||
}
|
}
|
||||||
if !r.prs[2].Paused {
|
if !r.prs.nodes[2].Paused {
|
||||||
t.Errorf("paused = %v, want true", r.prs[2].Paused)
|
t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused)
|
||||||
}
|
}
|
||||||
|
|
||||||
// consume the heartbeat
|
// consume the heartbeat
|
||||||
@ -2848,8 +2849,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
|
|||||||
if msg[0].Index != 0 {
|
if msg[0].Index != 0 {
|
||||||
t.Errorf("index = %d, want %d", msg[0].Index, 0)
|
t.Errorf("index = %d, want %d", msg[0].Index, 0)
|
||||||
}
|
}
|
||||||
if !r.prs[2].Paused {
|
if !r.prs.nodes[2].Paused {
|
||||||
t.Errorf("paused = %v, want true", r.prs[2].Paused)
|
t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2858,7 +2859,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) {
|
|||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
r.readMessages()
|
r.readMessages()
|
||||||
r.prs[2].becomeReplicate()
|
r.prs.nodes[2].becomeReplicate()
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
|
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
|
||||||
@ -2875,7 +2876,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) {
|
|||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
r.readMessages()
|
r.readMessages()
|
||||||
r.prs[2].becomeSnapshot(10)
|
r.prs.nodes[2].becomeSnapshot(10)
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
|
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
|
||||||
@ -2896,17 +2897,17 @@ func TestRecvMsgUnreachable(t *testing.T) {
|
|||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
r.readMessages()
|
r.readMessages()
|
||||||
// set node 2 to state replicate
|
// set node 2 to state replicate
|
||||||
r.prs[2].Match = 3
|
r.prs.nodes[2].Match = 3
|
||||||
r.prs[2].becomeReplicate()
|
r.prs.nodes[2].becomeReplicate()
|
||||||
r.prs[2].optimisticUpdate(5)
|
r.prs.nodes[2].optimisticUpdate(5)
|
||||||
|
|
||||||
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable})
|
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable})
|
||||||
|
|
||||||
if r.prs[2].State != ProgressStateProbe {
|
if r.prs.nodes[2].State != ProgressStateProbe {
|
||||||
t.Errorf("state = %s, want %s", r.prs[2].State, ProgressStateProbe)
|
t.Errorf("state = %s, want %s", r.prs.nodes[2].State, ProgressStateProbe)
|
||||||
}
|
}
|
||||||
if wnext := r.prs[2].Match + 1; r.prs[2].Next != wnext {
|
if wnext := r.prs.nodes[2].Match + 1; r.prs.nodes[2].Next != wnext {
|
||||||
t.Errorf("next = %d, want %d", r.prs[2].Next, wnext)
|
t.Errorf("next = %d, want %d", r.prs.nodes[2].Next, wnext)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2931,7 +2932,7 @@ func TestRestore(t *testing.T) {
|
|||||||
if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
|
if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
|
||||||
t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
|
t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
|
||||||
}
|
}
|
||||||
sg := sm.nodes()
|
sg := sm.prs.voterNodes()
|
||||||
if !reflect.DeepEqual(sg, s.Metadata.ConfState.Nodes) {
|
if !reflect.DeepEqual(sg, s.Metadata.ConfState.Nodes) {
|
||||||
t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Metadata.ConfState.Nodes)
|
t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Metadata.ConfState.Nodes)
|
||||||
}
|
}
|
||||||
@ -2963,22 +2964,22 @@ func TestRestoreWithLearner(t *testing.T) {
|
|||||||
if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
|
if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
|
||||||
t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
|
t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
|
||||||
}
|
}
|
||||||
sg := sm.nodes()
|
sg := sm.prs.voterNodes()
|
||||||
if len(sg) != len(s.Metadata.ConfState.Nodes) {
|
if len(sg) != len(s.Metadata.ConfState.Nodes) {
|
||||||
t.Errorf("sm.Nodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Nodes)
|
t.Errorf("sm.Nodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Nodes)
|
||||||
}
|
}
|
||||||
lns := sm.learnerNodes()
|
lns := sm.prs.learnerNodes()
|
||||||
if len(lns) != len(s.Metadata.ConfState.Learners) {
|
if len(lns) != len(s.Metadata.ConfState.Learners) {
|
||||||
t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners)
|
t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners)
|
||||||
}
|
}
|
||||||
for _, n := range s.Metadata.ConfState.Nodes {
|
for _, n := range s.Metadata.ConfState.Nodes {
|
||||||
if sm.prs[n].IsLearner {
|
if sm.prs.nodes[n].IsLearner {
|
||||||
t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs[n], false)
|
t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.nodes[n], false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, n := range s.Metadata.ConfState.Learners {
|
for _, n := range s.Metadata.ConfState.Learners {
|
||||||
if !sm.learnerPrs[n].IsLearner {
|
if !sm.prs.learners[n].IsLearner {
|
||||||
t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs[n], true)
|
t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.nodes[n], true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3120,8 +3121,8 @@ func TestProvideSnap(t *testing.T) {
|
|||||||
sm.becomeLeader()
|
sm.becomeLeader()
|
||||||
|
|
||||||
// force set the next of node 2, so that node 2 needs a snapshot
|
// force set the next of node 2, so that node 2 needs a snapshot
|
||||||
sm.prs[2].Next = sm.raftLog.firstIndex()
|
sm.prs.nodes[2].Next = sm.raftLog.firstIndex()
|
||||||
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs[2].Next - 1, Reject: true})
|
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.nodes[2].Next - 1, Reject: true})
|
||||||
|
|
||||||
msgs := sm.readMessages()
|
msgs := sm.readMessages()
|
||||||
if len(msgs) != 1 {
|
if len(msgs) != 1 {
|
||||||
@ -3151,8 +3152,8 @@ func TestIgnoreProvidingSnap(t *testing.T) {
|
|||||||
|
|
||||||
// force set the next of node 2, so that node 2 needs a snapshot
|
// force set the next of node 2, so that node 2 needs a snapshot
|
||||||
// change node 2 to be inactive, expect node 1 ignore sending snapshot to 2
|
// change node 2 to be inactive, expect node 1 ignore sending snapshot to 2
|
||||||
sm.prs[2].Next = sm.raftLog.firstIndex() - 1
|
sm.prs.nodes[2].Next = sm.raftLog.firstIndex() - 1
|
||||||
sm.prs[2].RecentActive = false
|
sm.prs.nodes[2].RecentActive = false
|
||||||
|
|
||||||
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
||||||
|
|
||||||
@ -3192,7 +3193,7 @@ func TestSlowNodeRestore(t *testing.T) {
|
|||||||
}
|
}
|
||||||
lead := nt.peers[1].(*raft)
|
lead := nt.peers[1].(*raft)
|
||||||
nextEnts(lead, nt.storage[1])
|
nextEnts(lead, nt.storage[1])
|
||||||
nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.nodes()}, nil)
|
nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.prs.voterNodes()}, nil)
|
||||||
nt.storage[1].Compact(lead.raftLog.applied)
|
nt.storage[1].Compact(lead.raftLog.applied)
|
||||||
|
|
||||||
nt.recover()
|
nt.recover()
|
||||||
@ -3200,7 +3201,7 @@ func TestSlowNodeRestore(t *testing.T) {
|
|||||||
// node 3 will only be considered as active when node 1 receives a reply from it.
|
// node 3 will only be considered as active when node 1 receives a reply from it.
|
||||||
for {
|
for {
|
||||||
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
|
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
|
||||||
if lead.prs[3].RecentActive {
|
if lead.prs.nodes[3].RecentActive {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -3287,7 +3288,7 @@ func TestNewLeaderPendingConfig(t *testing.T) {
|
|||||||
func TestAddNode(t *testing.T) {
|
func TestAddNode(t *testing.T) {
|
||||||
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
||||||
r.addNode(2)
|
r.addNode(2)
|
||||||
nodes := r.nodes()
|
nodes := r.prs.voterNodes()
|
||||||
wnodes := []uint64{1, 2}
|
wnodes := []uint64{1, 2}
|
||||||
if !reflect.DeepEqual(nodes, wnodes) {
|
if !reflect.DeepEqual(nodes, wnodes) {
|
||||||
t.Errorf("nodes = %v, want %v", nodes, wnodes)
|
t.Errorf("nodes = %v, want %v", nodes, wnodes)
|
||||||
@ -3298,13 +3299,13 @@ func TestAddNode(t *testing.T) {
|
|||||||
func TestAddLearner(t *testing.T) {
|
func TestAddLearner(t *testing.T) {
|
||||||
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
|
||||||
r.addLearner(2)
|
r.addLearner(2)
|
||||||
nodes := r.learnerNodes()
|
nodes := r.prs.learnerNodes()
|
||||||
wnodes := []uint64{2}
|
wnodes := []uint64{2}
|
||||||
if !reflect.DeepEqual(nodes, wnodes) {
|
if !reflect.DeepEqual(nodes, wnodes) {
|
||||||
t.Errorf("nodes = %v, want %v", nodes, wnodes)
|
t.Errorf("nodes = %v, want %v", nodes, wnodes)
|
||||||
}
|
}
|
||||||
if !r.learnerPrs[2].IsLearner {
|
if !r.prs.learners[2].IsLearner {
|
||||||
t.Errorf("node 2 is learner %t, want %t", r.prs[2].IsLearner, true)
|
t.Errorf("node 2 is learner %t, want %t", r.prs.nodes[2].IsLearner, true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3348,14 +3349,14 @@ func TestRemoveNode(t *testing.T) {
|
|||||||
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
|
||||||
r.removeNode(2)
|
r.removeNode(2)
|
||||||
w := []uint64{1}
|
w := []uint64{1}
|
||||||
if g := r.nodes(); !reflect.DeepEqual(g, w) {
|
if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) {
|
||||||
t.Errorf("nodes = %v, want %v", g, w)
|
t.Errorf("nodes = %v, want %v", g, w)
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove all nodes from cluster
|
// remove all nodes from cluster
|
||||||
r.removeNode(1)
|
r.removeNode(1)
|
||||||
w = []uint64{}
|
w = []uint64{}
|
||||||
if g := r.nodes(); !reflect.DeepEqual(g, w) {
|
if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) {
|
||||||
t.Errorf("nodes = %v, want %v", g, w)
|
t.Errorf("nodes = %v, want %v", g, w)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -3366,18 +3367,18 @@ func TestRemoveLearner(t *testing.T) {
|
|||||||
r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
|
r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
|
||||||
r.removeNode(2)
|
r.removeNode(2)
|
||||||
w := []uint64{1}
|
w := []uint64{1}
|
||||||
if g := r.nodes(); !reflect.DeepEqual(g, w) {
|
if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) {
|
||||||
t.Errorf("nodes = %v, want %v", g, w)
|
t.Errorf("nodes = %v, want %v", g, w)
|
||||||
}
|
}
|
||||||
|
|
||||||
w = []uint64{}
|
w = []uint64{}
|
||||||
if g := r.learnerNodes(); !reflect.DeepEqual(g, w) {
|
if g := r.prs.learnerNodes(); !reflect.DeepEqual(g, w) {
|
||||||
t.Errorf("nodes = %v, want %v", g, w)
|
t.Errorf("nodes = %v, want %v", g, w)
|
||||||
}
|
}
|
||||||
|
|
||||||
// remove all nodes from cluster
|
// remove all nodes from cluster
|
||||||
r.removeNode(1)
|
r.removeNode(1)
|
||||||
if g := r.nodes(); !reflect.DeepEqual(g, w) {
|
if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) {
|
||||||
t.Errorf("nodes = %v, want %v", g, w)
|
t.Errorf("nodes = %v, want %v", g, w)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -3416,8 +3417,8 @@ func TestRaftNodes(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
r := newTestRaft(1, tt.ids, 10, 1, NewMemoryStorage())
|
r := newTestRaft(1, tt.ids, 10, 1, NewMemoryStorage())
|
||||||
if !reflect.DeepEqual(r.nodes(), tt.wids) {
|
if !reflect.DeepEqual(r.prs.voterNodes(), tt.wids) {
|
||||||
t.Errorf("#%d: nodes = %+v, want %+v", i, r.nodes(), tt.wids)
|
t.Errorf("#%d: nodes = %+v, want %+v", i, r.prs.voterNodes(), tt.wids)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -3618,8 +3619,8 @@ func TestLeaderTransferToSlowFollower(t *testing.T) {
|
|||||||
|
|
||||||
nt.recover()
|
nt.recover()
|
||||||
lead := nt.peers[1].(*raft)
|
lead := nt.peers[1].(*raft)
|
||||||
if lead.prs[3].Match != 1 {
|
if lead.prs.nodes[3].Match != 1 {
|
||||||
t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs[3].Match, 1)
|
t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.nodes[3].Match, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Transfer leadership to 3 when node 3 is lack of log.
|
// Transfer leadership to 3 when node 3 is lack of log.
|
||||||
@ -3637,12 +3638,12 @@ func TestLeaderTransferAfterSnapshot(t *testing.T) {
|
|||||||
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
|
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
|
||||||
lead := nt.peers[1].(*raft)
|
lead := nt.peers[1].(*raft)
|
||||||
nextEnts(lead, nt.storage[1])
|
nextEnts(lead, nt.storage[1])
|
||||||
nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.nodes()}, nil)
|
nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.prs.voterNodes()}, nil)
|
||||||
nt.storage[1].Compact(lead.raftLog.applied)
|
nt.storage[1].Compact(lead.raftLog.applied)
|
||||||
|
|
||||||
nt.recover()
|
nt.recover()
|
||||||
if lead.prs[3].Match != 1 {
|
if lead.prs.nodes[3].Match != 1 {
|
||||||
t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs[3].Match, 1)
|
t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.nodes[3].Match, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Transfer leadership to 3 when node 3 is lack of snapshot.
|
// Transfer leadership to 3 when node 3 is lack of snapshot.
|
||||||
@ -3721,8 +3722,8 @@ func TestLeaderTransferIgnoreProposal(t *testing.T) {
|
|||||||
t.Fatalf("should return drop proposal error while transferring")
|
t.Fatalf("should return drop proposal error while transferring")
|
||||||
}
|
}
|
||||||
|
|
||||||
if lead.prs[1].Match != 1 {
|
if lead.prs.nodes[1].Match != 1 {
|
||||||
t.Fatalf("node 1 has match %x, want %x", lead.prs[1].Match, 1)
|
t.Fatalf("node 1 has match %x, want %x", lead.prs.nodes[1].Match, 1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -4294,18 +4295,18 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw
|
|||||||
sm := newRaft(cfg)
|
sm := newRaft(cfg)
|
||||||
npeers[id] = sm
|
npeers[id] = sm
|
||||||
case *raft:
|
case *raft:
|
||||||
learners := make(map[uint64]bool, len(v.learnerPrs))
|
learners := make(map[uint64]bool, len(v.prs.learners))
|
||||||
for i := range v.learnerPrs {
|
for i := range v.prs.learners {
|
||||||
learners[i] = true
|
learners[i] = true
|
||||||
}
|
}
|
||||||
v.id = id
|
v.id = id
|
||||||
v.prs = make(map[uint64]*Progress)
|
v.prs.nodes = make(map[uint64]*Progress)
|
||||||
v.learnerPrs = make(map[uint64]*Progress)
|
v.prs.learners = make(map[uint64]*Progress)
|
||||||
for i := 0; i < size; i++ {
|
for i := 0; i < size; i++ {
|
||||||
if _, ok := learners[peerAddrs[i]]; ok {
|
if _, ok := learners[peerAddrs[i]]; ok {
|
||||||
v.learnerPrs[peerAddrs[i]] = &Progress{IsLearner: true}
|
v.prs.learners[peerAddrs[i]] = &Progress{IsLearner: true}
|
||||||
} else {
|
} else {
|
||||||
v.prs[peerAddrs[i]] = &Progress{}
|
v.prs.nodes[peerAddrs[i]] = &Progress{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
v.reset(v.Term)
|
v.reset(v.Term)
|
||||||
|
@ -166,7 +166,7 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error {
|
|||||||
// ApplyConfChange applies a config change to the local node.
|
// ApplyConfChange applies a config change to the local node.
|
||||||
func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
|
func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
|
||||||
if cc.NodeID == None {
|
if cc.NodeID == None {
|
||||||
return &pb.ConfState{Nodes: rn.raft.nodes(), Learners: rn.raft.learnerNodes()}
|
return &pb.ConfState{Nodes: rn.raft.prs.voterNodes(), Learners: rn.raft.prs.learnerNodes()}
|
||||||
}
|
}
|
||||||
switch cc.Type {
|
switch cc.Type {
|
||||||
case pb.ConfChangeAddNode:
|
case pb.ConfChangeAddNode:
|
||||||
@ -179,7 +179,7 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
|
|||||||
default:
|
default:
|
||||||
panic("unexpected conf type")
|
panic("unexpected conf type")
|
||||||
}
|
}
|
||||||
return &pb.ConfState{Nodes: rn.raft.nodes(), Learners: rn.raft.learnerNodes()}
|
return &pb.ConfState{Nodes: rn.raft.prs.voterNodes(), Learners: rn.raft.prs.learnerNodes()}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step advances the state machine using the given message.
|
// Step advances the state machine using the given message.
|
||||||
@ -188,7 +188,7 @@ func (rn *RawNode) Step(m pb.Message) error {
|
|||||||
if IsLocalMsg(m.Type) {
|
if IsLocalMsg(m.Type) {
|
||||||
return ErrStepLocalMsg
|
return ErrStepLocalMsg
|
||||||
}
|
}
|
||||||
if pr := rn.raft.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
|
if pr := rn.raft.prs.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
|
||||||
return rn.raft.Step(m)
|
return rn.raft.Step(m)
|
||||||
}
|
}
|
||||||
return ErrStepPeerNotFound
|
return ErrStepPeerNotFound
|
||||||
@ -257,16 +257,15 @@ const (
|
|||||||
// WithProgress is a helper to introspect the Progress for this node and its
|
// WithProgress is a helper to introspect the Progress for this node and its
|
||||||
// peers.
|
// peers.
|
||||||
func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr Progress)) {
|
func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr Progress)) {
|
||||||
for id, pr := range rn.raft.prs {
|
rn.raft.prs.visit(func(id uint64, pr *Progress) {
|
||||||
pr := *pr
|
typ := ProgressTypePeer
|
||||||
pr.ins = nil
|
if pr.IsLearner {
|
||||||
visitor(id, ProgressTypePeer, pr)
|
typ = ProgressTypeLearner
|
||||||
}
|
}
|
||||||
for id, pr := range rn.raft.learnerPrs {
|
p := *pr
|
||||||
pr := *pr
|
p.ins = nil
|
||||||
pr.ins = nil
|
visitor(id, typ, p)
|
||||||
visitor(id, ProgressTypeLearner, pr)
|
})
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReportUnreachable reports the given node is not reachable for the last send.
|
// ReportUnreachable reports the given node is not reachable for the last send.
|
||||||
|
@ -50,26 +50,25 @@ func newReadOnly(option ReadOnlyOption) *readOnly {
|
|||||||
// the read only request.
|
// the read only request.
|
||||||
// `m` is the original read only request message from the local or remote node.
|
// `m` is the original read only request message from the local or remote node.
|
||||||
func (ro *readOnly) addRequest(index uint64, m pb.Message) {
|
func (ro *readOnly) addRequest(index uint64, m pb.Message) {
|
||||||
ctx := string(m.Entries[0].Data)
|
s := string(m.Entries[0].Data)
|
||||||
if _, ok := ro.pendingReadIndex[ctx]; ok {
|
if _, ok := ro.pendingReadIndex[s]; ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
|
ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
|
||||||
ro.readIndexQueue = append(ro.readIndexQueue, ctx)
|
ro.readIndexQueue = append(ro.readIndexQueue, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
// recvAck notifies the readonly struct that the raft state machine received
|
// recvAck notifies the readonly struct that the raft state machine received
|
||||||
// an acknowledgment of the heartbeat that attached with the read only request
|
// an acknowledgment of the heartbeat that attached with the read only request
|
||||||
// context.
|
// context.
|
||||||
func (ro *readOnly) recvAck(m pb.Message) int {
|
func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]struct{} {
|
||||||
rs, ok := ro.pendingReadIndex[string(m.Context)]
|
rs, ok := ro.pendingReadIndex[string(context)]
|
||||||
if !ok {
|
if !ok {
|
||||||
return 0
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
rs.acks[m.From] = struct{}{}
|
rs.acks[id] = struct{}{}
|
||||||
// add one to include an ack from local node
|
return rs.acks
|
||||||
return len(rs.acks) + 1
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// advance advances the read only request queue kept by the readonly struct.
|
// advance advances the read only request queue kept by the readonly struct.
|
||||||
|
@ -33,15 +33,18 @@ type Status struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func getProgressCopy(r *raft) map[uint64]Progress {
|
func getProgressCopy(r *raft) map[uint64]Progress {
|
||||||
prs := make(map[uint64]Progress)
|
m := make(map[uint64]Progress)
|
||||||
for id, p := range r.prs {
|
r.prs.visit(func(id uint64, pr *Progress) {
|
||||||
prs[id] = *p
|
var p Progress
|
||||||
}
|
p, pr = *pr, nil /* avoid accidental reuse below */
|
||||||
|
|
||||||
for id, p := range r.learnerPrs {
|
// The inflight buffer is tricky to copy and besides, it isn't exposed
|
||||||
prs[id] = *p
|
// to the client, so pretend it's nil.
|
||||||
}
|
p.ins = nil
|
||||||
return prs
|
|
||||||
|
m[id] = p
|
||||||
|
})
|
||||||
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
func getStatusWithoutProgress(r *raft) Status {
|
func getStatusWithoutProgress(r *raft) Status {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user