From e625400f1d6f4db8f3929f19a409d44cb0395135 Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Mon, 19 Dec 2016 15:24:21 -0500 Subject: [PATCH] raft: resume paused followers on receipt of MsgHeartbeatResp Previously, paused followers were resumed upon sending a MsgHearbeat. Fixes #7037 --- raft/raft.go | 2 +- raft/raft_test.go | 82 +++++++++++++++++++++++++++-------------------- 2 files changed, 48 insertions(+), 36 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 288989d16..1ac1a2b32 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -469,7 +469,6 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) { continue } r.sendHeartbeat(id, ctx) - r.prs[id].resume() } } @@ -898,6 +897,7 @@ func stepLeader(r *raft, m pb.Message) { } case pb.MsgHeartbeatResp: pr.RecentActive = true + pr.resume() // free one slot for the full inflights window to allow progress. if pr.State == ProgressStateReplicate && pr.ins.full() { diff --git a/raft/raft_test.go b/raft/raft_test.go index 99579d94f..3486d7b38 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -260,14 +260,20 @@ func TestProgressResume(t *testing.T) { } } -// TestProgressResumeByHeartbeat ensures raft.heartbeat reset progress.paused by heartbeat. -func TestProgressResumeByHeartbeat(t *testing.T) { +// TestProgressResumeByHeartbeatResp ensures raft.heartbeat reset progress.paused by heartbeat. +func TestProgressResumeByHeartbeatResp(t *testing.T) { r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage()) r.becomeCandidate() r.becomeLeader() r.prs[2].Paused = true r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) + if !r.prs[2].Paused { + t.Errorf("paused = %v, want false", r.prs[2].Paused) + } + + r.prs[2].becomeReplicate() + r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) if r.prs[2].Paused { t.Errorf("paused = %v, want false", r.prs[2].Paused) } @@ -1139,44 +1145,29 @@ func TestHandleHeartbeatResp(t *testing.T) { t.Errorf("type = %v, want MsgApp", msgs[0].Type) } - // A second heartbeat response with no AppResp does not re-send because we are in the wait state. + // A second heartbeat response generates another MsgApp re-send sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp}) msgs = sm.readMessages() - if len(msgs) != 0 { - t.Fatalf("len(msgs) = %d, want 0", len(msgs)) + if len(msgs) != 1 { + t.Fatalf("len(msgs) = %d, want 1", len(msgs)) } - - // Send a heartbeat to reset the wait state; next heartbeat will re-send MsgApp. - sm.bcastHeartbeat() - sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp}) - msgs = sm.readMessages() - if len(msgs) != 2 { - t.Fatalf("len(msgs) = %d, want 2", len(msgs)) - } - if msgs[0].Type != pb.MsgHeartbeat { - t.Errorf("type = %v, want MsgHeartbeat", msgs[0].Type) - } - if msgs[1].Type != pb.MsgApp { - t.Errorf("type = %v, want MsgApp", msgs[1].Type) + if msgs[0].Type != pb.MsgApp { + t.Errorf("type = %v, want MsgApp", msgs[0].Type) } // Once we have an MsgAppResp, heartbeats no longer send MsgApp. sm.Step(pb.Message{ From: 2, Type: pb.MsgAppResp, - Index: msgs[1].Index + uint64(len(msgs[1].Entries)), + Index: msgs[0].Index + uint64(len(msgs[0].Entries)), }) // Consume the message sent in response to MsgAppResp sm.readMessages() - sm.bcastHeartbeat() // reset wait state sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp}) msgs = sm.readMessages() - if len(msgs) != 1 { - t.Fatalf("len(msgs) = %d, want 1: %+v", len(msgs), msgs) - } - if msgs[0].Type != pb.MsgHeartbeat { - t.Errorf("type = %v, want MsgHeartbeat", msgs[0].Type) + if len(msgs) != 0 { + t.Fatalf("len(msgs) = %d, want 0: %+v", len(msgs), msgs) } } @@ -1988,15 +1979,19 @@ func TestSendAppendForProgressProbe(t *testing.T) { // each round is a heartbeat for i := 0; i < 3; i++ { - // we expect that raft will only send out one msgAPP per heartbeat timeout - r.appendEntry(pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) - msg := r.readMessages() - if len(msg) != 1 { - t.Errorf("len(msg) = %d, want %d", len(msg), 1) - } - if msg[0].Index != 0 { - t.Errorf("index = %d, want %d", msg[0].Index, 0) + if i == 0 { + // we expect that raft will only send out one msgAPP on the first + // loop. After that, the follower is paused until a heartbeat response is + // received. + r.appendEntry(pb.Entry{Data: []byte("somedata")}) + r.sendAppend(2) + msg := r.readMessages() + if len(msg) != 1 { + t.Errorf("len(msg) = %d, want %d", len(msg), 1) + } + if msg[0].Index != 0 { + t.Errorf("index = %d, want %d", msg[0].Index, 0) + } } if !r.prs[2].Paused { @@ -2014,8 +2009,12 @@ func TestSendAppendForProgressProbe(t *testing.T) { for j := 0; j < r.heartbeatTimeout; j++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) } + if !r.prs[2].Paused { + t.Errorf("paused = %v, want true", r.prs[2].Paused) + } + // consume the heartbeat - msg = r.readMessages() + msg := r.readMessages() if len(msg) != 1 { t.Errorf("len(msg) = %d, want %d", len(msg), 1) } @@ -2023,6 +2022,19 @@ func TestSendAppendForProgressProbe(t *testing.T) { t.Errorf("type = %v, want %v", msg[0].Type, pb.MsgHeartbeat) } } + + // a heartbeat response will allow another message to be sent + r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) + msg := r.readMessages() + if len(msg) != 1 { + t.Errorf("len(msg) = %d, want %d", len(msg), 1) + } + if msg[0].Index != 0 { + t.Errorf("index = %d, want %d", msg[0].Index, 0) + } + if !r.prs[2].Paused { + t.Errorf("paused = %v, want true", r.prs[2].Paused) + } } func TestSendAppendForProgressReplicate(t *testing.T) {