mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: Make flow control more aggressive
We allow multiple in-flight append messages, but prior to this change the only way we'd ever send them is if there is a steady stream of new proposals. Catching up a follower that is far behind would be unnecessarily slow (this is exacerbated by a quirk of CockroachDB's use of raft which limits our ability to catch up via snapshot in some cases). See cockroachdb/cockroach#27983
This commit is contained in:
parent
93be31d43a
commit
a9e7c1e11f
34
raft/raft.go
34
raft/raft.go
@ -441,22 +441,35 @@ func (r *raft) getProgress(id uint64) *Progress {
|
||||
return r.learnerPrs[id]
|
||||
}
|
||||
|
||||
// sendAppend sends RPC, with entries to the given peer.
|
||||
// sendAppend sends an append RPC with new entries (if any) and the
|
||||
// current commit index to the given peer.
|
||||
func (r *raft) sendAppend(to uint64) {
|
||||
r.maybeSendAppend(to, true)
|
||||
}
|
||||
|
||||
// maybeSendAppend sends an append RPC with new entries to the given peer,
|
||||
// if necessary. Returns true if a message was sent. The sendIfEmpty
|
||||
// argument controls whether messages with no entries will be sent
|
||||
// ("empty" messages are useful to convey updated Commit indexes, but
|
||||
// are undesirable when we're sending multiple messages in a batch).
|
||||
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
|
||||
pr := r.getProgress(to)
|
||||
if pr.IsPaused() {
|
||||
return
|
||||
return false
|
||||
}
|
||||
m := pb.Message{}
|
||||
m.To = to
|
||||
|
||||
term, errt := r.raftLog.term(pr.Next - 1)
|
||||
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
|
||||
if len(ents) == 0 && !sendIfEmpty {
|
||||
return false
|
||||
}
|
||||
|
||||
if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
|
||||
if !pr.RecentActive {
|
||||
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
m.Type = pb.MsgSnap
|
||||
@ -464,7 +477,7 @@ func (r *raft) sendAppend(to uint64) {
|
||||
if err != nil {
|
||||
if err == ErrSnapshotTemporarilyUnavailable {
|
||||
r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
|
||||
return
|
||||
return false
|
||||
}
|
||||
panic(err) // TODO(bdarnell)
|
||||
}
|
||||
@ -498,6 +511,7 @@ func (r *raft) sendAppend(to uint64) {
|
||||
}
|
||||
}
|
||||
r.send(m)
|
||||
return true
|
||||
}
|
||||
|
||||
// sendHeartbeat sends an empty MsgApp
|
||||
@ -1020,10 +1034,18 @@ func stepLeader(r *raft, m pb.Message) error {
|
||||
if r.maybeCommit() {
|
||||
r.bcastAppend()
|
||||
} else if oldPaused {
|
||||
// update() reset the wait state on this node. If we had delayed sending
|
||||
// an update before, send it now.
|
||||
// If we were paused before, this node may be missing the
|
||||
// latest commit index, so send it.
|
||||
r.sendAppend(m.From)
|
||||
}
|
||||
// We've updated flow control information above, which may
|
||||
// allow us to send multiple (size-limited) in-flight messages
|
||||
// at once (such as when transitioning from probe to
|
||||
// replicate, or when freeTo() covers multiple messages). If
|
||||
// we have more entries to send, send as many messages as we
|
||||
// can (without sending empty messages for the commit index)
|
||||
for r.maybeSendAppend(m.From, false) {
|
||||
}
|
||||
// Transfer leadership is in progress.
|
||||
if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
|
||||
r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"math"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
pb "github.com/coreos/etcd/raft/raftpb"
|
||||
@ -293,6 +294,74 @@ func TestProgressPaused(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestProgressFlowControl(t *testing.T) {
|
||||
cfg := newTestConfig(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
|
||||
cfg.MaxInflightMsgs = 3
|
||||
cfg.MaxSizePerMsg = 2048
|
||||
r := newRaft(cfg)
|
||||
r.becomeCandidate()
|
||||
r.becomeLeader()
|
||||
|
||||
// Throw away all the messages relating to the initial election.
|
||||
r.readMessages()
|
||||
|
||||
// While node 2 is in probe state, propose a bunch of entries.
|
||||
r.prs[2].becomeProbe()
|
||||
blob := []byte(strings.Repeat("a", 1000))
|
||||
for i := 0; i < 10; i++ {
|
||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}})
|
||||
}
|
||||
|
||||
ms := r.readMessages()
|
||||
// First append has two entries: the empty entry to confirm the
|
||||
// election, and the first proposal (only one proposal gets sent
|
||||
// because we're in probe state).
|
||||
if len(ms) != 1 || ms[0].Type != pb.MsgApp {
|
||||
t.Fatalf("expected 1 MsgApp, got %v", ms)
|
||||
}
|
||||
if len(ms[0].Entries) != 2 {
|
||||
t.Fatalf("expected 2 entries, got %d", len(ms[0].Entries))
|
||||
}
|
||||
if len(ms[0].Entries[0].Data) != 0 || len(ms[0].Entries[1].Data) != 1000 {
|
||||
t.Fatalf("unexpected entry sizes: %v", ms[0].Entries)
|
||||
}
|
||||
|
||||
// When this append is acked, we change to replicate state and can
|
||||
// send multiple messages at once.
|
||||
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[0].Entries[1].Index})
|
||||
ms = r.readMessages()
|
||||
if len(ms) != 3 {
|
||||
t.Fatalf("expected 3 messages, got %d", len(ms))
|
||||
}
|
||||
for i, m := range ms {
|
||||
if m.Type != pb.MsgApp {
|
||||
t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
|
||||
}
|
||||
if len(m.Entries) != 2 {
|
||||
t.Errorf("%d: expected 2 entries, got %d", i, len(m.Entries))
|
||||
}
|
||||
}
|
||||
|
||||
// Ack all three of those messages together and get the last two
|
||||
// messages (containing three entries).
|
||||
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[2].Entries[1].Index})
|
||||
ms = r.readMessages()
|
||||
if len(ms) != 2 {
|
||||
t.Fatalf("expected 2 messages, got %d", len(ms))
|
||||
}
|
||||
for i, m := range ms {
|
||||
if m.Type != pb.MsgApp {
|
||||
t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
|
||||
}
|
||||
}
|
||||
if len(ms[0].Entries) != 2 {
|
||||
t.Errorf("%d: expected 2 entries, got %d", 0, len(ms[0].Entries))
|
||||
}
|
||||
if len(ms[1].Entries) != 1 {
|
||||
t.Errorf("%d: expected 1 entry, got %d", 1, len(ms[1].Entries))
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeaderElection(t *testing.T) {
|
||||
testLeaderElection(t, false)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user