raft: add MaxInflightBytes to Config

This commit introduces the max inflight bytes setting at the Config level, and
tests that raft flow control honours it.

Signed-off-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
This commit is contained in:
Pavel Kalinnikov 2022-10-20 16:39:49 +01:00 committed by Tobias Grieger
parent 8c9c557d85
commit 68af01ca6e
3 changed files with 56 additions and 36 deletions

View File

@ -160,6 +160,16 @@ type Config struct {
// overflowing that sending buffer. TODO (xiangli): feedback to application to
// limit the proposal rate?
MaxInflightMsgs int
// MaxInflightBytes limits the number of in-flight bytes in append messages.
// Complements MaxInflightMsgs. Ignored if zero.
//
// This effectively bounds the bandwidth-delay product. Note that especially
// in high-latency deployments setting this too low can lead to a dramatic
// reduction in throughput. For example, with a peer that has a round-trip
// latency of 100ms to the leader and this setting is set to 1 MB, there is a
// throughput limit of 10 MB/s for this group. With RTT of 400ms, this drops
// to 2.5 MB/s. See Little's law to understand the maths behind.
MaxInflightBytes uint64
// CheckQuorum specifies if the leader should check quorum activity. Leader
// steps down when quorum is not active for an electionTimeout.
@ -228,6 +238,11 @@ func (c *Config) validate() error {
if c.MaxInflightMsgs <= 0 {
return errors.New("max inflight messages must be greater than 0")
}
if c.MaxInflightBytes == 0 {
c.MaxInflightBytes = noLimit
} else if c.MaxInflightBytes < c.MaxSizePerMsg {
return errors.New("max inflight bytes must be >= max message size")
}
if c.Logger == nil {
c.Logger = getLogger()
@ -332,7 +347,7 @@ func newRaft(c *Config) *raft {
raftLog: raftlog,
maxMsgSize: c.MaxSizePerMsg,
maxUncommittedSize: c.MaxUncommittedEntriesSize,
prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, 0), // TODO: set maxBytes
prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes),
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
@ -484,8 +499,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
// Send the actual MsgApp otherwise, and update the progress accordingly.
next := pr.Next // save Next for later, as the progress update can change it
// TODO(pavelkalinnikov): set bytes to sum(Entries[].Size())
if err := pr.UpdateOnEntriesSend(len(ents), 0 /* bytes */, next); err != nil {
if err := pr.UpdateOnEntriesSend(len(ents), payloadsSize(ents), next); err != nil {
r.logger.Panicf("%x: %v", r.id, err)
}
r.send(pb.Message{

View File

@ -130,6 +130,7 @@ func TestProgressFlowControl(t *testing.T) {
cfg := newTestConfig(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
cfg.MaxInflightMsgs = 3
cfg.MaxSizePerMsg = 2048
cfg.MaxInflightBytes = 9000 // A little over MaxInflightMsgs * MaxSizePerMsg.
r := newRaft(cfg)
r.becomeCandidate()
r.becomeLeader()
@ -140,7 +141,12 @@ func TestProgressFlowControl(t *testing.T) {
// While node 2 is in probe state, propose a bunch of entries.
r.prs.Progress[2].BecomeProbe()
blob := []byte(strings.Repeat("a", 1000))
for i := 0; i < 10; i++ {
large := []byte(strings.Repeat("b", 5000))
for i := 0; i < 22; i++ {
blob := blob
if i >= 10 && i < 16 { // Temporarily send large messages.
blob = large
}
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}})
}
@ -158,40 +164,40 @@ func TestProgressFlowControl(t *testing.T) {
t.Fatalf("unexpected entry sizes: %v", ms[0].Entries)
}
// When this append is acked, we change to replicate state and can
// send multiple messages at once.
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[0].Entries[1].Index})
ms = r.readMessages()
if len(ms) != 3 {
t.Fatalf("expected 3 messages, got %d", len(ms))
}
for i, m := range ms {
if m.Type != pb.MsgApp {
t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
ackAndVerify := func(index uint64, expEntries ...int) uint64 {
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: index})
ms := r.readMessages()
if got, want := len(ms), len(expEntries); got != want {
t.Fatalf("expected %d messages, got %d", want, got)
}
if len(m.Entries) != 2 {
t.Errorf("%d: expected 2 entries, got %d", i, len(m.Entries))
for i, m := range ms {
if got, want := m.Type, pb.MsgApp; got != want {
t.Errorf("%d: expected MsgApp, got %s", i, got)
}
if got, want := len(m.Entries), expEntries[i]; got != want {
t.Errorf("%d: expected %d entries, got %d", i, want, got)
}
}
last := ms[len(ms)-1].Entries
if len(last) == 0 {
return index
}
return last[len(last)-1].Index
}
// Ack all three of those messages together and get the last two
// messages (containing three entries).
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[2].Entries[1].Index})
ms = r.readMessages()
if len(ms) != 2 {
t.Fatalf("expected 2 messages, got %d", len(ms))
}
for i, m := range ms {
if m.Type != pb.MsgApp {
t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
}
}
if len(ms[0].Entries) != 2 {
t.Errorf("%d: expected 2 entries, got %d", 0, len(ms[0].Entries))
}
if len(ms[1].Entries) != 1 {
t.Errorf("%d: expected 1 entry, got %d", 1, len(ms[1].Entries))
}
// When this append is acked, we change to replicate state and can
// send multiple messages at once.
index := ackAndVerify(ms[0].Entries[1].Index, 2, 2, 2)
// Ack all three of those messages together and get another 3 messages. The
// third message contains a single large entry, in contrast to 2 before.
index = ackAndVerify(index, 2, 1, 1)
// All subsequent messages contain one large entry, and we cap at 2 messages
// because it overflows MaxInflightBytes.
index = ackAndVerify(index, 1, 1)
index = ackAndVerify(index, 1, 1)
// Start getting small messages again.
index = ackAndVerify(index, 1, 2, 2)
ackAndVerify(index, 2)
}
func TestUncommittedEntryLimit(t *testing.T) {

View File

@ -53,9 +53,9 @@ func TestProgressIsPaused(t *testing.T) {
}
for i, tt := range tests {
p := &Progress{
State: tt.state,
State: tt.state,
MsgAppFlowPaused: tt.paused,
Inflights: NewInflights(256, 0),
Inflights: NewInflights(256, 0),
}
assert.Equal(t, tt.w, p.IsPaused(), i)
}