mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: use membership sets in progress tracking
Instead of having disjoint mappings of ID to *Progress for voters and learners, use a map[id]struct{} for each and share a map of *Progress among them. This is easier to handle when joint quorums are introduced, at which point a node may be a voting member of two quorums.
This commit is contained in:
parent
76c8ca5a55
commit
3def2364e4
@ -291,8 +291,9 @@ func (in *inflights) reset() {
|
||||
// known about the nodes and learners in it. In particular, it tracks the match
|
||||
// index for each peer which in turn allows reasoning about the committed index.
|
||||
type progressTracker struct {
|
||||
nodes map[uint64]*Progress
|
||||
learners map[uint64]*Progress
|
||||
nodes map[uint64]struct{}
|
||||
learners map[uint64]struct{}
|
||||
prs map[uint64]*Progress
|
||||
|
||||
votes map[uint64]bool
|
||||
|
||||
@ -303,8 +304,9 @@ type progressTracker struct {
|
||||
func makePRS(maxInflight int) progressTracker {
|
||||
p := progressTracker{
|
||||
maxInflight: maxInflight,
|
||||
nodes: map[uint64]*Progress{},
|
||||
learners: map[uint64]*Progress{},
|
||||
prs: map[uint64]*Progress{},
|
||||
nodes: map[uint64]struct{}{},
|
||||
learners: map[uint64]struct{}{},
|
||||
votes: map[uint64]bool{},
|
||||
}
|
||||
return p
|
||||
@ -334,8 +336,8 @@ func (p *progressTracker) committed() uint64 {
|
||||
}
|
||||
p.matchBuf = p.matchBuf[:len(p.nodes)]
|
||||
idx := 0
|
||||
for _, pr := range p.nodes {
|
||||
p.matchBuf[idx] = pr.Match
|
||||
for id := range p.nodes {
|
||||
p.matchBuf[idx] = p.prs[id].Match
|
||||
idx++
|
||||
}
|
||||
sort.Sort(&p.matchBuf)
|
||||
@ -343,50 +345,44 @@ func (p *progressTracker) committed() uint64 {
|
||||
}
|
||||
|
||||
func (p *progressTracker) removeAny(id uint64) {
|
||||
pN := p.nodes[id]
|
||||
pL := p.learners[id]
|
||||
_, okPR := p.prs[id]
|
||||
_, okV := p.nodes[id]
|
||||
_, okL := p.learners[id]
|
||||
|
||||
if pN == nil && pL == nil {
|
||||
if !okPR {
|
||||
panic("attempting to remove unknown peer %x")
|
||||
} else if pN != nil && pL != nil {
|
||||
} else if !okV && !okL {
|
||||
panic("attempting to remove unknown peer %x")
|
||||
} else if okV && okL {
|
||||
panic(fmt.Sprintf("peer %x is both voter and learner", id))
|
||||
}
|
||||
|
||||
delete(p.nodes, id)
|
||||
delete(p.learners, id)
|
||||
delete(p.prs, id)
|
||||
}
|
||||
|
||||
// initProgress initializes a new progress for the given node or learner. The
|
||||
// node may not exist yet in either form or a panic will ensue.
|
||||
func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) {
|
||||
if pr := p.nodes[id]; pr != nil {
|
||||
if pr := p.prs[id]; pr != nil {
|
||||
panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr))
|
||||
}
|
||||
if pr := p.learners[id]; pr != nil {
|
||||
panic(fmt.Sprintf("peer %x already tracked as learner %v", id, pr))
|
||||
}
|
||||
if !isLearner {
|
||||
p.nodes[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight)}
|
||||
return
|
||||
p.nodes[id] = struct{}{}
|
||||
} else {
|
||||
p.learners[id] = struct{}{}
|
||||
}
|
||||
p.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: true}
|
||||
p.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: isLearner}
|
||||
}
|
||||
|
||||
func (p *progressTracker) getProgress(id uint64) *Progress {
|
||||
if pr, ok := p.nodes[id]; ok {
|
||||
return pr
|
||||
}
|
||||
|
||||
return p.learners[id]
|
||||
return p.prs[id]
|
||||
}
|
||||
|
||||
// visit invokes the supplied closure for all tracked progresses.
|
||||
func (p *progressTracker) visit(f func(id uint64, pr *Progress)) {
|
||||
for id, pr := range p.nodes {
|
||||
f(id, pr)
|
||||
}
|
||||
|
||||
for id, pr := range p.learners {
|
||||
for id, pr := range p.prs {
|
||||
f(id, pr)
|
||||
}
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ func TestMsgAppFlowControlFull(t *testing.T) {
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
|
||||
pr2 := r.prs.nodes[2]
|
||||
pr2 := r.prs.prs[2]
|
||||
// force the progress to be in replicate state
|
||||
pr2.becomeReplicate()
|
||||
// fill in the inflights window
|
||||
@ -65,7 +65,7 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
|
||||
pr2 := r.prs.nodes[2]
|
||||
pr2 := r.prs.prs[2]
|
||||
// force the progress to be in replicate state
|
||||
pr2.becomeReplicate()
|
||||
// fill in the inflights window
|
||||
@ -110,7 +110,7 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
|
||||
pr2 := r.prs.nodes[2]
|
||||
pr2 := r.prs.prs[2]
|
||||
// force the progress to be in replicate state
|
||||
pr2.becomeReplicate()
|
||||
// fill in the inflights window
|
||||
|
@ -40,11 +40,11 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) {
|
||||
|
||||
// force set the next of node 2, so that
|
||||
// node 2 needs a snapshot
|
||||
sm.prs.nodes[2].Next = sm.raftLog.firstIndex()
|
||||
sm.prs.prs[2].Next = sm.raftLog.firstIndex()
|
||||
|
||||
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.nodes[2].Next - 1, Reject: true})
|
||||
if sm.prs.nodes[2].PendingSnapshot != 11 {
|
||||
t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.nodes[2].PendingSnapshot)
|
||||
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.prs[2].Next - 1, Reject: true})
|
||||
if sm.prs.prs[2].PendingSnapshot != 11 {
|
||||
t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.prs[2].PendingSnapshot)
|
||||
}
|
||||
}
|
||||
|
||||
@ -56,7 +56,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) {
|
||||
sm.becomeCandidate()
|
||||
sm.becomeLeader()
|
||||
|
||||
sm.prs.nodes[2].becomeSnapshot(11)
|
||||
sm.prs.prs[2].becomeSnapshot(11)
|
||||
|
||||
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
||||
msgs := sm.readMessages()
|
||||
@ -73,18 +73,18 @@ func TestSnapshotFailure(t *testing.T) {
|
||||
sm.becomeCandidate()
|
||||
sm.becomeLeader()
|
||||
|
||||
sm.prs.nodes[2].Next = 1
|
||||
sm.prs.nodes[2].becomeSnapshot(11)
|
||||
sm.prs.prs[2].Next = 1
|
||||
sm.prs.prs[2].becomeSnapshot(11)
|
||||
|
||||
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true})
|
||||
if sm.prs.nodes[2].PendingSnapshot != 0 {
|
||||
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot)
|
||||
if sm.prs.prs[2].PendingSnapshot != 0 {
|
||||
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot)
|
||||
}
|
||||
if sm.prs.nodes[2].Next != 1 {
|
||||
t.Fatalf("Next = %d, want 1", sm.prs.nodes[2].Next)
|
||||
if sm.prs.prs[2].Next != 1 {
|
||||
t.Fatalf("Next = %d, want 1", sm.prs.prs[2].Next)
|
||||
}
|
||||
if !sm.prs.nodes[2].Paused {
|
||||
t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused)
|
||||
if !sm.prs.prs[2].Paused {
|
||||
t.Errorf("Paused = %v, want true", sm.prs.prs[2].Paused)
|
||||
}
|
||||
}
|
||||
|
||||
@ -96,18 +96,18 @@ func TestSnapshotSucceed(t *testing.T) {
|
||||
sm.becomeCandidate()
|
||||
sm.becomeLeader()
|
||||
|
||||
sm.prs.nodes[2].Next = 1
|
||||
sm.prs.nodes[2].becomeSnapshot(11)
|
||||
sm.prs.prs[2].Next = 1
|
||||
sm.prs.prs[2].becomeSnapshot(11)
|
||||
|
||||
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false})
|
||||
if sm.prs.nodes[2].PendingSnapshot != 0 {
|
||||
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot)
|
||||
if sm.prs.prs[2].PendingSnapshot != 0 {
|
||||
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot)
|
||||
}
|
||||
if sm.prs.nodes[2].Next != 12 {
|
||||
t.Fatalf("Next = %d, want 12", sm.prs.nodes[2].Next)
|
||||
if sm.prs.prs[2].Next != 12 {
|
||||
t.Fatalf("Next = %d, want 12", sm.prs.prs[2].Next)
|
||||
}
|
||||
if !sm.prs.nodes[2].Paused {
|
||||
t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused)
|
||||
if !sm.prs.prs[2].Paused {
|
||||
t.Errorf("Paused = %v, want true", sm.prs.prs[2].Paused)
|
||||
}
|
||||
}
|
||||
|
||||
@ -206,7 +206,7 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) {
|
||||
mustSend(n2, n1, pb.MsgAppResp)
|
||||
|
||||
// Leader has correct state for follower.
|
||||
pr := n1.prs.nodes[2]
|
||||
pr := n1.prs.prs[2]
|
||||
if pr.State != ProgressStateReplicate {
|
||||
t.Fatalf("unexpected state %v", pr)
|
||||
}
|
||||
@ -227,23 +227,23 @@ func TestSnapshotAbort(t *testing.T) {
|
||||
sm.becomeCandidate()
|
||||
sm.becomeLeader()
|
||||
|
||||
sm.prs.nodes[2].Next = 1
|
||||
sm.prs.nodes[2].becomeSnapshot(11)
|
||||
sm.prs.prs[2].Next = 1
|
||||
sm.prs.prs[2].becomeSnapshot(11)
|
||||
|
||||
// A successful msgAppResp that has a higher/equal index than the
|
||||
// pending snapshot should abort the pending snapshot.
|
||||
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11})
|
||||
if sm.prs.nodes[2].PendingSnapshot != 0 {
|
||||
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot)
|
||||
if sm.prs.prs[2].PendingSnapshot != 0 {
|
||||
t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot)
|
||||
}
|
||||
// The follower entered ProgressStateReplicate and the leader send an append
|
||||
// and optimistically updated the progress (so we see 13 instead of 12).
|
||||
// There is something to append because the leader appended an empty entry
|
||||
// to the log at index 12 when it assumed leadership.
|
||||
if sm.prs.nodes[2].Next != 13 {
|
||||
t.Fatalf("Next = %d, want 13", sm.prs.nodes[2].Next)
|
||||
if sm.prs.prs[2].Next != 13 {
|
||||
t.Fatalf("Next = %d, want 13", sm.prs.prs[2].Next)
|
||||
}
|
||||
if n := sm.prs.nodes[2].ins.count; n != 1 {
|
||||
if n := sm.prs.prs[2].ins.count; n != 1 {
|
||||
t.Fatalf("expected an inflight message, got %d", n)
|
||||
}
|
||||
}
|
||||
|
@ -271,12 +271,12 @@ func TestProgressLeader(t *testing.T) {
|
||||
r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
r.prs.nodes[2].becomeReplicate()
|
||||
r.prs.prs[2].becomeReplicate()
|
||||
|
||||
// Send proposals to r1. The first 5 entries should be appended to the log.
|
||||
propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}}
|
||||
for i := 0; i < 5; i++ {
|
||||
if pr := r.prs.nodes[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 {
|
||||
if pr := r.prs.prs[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 {
|
||||
t.Errorf("unexpected progress %v", pr)
|
||||
}
|
||||
if err := r.Step(propMsg); err != nil {
|
||||
@ -291,17 +291,17 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
|
||||
r.prs.nodes[2].Paused = true
|
||||
r.prs.prs[2].Paused = true
|
||||
|
||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
|
||||
if !r.prs.nodes[2].Paused {
|
||||
t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused)
|
||||
if !r.prs.prs[2].Paused {
|
||||
t.Errorf("paused = %v, want true", r.prs.prs[2].Paused)
|
||||
}
|
||||
|
||||
r.prs.nodes[2].becomeReplicate()
|
||||
r.prs.prs[2].becomeReplicate()
|
||||
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
|
||||
if r.prs.nodes[2].Paused {
|
||||
t.Errorf("paused = %v, want false", r.prs.nodes[2].Paused)
|
||||
if r.prs.prs[2].Paused {
|
||||
t.Errorf("paused = %v, want false", r.prs.prs[2].Paused)
|
||||
}
|
||||
}
|
||||
|
||||
@ -331,7 +331,7 @@ func TestProgressFlowControl(t *testing.T) {
|
||||
r.readMessages()
|
||||
|
||||
// While node 2 is in probe state, propose a bunch of entries.
|
||||
r.prs.nodes[2].becomeProbe()
|
||||
r.prs.prs[2].becomeProbe()
|
||||
blob := []byte(strings.Repeat("a", 1000))
|
||||
for i := 0; i < 10; i++ {
|
||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}})
|
||||
@ -409,8 +409,8 @@ func TestUncommittedEntryLimit(t *testing.T) {
|
||||
|
||||
// Set the two followers to the replicate state. Commit to tail of log.
|
||||
const numFollowers = 2
|
||||
r.prs.nodes[2].becomeReplicate()
|
||||
r.prs.nodes[3].becomeReplicate()
|
||||
r.prs.prs[2].becomeReplicate()
|
||||
r.prs.prs[3].becomeReplicate()
|
||||
r.uncommittedSize = 0
|
||||
|
||||
// Send proposals to r1. The first 5 entries should be appended to the log.
|
||||
@ -2632,7 +2632,7 @@ func TestLeaderAppResp(t *testing.T) {
|
||||
sm.readMessages()
|
||||
sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index})
|
||||
|
||||
p := sm.prs.nodes[2]
|
||||
p := sm.prs.prs[2]
|
||||
if p.Match != tt.wmatch {
|
||||
t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch)
|
||||
}
|
||||
@ -2679,9 +2679,9 @@ func TestBcastBeat(t *testing.T) {
|
||||
mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1})
|
||||
}
|
||||
// slow follower
|
||||
sm.prs.nodes[2].Match, sm.prs.nodes[2].Next = 5, 6
|
||||
sm.prs.prs[2].Match, sm.prs.prs[2].Next = 5, 6
|
||||
// normal follower
|
||||
sm.prs.nodes[3].Match, sm.prs.nodes[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
|
||||
sm.prs.prs[3].Match, sm.prs.prs[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
|
||||
|
||||
sm.Step(pb.Message{Type: pb.MsgBeat})
|
||||
msgs := sm.readMessages()
|
||||
@ -2689,8 +2689,8 @@ func TestBcastBeat(t *testing.T) {
|
||||
t.Fatalf("len(msgs) = %v, want 2", len(msgs))
|
||||
}
|
||||
wantCommitMap := map[uint64]uint64{
|
||||
2: min(sm.raftLog.committed, sm.prs.nodes[2].Match),
|
||||
3: min(sm.raftLog.committed, sm.prs.nodes[3].Match),
|
||||
2: min(sm.raftLog.committed, sm.prs.prs[2].Match),
|
||||
3: min(sm.raftLog.committed, sm.prs.prs[3].Match),
|
||||
}
|
||||
for i, m := range msgs {
|
||||
if m.Type != pb.MsgHeartbeat {
|
||||
@ -2776,11 +2776,11 @@ func TestLeaderIncreaseNext(t *testing.T) {
|
||||
sm.raftLog.append(previousEnts...)
|
||||
sm.becomeCandidate()
|
||||
sm.becomeLeader()
|
||||
sm.prs.nodes[2].State = tt.state
|
||||
sm.prs.nodes[2].Next = tt.next
|
||||
sm.prs.prs[2].State = tt.state
|
||||
sm.prs.prs[2].Next = tt.next
|
||||
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
||||
|
||||
p := sm.prs.nodes[2]
|
||||
p := sm.prs.prs[2]
|
||||
if p.Next != tt.wnext {
|
||||
t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext)
|
||||
}
|
||||
@ -2792,7 +2792,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
r.readMessages()
|
||||
r.prs.nodes[2].becomeProbe()
|
||||
r.prs.prs[2].becomeProbe()
|
||||
|
||||
// each round is a heartbeat
|
||||
for i := 0; i < 3; i++ {
|
||||
@ -2811,8 +2811,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
if !r.prs.nodes[2].Paused {
|
||||
t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused)
|
||||
if !r.prs.prs[2].Paused {
|
||||
t.Errorf("paused = %v, want true", r.prs.prs[2].Paused)
|
||||
}
|
||||
for j := 0; j < 10; j++ {
|
||||
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
|
||||
@ -2826,8 +2826,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
|
||||
for j := 0; j < r.heartbeatTimeout; j++ {
|
||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
|
||||
}
|
||||
if !r.prs.nodes[2].Paused {
|
||||
t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused)
|
||||
if !r.prs.prs[2].Paused {
|
||||
t.Errorf("paused = %v, want true", r.prs.prs[2].Paused)
|
||||
}
|
||||
|
||||
// consume the heartbeat
|
||||
@ -2849,8 +2849,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
|
||||
if msg[0].Index != 0 {
|
||||
t.Errorf("index = %d, want %d", msg[0].Index, 0)
|
||||
}
|
||||
if !r.prs.nodes[2].Paused {
|
||||
t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused)
|
||||
if !r.prs.prs[2].Paused {
|
||||
t.Errorf("paused = %v, want true", r.prs.prs[2].Paused)
|
||||
}
|
||||
}
|
||||
|
||||
@ -2859,7 +2859,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) {
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
r.readMessages()
|
||||
r.prs.nodes[2].becomeReplicate()
|
||||
r.prs.prs[2].becomeReplicate()
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
|
||||
@ -2876,7 +2876,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) {
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
r.readMessages()
|
||||
r.prs.nodes[2].becomeSnapshot(10)
|
||||
r.prs.prs[2].becomeSnapshot(10)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
|
||||
@ -2897,17 +2897,17 @@ func TestRecvMsgUnreachable(t *testing.T) {
|
||||
r.becomeLeader()
|
||||
r.readMessages()
|
||||
// set node 2 to state replicate
|
||||
r.prs.nodes[2].Match = 3
|
||||
r.prs.nodes[2].becomeReplicate()
|
||||
r.prs.nodes[2].optimisticUpdate(5)
|
||||
r.prs.prs[2].Match = 3
|
||||
r.prs.prs[2].becomeReplicate()
|
||||
r.prs.prs[2].optimisticUpdate(5)
|
||||
|
||||
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable})
|
||||
|
||||
if r.prs.nodes[2].State != ProgressStateProbe {
|
||||
t.Errorf("state = %s, want %s", r.prs.nodes[2].State, ProgressStateProbe)
|
||||
if r.prs.prs[2].State != ProgressStateProbe {
|
||||
t.Errorf("state = %s, want %s", r.prs.prs[2].State, ProgressStateProbe)
|
||||
}
|
||||
if wnext := r.prs.nodes[2].Match + 1; r.prs.nodes[2].Next != wnext {
|
||||
t.Errorf("next = %d, want %d", r.prs.nodes[2].Next, wnext)
|
||||
if wnext := r.prs.prs[2].Match + 1; r.prs.prs[2].Next != wnext {
|
||||
t.Errorf("next = %d, want %d", r.prs.prs[2].Next, wnext)
|
||||
}
|
||||
}
|
||||
|
||||
@ -2973,13 +2973,13 @@ func TestRestoreWithLearner(t *testing.T) {
|
||||
t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners)
|
||||
}
|
||||
for _, n := range s.Metadata.ConfState.Nodes {
|
||||
if sm.prs.nodes[n].IsLearner {
|
||||
t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.nodes[n], false)
|
||||
if sm.prs.prs[n].IsLearner {
|
||||
t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.prs[n], false)
|
||||
}
|
||||
}
|
||||
for _, n := range s.Metadata.ConfState.Learners {
|
||||
if !sm.prs.learners[n].IsLearner {
|
||||
t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.nodes[n], true)
|
||||
if !sm.prs.prs[n].IsLearner {
|
||||
t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.prs[n], true)
|
||||
}
|
||||
}
|
||||
|
||||
@ -3121,8 +3121,8 @@ func TestProvideSnap(t *testing.T) {
|
||||
sm.becomeLeader()
|
||||
|
||||
// force set the next of node 2, so that node 2 needs a snapshot
|
||||
sm.prs.nodes[2].Next = sm.raftLog.firstIndex()
|
||||
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.nodes[2].Next - 1, Reject: true})
|
||||
sm.prs.prs[2].Next = sm.raftLog.firstIndex()
|
||||
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.prs[2].Next - 1, Reject: true})
|
||||
|
||||
msgs := sm.readMessages()
|
||||
if len(msgs) != 1 {
|
||||
@ -3152,8 +3152,8 @@ func TestIgnoreProvidingSnap(t *testing.T) {
|
||||
|
||||
// force set the next of node 2, so that node 2 needs a snapshot
|
||||
// change node 2 to be inactive, expect node 1 ignore sending snapshot to 2
|
||||
sm.prs.nodes[2].Next = sm.raftLog.firstIndex() - 1
|
||||
sm.prs.nodes[2].RecentActive = false
|
||||
sm.prs.prs[2].Next = sm.raftLog.firstIndex() - 1
|
||||
sm.prs.prs[2].RecentActive = false
|
||||
|
||||
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
||||
|
||||
@ -3201,7 +3201,7 @@ func TestSlowNodeRestore(t *testing.T) {
|
||||
// node 3 will only be considered as active when node 1 receives a reply from it.
|
||||
for {
|
||||
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
|
||||
if lead.prs.nodes[3].RecentActive {
|
||||
if lead.prs.prs[3].RecentActive {
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -3304,8 +3304,8 @@ func TestAddLearner(t *testing.T) {
|
||||
if !reflect.DeepEqual(nodes, wnodes) {
|
||||
t.Errorf("nodes = %v, want %v", nodes, wnodes)
|
||||
}
|
||||
if !r.prs.learners[2].IsLearner {
|
||||
t.Errorf("node 2 is learner %t, want %t", r.prs.nodes[2].IsLearner, true)
|
||||
if !r.prs.prs[2].IsLearner {
|
||||
t.Errorf("node 2 is learner %t, want %t", r.prs.prs[2].IsLearner, true)
|
||||
}
|
||||
}
|
||||
|
||||
@ -3619,8 +3619,8 @@ func TestLeaderTransferToSlowFollower(t *testing.T) {
|
||||
|
||||
nt.recover()
|
||||
lead := nt.peers[1].(*raft)
|
||||
if lead.prs.nodes[3].Match != 1 {
|
||||
t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.nodes[3].Match, 1)
|
||||
if lead.prs.prs[3].Match != 1 {
|
||||
t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.prs[3].Match, 1)
|
||||
}
|
||||
|
||||
// Transfer leadership to 3 when node 3 is lack of log.
|
||||
@ -3642,8 +3642,8 @@ func TestLeaderTransferAfterSnapshot(t *testing.T) {
|
||||
nt.storage[1].Compact(lead.raftLog.applied)
|
||||
|
||||
nt.recover()
|
||||
if lead.prs.nodes[3].Match != 1 {
|
||||
t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.nodes[3].Match, 1)
|
||||
if lead.prs.prs[3].Match != 1 {
|
||||
t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.prs[3].Match, 1)
|
||||
}
|
||||
|
||||
// Transfer leadership to 3 when node 3 is lack of snapshot.
|
||||
@ -3722,8 +3722,8 @@ func TestLeaderTransferIgnoreProposal(t *testing.T) {
|
||||
t.Fatalf("should return drop proposal error while transferring")
|
||||
}
|
||||
|
||||
if lead.prs.nodes[1].Match != 1 {
|
||||
t.Fatalf("node 1 has match %x, want %x", lead.prs.nodes[1].Match, 1)
|
||||
if lead.prs.prs[1].Match != 1 {
|
||||
t.Fatalf("node 1 has match %x, want %x", lead.prs.prs[1].Match, 1)
|
||||
}
|
||||
}
|
||||
|
||||
@ -4334,14 +4334,18 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw
|
||||
learners[i] = true
|
||||
}
|
||||
v.id = id
|
||||
v.prs.nodes = make(map[uint64]*Progress)
|
||||
v.prs.learners = make(map[uint64]*Progress)
|
||||
v.prs.nodes = make(map[uint64]struct{})
|
||||
v.prs.learners = make(map[uint64]struct{})
|
||||
v.prs.prs = make(map[uint64]*Progress)
|
||||
for i := 0; i < size; i++ {
|
||||
pr := &Progress{}
|
||||
if _, ok := learners[peerAddrs[i]]; ok {
|
||||
v.prs.learners[peerAddrs[i]] = &Progress{IsLearner: true}
|
||||
pr.IsLearner = true
|
||||
v.prs.learners[peerAddrs[i]] = struct{}{}
|
||||
} else {
|
||||
v.prs.nodes[peerAddrs[i]] = &Progress{}
|
||||
v.prs.nodes[peerAddrs[i]] = struct{}{}
|
||||
}
|
||||
v.prs.prs[peerAddrs[i]] = pr
|
||||
}
|
||||
v.reset(v.Term)
|
||||
npeers[id] = v
|
||||
|
Loading…
x
Reference in New Issue
Block a user