raft: introduce progress states

This commit is contained in:
Yicheng Qin 2015-03-16 00:46:16 -07:00
parent 862c16e821
commit 67194c0b22
4 changed files with 329 additions and 196 deletions

View File

@ -49,36 +49,88 @@ func (st StateType) String() string {
return stmap[uint64(st)]
}
const (
ProgressStateProbe ProgressStateType = iota
ProgressStateReplicate
ProgressStateSnapshot
)
type ProgressStateType uint64
var prstmap = [...]string{
"ProgressStateProbe",
"ProgressStateReplicate",
"ProgressStateSnapshot",
}
func (st ProgressStateType) String() string { return prstmap[uint64(st)] }
type Progress struct {
Match, Next uint64
Wait int
// If the last sent to the Progress failed and reported
// by the link layer via MsgUnreachable, Unreachable will be set.
// If the Progress is unreachable, snapshot and optimistically append
// will be disabled.
// Unreachable will be unset if raft starts to receive message (msgAppResp,
// msgHeartbeatResp) from the remote peer of the Progress.
Unreachable bool
// When in ProgressStateProbe, leader sends at most one replication message
// per heartbeat interval. It also probes actual progress of the follower.
//
// When in ProgressStateReplicate, leader optimistically increases next
// to the latest entry sent after sending replication message. This is
// an optimized state for fast replicating log entries to the follower.
//
// When in ProgressStateSnapshot, leader should have sent out snapshot
// before and stops sending any replication message.
State ProgressStateType
// Paused is used in ProgressStateProbe.
// When Paused is true, raft should pause sending replication message to this peer.
Paused bool
// PendingSnapshot is used in ProgressStateSnapshot.
// If there is a pending snapshot, the pendingSnapshot will be set to the
// index of the snapshot. If pendingSnapshot is set, the replication process of
// this Progress will be paused. raft will not resend snapshot until the pending one
// is reported to be failed.
//
// PendingSnapshot is set when raft sends out a snapshot to this Progress.
// PendingSnapshot is unset when the snapshot is reported to be successfully,
// or raft updates an equal or higher Match for this Progress.
PendingSnapshot uint64
}
func (pr *Progress) update(n uint64) {
pr.waitReset()
func (pr *Progress) resetState(state ProgressStateType) {
pr.Paused = false
pr.PendingSnapshot = 0
pr.State = state
}
func (pr *Progress) becomeProbe() {
// If the original state is ProgressStateSnapshot, progress knows that
// the pending snapshot has been sent to this peer successfully, then
// probes from pendingSnapshot + 1.
if pr.State == ProgressStateSnapshot {
pendingSnapshot := pr.PendingSnapshot
pr.resetState(ProgressStateProbe)
pr.Next = max(pr.Match+1, pendingSnapshot+1)
} else {
pr.resetState(ProgressStateProbe)
pr.Next = pr.Match + 1
}
}
func (pr *Progress) becomeReplicate() {
pr.resetState(ProgressStateReplicate)
pr.Next = pr.Match + 1
}
func (pr *Progress) becomeSnapshot(snapshoti uint64) {
pr.resetState(ProgressStateSnapshot)
pr.PendingSnapshot = snapshoti
}
// maybeUpdate returns false if the given n index comes from an outdated message.
// Otherwise it updates the progress and returns true.
func (pr *Progress) maybeUpdate(n uint64) bool {
var updated bool
if pr.Match < n {
pr.Match = n
updated = true
pr.resume()
}
if pr.Next < n+1 {
pr.Next = n + 1
}
return updated
}
func (pr *Progress) optimisticUpdate(n uint64) { pr.Next = n + 1 }
@ -86,9 +138,7 @@ func (pr *Progress) optimisticUpdate(n uint64) { pr.Next = n + 1 }
// maybeDecrTo returns false if the given to index comes from an out of order message.
// Otherwise it decreases the progress next index to min(rejected, last) and returns true.
func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
pr.waitReset()
if pr.Match != 0 {
if pr.State == ProgressStateReplicate {
// the rejection must be stale if the progress has matched and "rejected"
// is smaller than "match".
if rejected <= pr.Match {
@ -107,61 +157,28 @@ func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
if pr.Next = min(rejected, last+1); pr.Next < 1 {
pr.Next = 1
}
pr.resume()
return true
}
func (pr *Progress) waitDecr(i int) {
pr.Wait -= i
if pr.Wait < 0 {
pr.Wait = 0
}
}
func (pr *Progress) waitSet(w int) { pr.Wait = w }
func (pr *Progress) waitReset() { pr.Wait = 0 }
func (pr *Progress) isUnreachable() bool { return pr.Unreachable }
func (pr *Progress) reachable() { pr.Unreachable = false }
func (pr *Progress) pause() { pr.Paused = true }
func (pr *Progress) resume() { pr.Paused = false }
func (pr *Progress) unreachable() {
pr.Unreachable = true
// When in optimistic appending path, if the remote becomes unreachable,
// there is big probability that it loses MsgApp. Fall back to bad
// path to recover it steadily.
if pr.Match != 0 {
pr.Next = pr.Match + 1
}
// isPaused returns whether progress stops sending message.
func (pr *Progress) isPaused() bool {
return pr.State == ProgressStateProbe && pr.Paused || pr.State == ProgressStateSnapshot
}
func (pr *Progress) shouldWait() bool { return (pr.Unreachable || pr.Match == 0) && pr.Wait > 0 }
func (pr *Progress) hasPendingSnapshot() bool { return pr.PendingSnapshot != 0 }
func (pr *Progress) setPendingSnapshot(i uint64) { pr.PendingSnapshot = i }
// finishSnapshot unsets the pending snapshot and optimistically increase Next to
// the index of pendingSnapshot + 1. The next replication message is expected
// to be msgApp.
func (pr *Progress) snapshotFinish() {
pr.Next = pr.PendingSnapshot + 1
pr.PendingSnapshot = 0
}
// snapshotFail unsets the pending snapshot. The next replication message is expected
// to be another msgSnap.
func (pr *Progress) snapshotFail() {
pr.PendingSnapshot = 0
}
func (pr *Progress) snapshotFailure() { pr.PendingSnapshot = 0 }
// maybeSnapshotAbort unsets pendingSnapshot if Match is equal or higher than
// the pendingSnapshot
func (pr *Progress) maybeSnapshotAbort() bool {
if pr.hasPendingSnapshot() && pr.Match >= pr.PendingSnapshot {
pr.PendingSnapshot = 0
return true
}
return false
return pr.State == ProgressStateSnapshot && pr.Match >= pr.PendingSnapshot
}
func (pr *Progress) String() string {
return fmt.Sprintf("next = %d, match = %d, wait = %v", pr.Next, pr.Match, pr.Wait)
return fmt.Sprintf("next = %d, match = %d, state = %s, waiting = %v, pendingSnapshot = %d", pr.Next, pr.Match, pr.State, pr.isPaused(), pr.PendingSnapshot)
}
type raft struct {
@ -273,18 +290,12 @@ func (r *raft) send(m pb.Message) {
// sendAppend sends RRPC, with entries to the given peer.
func (r *raft) sendAppend(to uint64) {
pr := r.prs[to]
if pr.shouldWait() || pr.hasPendingSnapshot() {
if pr.isPaused() {
return
}
m := pb.Message{}
m.To = to
if r.needSnapshot(pr.Next) {
if pr.isUnreachable() {
// do not try to send snapshot until the Progress is
// reachable
return
}
m.Type = pb.MsgSnap
snapshot, err := r.raftLog.snapshot()
if err != nil {
@ -297,7 +308,7 @@ func (r *raft) sendAppend(to uint64) {
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
raftLogger.Infof("raft: %x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
pr.setPendingSnapshot(sindex)
pr.becomeSnapshot(sindex)
raftLogger.Infof("raft: %x paused sending replication messages to %x [%s]", r.id, to, pr)
} else {
m.Type = pb.MsgApp
@ -305,12 +316,16 @@ func (r *raft) sendAppend(to uint64) {
m.LogTerm = r.raftLog.term(pr.Next - 1)
m.Entries = r.raftLog.entries(pr.Next)
m.Commit = r.raftLog.committed
// optimistically increase the next if the follower
// has been matched.
if n := len(m.Entries); pr.Match != 0 && !pr.isUnreachable() && n != 0 {
pr.optimisticUpdate(m.Entries[n-1].Index)
} else if pr.Match == 0 || pr.isUnreachable() {
pr.waitSet(r.heartbeatTimeout)
if n := len(m.Entries); n != 0 {
switch pr.State {
// optimistically increase the next when in ProgressStateReplicate
case ProgressStateReplicate:
pr.optimisticUpdate(m.Entries[n-1].Index)
case ProgressStateProbe:
pr.pause()
default:
raftLogger.Panicf("raft: %x is sending append in unhandled state %s", r.id, pr.State)
}
}
}
r.send(m)
@ -351,7 +366,7 @@ func (r *raft) bcastHeartbeat() {
continue
}
r.sendHeartbeat(i)
r.prs[i].waitDecr(r.heartbeatTimeout)
r.prs[i].resume()
}
}
@ -390,7 +405,7 @@ func (r *raft) appendEntry(es ...pb.Entry) {
es[i].Index = li + 1 + uint64(i)
}
r.raftLog.append(es...)
r.prs[r.id].update(r.raftLog.lastIndex())
r.prs[r.id].maybeUpdate(r.raftLog.lastIndex())
r.maybeCommit()
}
@ -547,36 +562,37 @@ func stepLeader(r *raft, m pb.Message) {
r.appendEntry(m.Entries...)
r.bcastAppend()
case pb.MsgAppResp:
if pr.isUnreachable() {
pr.reachable()
raftLogger.Infof("raft: %x received msgAppResp from %x and changed it to be reachable [%s]", r.id, m.From, pr)
}
if m.Reject {
raftLogger.Infof("raft: %x received msgApp rejection(lastindex: %d) from %x for index %d",
r.id, m.RejectHint, m.From, m.Index)
if pr.maybeDecrTo(m.Index, m.RejectHint) {
raftLogger.Infof("raft: %x decreased progress of %x to [%s]", r.id, m.From, pr)
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
}
r.sendAppend(m.From)
}
} else {
oldWait := pr.shouldWait()
pr.update(m.Index)
if r.prs[m.From].maybeSnapshotAbort() {
raftLogger.Infof("raft: %x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
}
if r.maybeCommit() {
r.bcastAppend()
} else if oldWait {
// update() reset the wait state on this node. If we had delayed sending
// an update before, send it now.
r.sendAppend(m.From)
oldPaused := pr.isPaused()
if pr.maybeUpdate(m.Index) {
switch {
case pr.State == ProgressStateProbe:
pr.becomeReplicate()
case pr.State == ProgressStateSnapshot && pr.maybeSnapshotAbort():
raftLogger.Infof("raft: %x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
pr.becomeProbe()
}
if r.maybeCommit() {
r.bcastAppend()
} else if oldPaused {
// update() reset the wait state on this node. If we had delayed sending
// an update before, send it now.
r.sendAppend(m.From)
}
}
}
case pb.MsgHeartbeatResp:
if pr.isUnreachable() {
pr.reachable()
raftLogger.Infof("raft: %x received msgHeartbeatResp from %x and changed it to be reachable [%s]", r.id, m.From, pr)
}
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
}
@ -585,24 +601,28 @@ func stepLeader(r *raft, m pb.Message) {
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
case pb.MsgSnapStatus:
if !pr.hasPendingSnapshot() {
if pr.State != ProgressStateSnapshot {
return
}
if m.Reject {
pr.snapshotFail()
raftLogger.Infof("raft: %x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
} else {
pr.snapshotFinish()
if !m.Reject {
pr.becomeProbe()
raftLogger.Infof("raft: %x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
// wait for the msgAppResp from the remote node before sending
// out the next msgApp
pr.waitSet(r.electionTimeout)
} else {
pr.snapshotFailure()
pr.becomeProbe()
raftLogger.Infof("raft: %x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
}
// If snapshot finish, wait for the msgAppResp from the remote node before sending
// out the next msgApp.
// If snapshot failure, wait for a heartbeat interval before next try
pr.pause()
case pb.MsgUnreachable:
if !pr.isUnreachable() {
pr.unreachable()
raftLogger.Infof("raft: %x failed to send message to %x and changed it to be unreachable [%s]", r.id, m.From, pr)
// During optimistic replication, if the remote becomes unreachable,
// there is huge probability that a MsgApp is lost.
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
}
raftLogger.Infof("raft: %x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
}
}

View File

@ -56,7 +56,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) {
sm.becomeCandidate()
sm.becomeLeader()
sm.prs[2].setPendingSnapshot(11)
sm.prs[2].becomeSnapshot(11)
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
msgs := sm.readMessages()
@ -74,7 +74,7 @@ func TestSnapshotFailure(t *testing.T) {
sm.becomeLeader()
sm.prs[2].Next = 1
sm.prs[2].setPendingSnapshot(11)
sm.prs[2].becomeSnapshot(11)
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true})
if sm.prs[2].PendingSnapshot != 0 {
@ -83,6 +83,9 @@ func TestSnapshotFailure(t *testing.T) {
if sm.prs[2].Next != 1 {
t.Fatalf("Next = %d, want 1", sm.prs[2].Next)
}
if sm.prs[2].Paused != true {
t.Errorf("Paused = %v, want true", sm.prs[2].Paused)
}
}
func TestSnapshotSucceed(t *testing.T) {
@ -94,7 +97,7 @@ func TestSnapshotSucceed(t *testing.T) {
sm.becomeLeader()
sm.prs[2].Next = 1
sm.prs[2].setPendingSnapshot(11)
sm.prs[2].becomeSnapshot(11)
sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false})
if sm.prs[2].PendingSnapshot != 0 {
@ -103,6 +106,9 @@ func TestSnapshotSucceed(t *testing.T) {
if sm.prs[2].Next != 12 {
t.Fatalf("Next = %d, want 12", sm.prs[2].Next)
}
if sm.prs[2].Paused != true {
t.Errorf("Paused = %v, want true", sm.prs[2].Paused)
}
}
func TestSnapshotAbort(t *testing.T) {
@ -114,7 +120,7 @@ func TestSnapshotAbort(t *testing.T) {
sm.becomeLeader()
sm.prs[2].Next = 1
sm.prs[2].setPendingSnapshot(11)
sm.prs[2].becomeSnapshot(11)
// A successful msgAppResp that has a higher/equal index than the
// pending snapshot should abort the pending snapshot.

View File

@ -48,25 +48,94 @@ func (r *raft) readMessages() []pb.Message {
return msgs
}
func TestBecomeProbe(t *testing.T) {
match := uint64(1)
tests := []struct {
p *Progress
wnext uint64
}{
{
&Progress{State: ProgressStateReplicate, Match: match, Next: 5},
2,
},
{
// snapshot finish
&Progress{State: ProgressStateSnapshot, Match: match, Next: 5, PendingSnapshot: 10},
11,
},
{
// snapshot failure
&Progress{State: ProgressStateSnapshot, Match: match, Next: 5, PendingSnapshot: 0},
2,
},
}
for i, tt := range tests {
tt.p.becomeProbe()
if tt.p.State != ProgressStateProbe {
t.Errorf("#%d: state = %s, want %s", i, tt.p.State, ProgressStateProbe)
}
if tt.p.Match != match {
t.Errorf("#%d: match = %d, want %d", i, tt.p.Match, match)
}
if tt.p.Next != tt.wnext {
t.Errorf("#%d: next = %d, want %d", i, tt.p.Next, tt.wnext)
}
}
}
func TestBecomeReplicate(t *testing.T) {
p := &Progress{State: ProgressStateProbe, Match: 1, Next: 5}
p.becomeReplicate()
if p.State != ProgressStateReplicate {
t.Errorf("state = %s, want %s", p.State, ProgressStateReplicate)
}
if p.Match != 1 {
t.Errorf("match = %d, want 1", p.Match)
}
if w := p.Match + 1; p.Next != w {
t.Errorf("next = %d, want %d", p.Next, w)
}
}
func TestBecomeSnapshot(t *testing.T) {
p := &Progress{State: ProgressStateProbe, Match: 1, Next: 5}
p.becomeSnapshot(10)
if p.State != ProgressStateSnapshot {
t.Errorf("state = %s, want %s", p.State, ProgressStateSnapshot)
}
if p.Match != 1 {
t.Errorf("match = %d, want 1", p.Match)
}
if p.PendingSnapshot != 10 {
t.Errorf("pendingSnapshot = %d, want 10", p.PendingSnapshot)
}
}
func TestProgressUpdate(t *testing.T) {
prevM, prevN := uint64(3), uint64(5)
tests := []struct {
update uint64
wm uint64
wn uint64
wm uint64
wn uint64
wok bool
}{
{prevM - 1, prevM, prevN}, // do not decrease match, next
{prevM, prevM, prevN}, // do not decrease next
{prevM + 1, prevM + 1, prevN}, // increase match, do not decrease next
{prevM + 2, prevM + 2, prevN + 1}, // increase match, next
{prevM - 1, prevM, prevN, false}, // do not decrease match, next
{prevM, prevM, prevN, false}, // do not decrease next
{prevM + 1, prevM + 1, prevN, true}, // increase match, do not decrease next
{prevM + 2, prevM + 2, prevN + 1, true}, // increase match, next
}
for i, tt := range tests {
p := &Progress{
Match: prevM,
Next: prevN,
}
p.update(tt.update)
ok := p.maybeUpdate(tt.update)
if ok != tt.wok {
t.Errorf("#%d: ok= %v, want %v", i, ok, tt.wok)
}
if p.Match != tt.wm {
t.Errorf("#%d: match= %d, want %d", i, p.Match, tt.wm)
}
@ -78,6 +147,7 @@ func TestProgressUpdate(t *testing.T) {
func TestProgressMaybeDecr(t *testing.T) {
tests := []struct {
state ProgressStateType
m uint64
n uint64
rejected uint64
@ -87,54 +157,50 @@ func TestProgressMaybeDecr(t *testing.T) {
wn uint64
}{
{
// match != 0 is always false
1, 0, 0, 0, false, 0,
// state replicate and rejected is not greater than match
ProgressStateReplicate, 5, 10, 5, 5, false, 10,
},
{
// match != 0 and to is greater than match
// state replicate and rejected is not greater than match
ProgressStateReplicate, 5, 10, 4, 4, false, 10,
},
{
// state replicate and rejected is greater than match
// directly decrease to match+1
5, 10, 5, 5, false, 10,
},
{
// match != 0 and to is greater than match
// directly decrease to match+1
5, 10, 4, 4, false, 10,
},
{
// match != 0 and to is not greater than match
5, 10, 9, 9, true, 6,
ProgressStateReplicate, 5, 10, 9, 9, true, 6,
},
{
// next-1 != rejected is always false
0, 0, 0, 0, false, 0,
ProgressStateProbe, 0, 0, 0, 0, false, 0,
},
{
// next-1 != rejected is always false
0, 10, 5, 5, false, 10,
ProgressStateProbe, 0, 10, 5, 5, false, 10,
},
{
// next>1 = decremented by 1
0, 10, 9, 9, true, 9,
ProgressStateProbe, 0, 10, 9, 9, true, 9,
},
{
// next>1 = decremented by 1
0, 2, 1, 1, true, 1,
ProgressStateProbe, 0, 2, 1, 1, true, 1,
},
{
// next<=1 = reset to 1
0, 1, 0, 0, true, 1,
ProgressStateProbe, 0, 1, 0, 0, true, 1,
},
{
// decrease to min(rejected, last+1)
0, 10, 9, 2, true, 3,
ProgressStateProbe, 0, 10, 9, 2, true, 3,
},
{
// rejected < 1, reset to 1
0, 10, 9, 0, true, 1,
ProgressStateProbe, 0, 10, 9, 0, true, 1,
},
}
for i, tt := range tests {
p := &Progress{
State: tt.state,
Match: tt.m,
Next: tt.n,
}
@ -150,61 +216,63 @@ func TestProgressMaybeDecr(t *testing.T) {
}
}
func TestProgressShouldWait(t *testing.T) {
func TestProgressIsPaused(t *testing.T) {
tests := []struct {
m uint64
wait int
state ProgressStateType
paused bool
w bool
}{
// match != 0 is always not wait
{1, 0, false},
{1, 1, false},
{0, 1, true},
{0, 0, false},
{ProgressStateProbe, false, false},
{ProgressStateProbe, true, true},
{ProgressStateReplicate, false, false},
{ProgressStateReplicate, true, false},
{ProgressStateSnapshot, false, true},
{ProgressStateSnapshot, true, true},
}
for i, tt := range tests {
p := &Progress{
Match: tt.m,
Wait: tt.wait,
State: tt.state,
Paused: tt.paused,
}
if g := p.shouldWait(); g != tt.w {
if g := p.isPaused(); g != tt.w {
t.Errorf("#%d: shouldwait = %t, want %t", i, g, tt.w)
}
}
}
// TestProgressWaitReset ensures that progress.Update and progress.DercTo
// will reset progress.wait.
func TestProgressWaitReset(t *testing.T) {
// TestProgressResume ensures that progress.maybeUpdate and progress.maybeDecrTo
// will reset progress.paused.
func TestProgressResume(t *testing.T) {
p := &Progress{
Wait: 1,
Next: 2,
Paused: true,
}
p.maybeDecrTo(1, 1)
if p.Wait != 0 {
t.Errorf("wait= %d, want 0", p.Wait)
if p.Paused != false {
t.Errorf("paused= %v, want false", p.Paused)
}
p.Wait = 1
p.update(2)
if p.Wait != 0 {
t.Errorf("wait= %d, want 0", p.Wait)
p.Paused = true
p.maybeUpdate(2)
if p.Paused != false {
t.Errorf("paused= %v, want false", p.Paused)
}
}
// TestProgressDecr ensures raft.heartbeat decreases progress.wait by heartbeat.
func TestProgressDecr(t *testing.T) {
// TestProgressResumeByHeartbeat ensures raft.heartbeat reset progress.paused by heartbeat.
func TestProgressResumeByHeartbeat(t *testing.T) {
r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0)
r.becomeCandidate()
r.becomeLeader()
r.prs[2].Wait = r.heartbeatTimeout * 2
r.prs[2].Paused = true
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
if r.prs[2].Wait != r.heartbeatTimeout*(2-1) {
t.Errorf("wait = %d, want %d", r.prs[2].Wait, r.heartbeatTimeout*(2-1))
if r.prs[2].Paused != false {
t.Errorf("paused = %v, want false", r.prs[2].Paused)
}
}
func TestProgressWait(t *testing.T) {
func TestProgressPaused(t *testing.T) {
r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0)
r.becomeCandidate()
r.becomeLeader()
@ -1262,16 +1330,16 @@ func TestLeaderIncreaseNext(t *testing.T) {
previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
tests := []struct {
// progress
match uint64
state ProgressStateType
next uint64
wnext uint64
}{
// match is not zero, optimistically increase next
// state replicate, optimistically increase next
// previous entries + noop entry + propose + 1
{1, 2, uint64(len(previousEnts) + 1 + 1 + 1)},
// match is zero, not optimistically increase next
{0, 2, 2},
{ProgressStateReplicate, 2, uint64(len(previousEnts) + 1 + 1 + 1)},
// state probe, not optimistically increase next
{ProgressStateProbe, 2, 2},
}
for i, tt := range tests {
@ -1279,7 +1347,8 @@ func TestLeaderIncreaseNext(t *testing.T) {
sm.raftLog.append(previousEnts...)
sm.becomeCandidate()
sm.becomeLeader()
sm.prs[2].Match, sm.prs[2].Next = tt.match, tt.next
sm.prs[2].State = tt.state
sm.prs[2].Next = tt.next
sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
p := sm.prs[2]
@ -1289,41 +1358,32 @@ func TestLeaderIncreaseNext(t *testing.T) {
}
}
func TestUnreachable(t *testing.T) {
previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
s := NewMemoryStorage()
s.Append(previousEnts)
r := newRaft(1, []uint64{1, 2}, 10, 1, s, 0)
func TestSendAppendForProgressProbe(t *testing.T) {
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
r.becomeCandidate()
r.becomeLeader()
r.readMessages()
r.prs[2].becomeProbe()
// set node 2 to unreachable
r.prs[2].Match = 3
r.prs[2].Next = 5
r.prs[2].Wait = 0
r.prs[2].unreachable()
if wnext := r.prs[2].Match + 1; r.prs[2].Next != wnext {
t.Errorf("next = %d, want %d", r.prs[2].Next, wnext)
}
// each round is a heartbeat
for i := 0; i < 3; i++ {
// node 2 is unreachable, we expect that raft will only send out one msgAPP per heartbeat timeout
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
// we expect that raft will only send out one msgAPP per heartbeat timeout
r.appendEntry(pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
msg := r.readMessages()
if len(msg) != 1 {
t.Errorf("len(msg) = %d, want %d", len(msg), 1)
}
if msg[0].Index != 3 {
t.Errorf("index = %d, want %d", msg[0].Index, 3)
if msg[0].Index != 0 {
t.Errorf("index = %d, want %d", msg[0].Index, 0)
}
if r.prs[2].Wait != r.heartbeatTimeout {
t.Errorf("wait = %d, want %d", r.prs[1].Wait, r.heartbeatTimeout)
if r.prs[2].Paused != true {
t.Errorf("paused = %v, want true", r.prs[2].Paused)
}
for j := 0; j < 10; j++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
r.appendEntry(pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
if l := len(r.readMessages()); l != 0 {
t.Errorf("len(msg) = %d, want %d", l, 0)
}
@ -1342,11 +1402,18 @@ func TestUnreachable(t *testing.T) {
t.Errorf("type = %s, want %s", msg[0].Type, pb.MsgHeartbeat)
}
}
}
func TestSendAppendForProgressReplicate(t *testing.T) {
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
r.becomeCandidate()
r.becomeLeader()
r.readMessages()
r.prs[2].becomeReplicate()
// recover node 2
r.prs[2].reachable()
for i := 0; i < 10; i++ {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
r.appendEntry(pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
msgs := r.readMessages()
if len(msgs) != 1 {
t.Errorf("len(msg) = %d, want %d", len(msgs), 1)
@ -1354,6 +1421,46 @@ func TestUnreachable(t *testing.T) {
}
}
func TestSendAppendForProgressSnapshot(t *testing.T) {
r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
r.becomeCandidate()
r.becomeLeader()
r.readMessages()
r.prs[2].becomeSnapshot(10)
for i := 0; i < 10; i++ {
r.appendEntry(pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
msgs := r.readMessages()
if len(msgs) != 0 {
t.Errorf("len(msg) = %d, want %d", len(msgs), 0)
}
}
}
func TestRecvMsgUnreachable(t *testing.T) {
previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
s := NewMemoryStorage()
s.Append(previousEnts)
r := newRaft(1, []uint64{1, 2}, 10, 1, s, 0)
r.becomeCandidate()
r.becomeLeader()
r.readMessages()
// set node 2 to state replicate
r.prs[2].Match = 3
r.prs[2].becomeReplicate()
r.prs[2].optimisticUpdate(5)
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable})
if r.prs[2].State != ProgressStateProbe {
t.Errorf("state = %s, want %s", r.prs[2].State, ProgressStateProbe)
}
if wnext := r.prs[2].Match + 1; r.prs[2].Next != wnext {
t.Errorf("next = %d, want %d", r.prs[2].Next, wnext)
}
}
func TestRestore(t *testing.T) {
s := pb.Snapshot{
Metadata: pb.SnapshotMetadata{

View File

@ -57,7 +57,7 @@ func (s Status) MarshalJSON() ([]byte, error) {
j += "}}"
} else {
for k, v := range s.Progress {
subj := fmt.Sprintf(`"%x":{"match":%d,"next":%d,"unreachable":%t},`, k, v.Match, v.Next, v.Unreachable)
subj := fmt.Sprintf(`"%x":{"match":%d,"next":%d,"state":%s},`, k, v.Match, v.Next, v.State)
j += subj
}
// remove the trailing ","