From bfb7b16f4f5653403a99065d4983c77a59ac600c Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 20 Oct 2022 16:29:40 +0100 Subject: [PATCH 1/5] raft/tracker: add byte size limit to Inflights type The Inflights type has limits on the message size and the number of inflight messages. However, a single large entry that exceeds the size limit can still be sent. In combination with the max messages count limit, many large messages can be sent in a row and overflow the receiver. In effect, the "max" values act as "target" rather than hard limits. This commit adds an additional soft limit on the total size of inflight messages, which catches such situations and prevents the receiver overflow. Signed-off-by: Pavel Kalinnikov --- raft/confchange/confchange.go | 2 +- raft/raft.go | 2 +- raft/tracker/inflights.go | 56 +++++++++----- raft/tracker/inflights_test.go | 137 ++++++++++++++++++++++++++------- raft/tracker/progress.go | 2 +- raft/tracker/progress_test.go | 18 ++--- 6 files changed, 157 insertions(+), 60 deletions(-) diff --git a/raft/confchange/confchange.go b/raft/confchange/confchange.go index dddbcc9d9..4e54e30f2 100644 --- a/raft/confchange/confchange.go +++ b/raft/confchange/confchange.go @@ -265,7 +265,7 @@ func (c Changer) initProgress(cfg *tracker.Config, prs tracker.ProgressMap, id u // making the first index the better choice). Next: c.LastIndex, Match: 0, - Inflights: tracker.NewInflights(c.Tracker.MaxInflight), + Inflights: tracker.NewInflights(c.Tracker.MaxInflight, 0), // TODO: set maxBytes IsLearner: isLearner, // When a node is first added, we should mark it as recently active. // Otherwise, CheckQuorum may cause us to step down if it is invoked diff --git a/raft/raft.go b/raft/raft.go index 5b3139196..33d608d41 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -629,7 +629,7 @@ func (r *raft) reset(term uint64) { *pr = tracker.Progress{ Match: 0, Next: r.raftLog.lastIndex() + 1, - Inflights: tracker.NewInflights(r.prs.MaxInflight), + Inflights: tracker.NewInflights(r.prs.MaxInflight, 0), // TODO: set maxBytes IsLearner: pr.IsLearner, } if id == r.id { diff --git a/raft/tracker/inflights.go b/raft/tracker/inflights.go index 242d1cab1..350728aec 100644 --- a/raft/tracker/inflights.go +++ b/raft/tracker/inflights.go @@ -14,6 +14,12 @@ package tracker +// inflight describes an in-flight MsgApp message. +type inflight struct { + index uint64 // the index of the last entry inside the message + bytes uint64 // the total byte size of the entries in the message +} + // Inflights limits the number of MsgApp (represented by the largest index // contained within) sent to followers but not yet acknowledged by them. Callers // use Full() to check whether more messages can be sent, call Add() whenever @@ -22,21 +28,25 @@ package tracker type Inflights struct { // the starting index in the buffer start int - // number of inflights in the buffer - count int - // the size of the buffer - size int + count int // number of inflight messages in the buffer + bytes uint64 // number of inflight bytes - // buffer contains the index of the last entry - // inside one message. - buffer []uint64 + size int // the max number of inflight messages + maxBytes uint64 // the max total byte size of inflight messages + + // buffer is a ring buffer containing info about all in-flight messages. + buffer []inflight } -// NewInflights sets up an Inflights that allows up to 'size' inflight messages. -func NewInflights(size int) *Inflights { +// NewInflights sets up an Inflights that allows up to size inflight messages, +// with the total byte size up to maxBytes. If maxBytes is 0 then there is no +// byte size limit. The maxBytes limit is soft, i.e. we accept a single message +// that brings it from size < maxBytes to size >= maxBytes. +func NewInflights(size int, maxBytes uint64) *Inflights { return &Inflights{ - size: size, + size: size, + maxBytes: maxBytes, } } @@ -44,15 +54,15 @@ func NewInflights(size int) *Inflights { // the receiver. func (in *Inflights) Clone() *Inflights { ins := *in - ins.buffer = append([]uint64(nil), in.buffer...) + ins.buffer = append([]inflight(nil), in.buffer...) return &ins } -// Add notifies the Inflights that a new message with the given index is being -// dispatched. Full() must be called prior to Add() to verify that there is room -// for one more message, and consecutive calls to add Add() must provide a -// monotonic sequence of indexes. -func (in *Inflights) Add(inflight uint64) { +// Add notifies the Inflights that a new message with the given index and byte +// size is being dispatched. Full() must be called prior to Add() to verify that +// there is room for one more message, and consecutive calls to Add() must +// provide a monotonic sequence of indexes. +func (in *Inflights) Add(index, bytes uint64) { if in.Full() { panic("cannot add into a Full inflights") } @@ -64,8 +74,9 @@ func (in *Inflights) Add(inflight uint64) { if next >= len(in.buffer) { in.grow() } - in.buffer[next] = inflight + in.buffer[next] = inflight{index: index, bytes: bytes} in.count++ + in.bytes += bytes } // grow the inflight buffer by doubling up to inflights.size. We grow on demand @@ -78,24 +89,26 @@ func (in *Inflights) grow() { } else if newSize > in.size { newSize = in.size } - newBuffer := make([]uint64, newSize) + newBuffer := make([]inflight, newSize) copy(newBuffer, in.buffer) in.buffer = newBuffer } // FreeLE frees the inflights smaller or equal to the given `to` flight. func (in *Inflights) FreeLE(to uint64) { - if in.count == 0 || to < in.buffer[in.start] { + if in.count == 0 || to < in.buffer[in.start].index { // out of the left side of the window return } idx := in.start var i int + var bytes uint64 for i = 0; i < in.count; i++ { - if to < in.buffer[idx] { // found the first large inflight + if to < in.buffer[idx].index { // found the first large inflight break } + bytes += in.buffer[idx].bytes // increase index and maybe rotate size := in.size @@ -105,6 +118,7 @@ func (in *Inflights) FreeLE(to uint64) { } // free i inflights and set new start index in.count -= i + in.bytes -= bytes in.start = idx if in.count == 0 { // inflights is empty, reset the start index so that we don't grow the @@ -115,7 +129,7 @@ func (in *Inflights) FreeLE(to uint64) { // Full returns true if no more messages can be sent at the moment. func (in *Inflights) Full() bool { - return in.count == in.size + return in.count == in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes) } // Count returns the number of inflight messages. diff --git a/raft/tracker/inflights_test.go b/raft/tracker/inflights_test.go index fe2a1b564..3514220df 100644 --- a/raft/tracker/inflights_test.go +++ b/raft/tracker/inflights_test.go @@ -24,32 +24,38 @@ func TestInflightsAdd(t *testing.T) { // no rotating case in := &Inflights{ size: 10, - buffer: make([]uint64, 10), + buffer: make([]inflight, 10), } for i := 0; i < 5; i++ { - in.Add(uint64(i)) + in.Add(uint64(i), uint64(100+i)) } wantIn := &Inflights{ start: 0, count: 5, + bytes: 510, size: 10, - // ↓------------ - buffer: []uint64{0, 1, 2, 3, 4, 0, 0, 0, 0, 0}, + buffer: inflightsBuffer( + // ↓------------ + []uint64{0, 1, 2, 3, 4, 0, 0, 0, 0, 0}, + []uint64{100, 101, 102, 103, 104, 0, 0, 0, 0, 0}), } require.Equal(t, wantIn, in) for i := 5; i < 10; i++ { - in.Add(uint64(i)) + in.Add(uint64(i), uint64(100+i)) } wantIn2 := &Inflights{ start: 0, count: 10, + bytes: 1045, size: 10, - // ↓--------------------------- - buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓--------------------------- + []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn2, in) @@ -57,41 +63,47 @@ func TestInflightsAdd(t *testing.T) { in2 := &Inflights{ start: 5, size: 10, - buffer: make([]uint64, 10), + buffer: make([]inflight, 10), } for i := 0; i < 5; i++ { - in2.Add(uint64(i)) + in2.Add(uint64(i), uint64(100+i)) } wantIn21 := &Inflights{ start: 5, count: 5, + bytes: 510, size: 10, - // ↓------------ - buffer: []uint64{0, 0, 0, 0, 0, 0, 1, 2, 3, 4}, + buffer: inflightsBuffer( + // ↓------------ + []uint64{0, 0, 0, 0, 0, 0, 1, 2, 3, 4}, + []uint64{0, 0, 0, 0, 0, 100, 101, 102, 103, 104}), } require.Equal(t, wantIn21, in2) for i := 5; i < 10; i++ { - in2.Add(uint64(i)) + in2.Add(uint64(i), uint64(100+i)) } wantIn22 := &Inflights{ start: 5, count: 10, + bytes: 1045, size: 10, - // -------------- ↓------------ - buffer: []uint64{5, 6, 7, 8, 9, 0, 1, 2, 3, 4}, + buffer: inflightsBuffer( + // -------------- ↓------------ + []uint64{5, 6, 7, 8, 9, 0, 1, 2, 3, 4}, + []uint64{105, 106, 107, 108, 109, 100, 101, 102, 103, 104}), } require.Equal(t, wantIn22, in2) } func TestInflightFreeTo(t *testing.T) { // no rotating case - in := NewInflights(10) + in := NewInflights(10, 0) for i := 0; i < 10; i++ { - in.Add(uint64(i)) + in.Add(uint64(i), uint64(100+i)) } in.FreeLE(0) @@ -99,9 +111,12 @@ func TestInflightFreeTo(t *testing.T) { wantIn0 := &Inflights{ start: 1, count: 9, + bytes: 945, size: 10, - // ↓------------------------ - buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓------------------------ + []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn0, in) @@ -110,9 +125,12 @@ func TestInflightFreeTo(t *testing.T) { wantIn := &Inflights{ start: 5, count: 5, + bytes: 535, size: 10, - // ↓------------ - buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓------------ + []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn, in) @@ -121,15 +139,18 @@ func TestInflightFreeTo(t *testing.T) { wantIn2 := &Inflights{ start: 9, count: 1, + bytes: 109, size: 10, - // ↓ - buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓ + []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn2, in) // rotating case for i := 10; i < 15; i++ { - in.Add(uint64(i)) + in.Add(uint64(i), uint64(100+i)) } in.FreeLE(12) @@ -137,9 +158,12 @@ func TestInflightFreeTo(t *testing.T) { wantIn3 := &Inflights{ start: 3, count: 2, + bytes: 227, size: 10, - // ↓----- - buffer: []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓----- + []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, + []uint64{110, 111, 112, 113, 114, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn3, in) @@ -149,8 +173,67 @@ func TestInflightFreeTo(t *testing.T) { start: 0, count: 0, size: 10, - // ↓ - buffer: []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓ + []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, + []uint64{110, 111, 112, 113, 114, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn4, in) } + +func TestInflightsFull(t *testing.T) { + for _, tc := range []struct { + name string + size int + maxBytes uint64 + fullAt int + freeLE uint64 + againAt int + }{ + {name: "always-full", size: 0, fullAt: 0}, + {name: "single-entry", size: 1, fullAt: 1, freeLE: 1, againAt: 2}, + {name: "single-entry-overflow", size: 1, maxBytes: 10, fullAt: 1, freeLE: 1, againAt: 2}, + {name: "multi-entry", size: 15, fullAt: 15, freeLE: 6, againAt: 22}, + {name: "slight-overflow", size: 8, maxBytes: 400, fullAt: 4, freeLE: 2, againAt: 7}, + {name: "exact-max-bytes", size: 8, maxBytes: 406, fullAt: 4, freeLE: 3, againAt: 8}, + {name: "larger-overflow", size: 15, maxBytes: 408, fullAt: 5, freeLE: 1, againAt: 6}, + } { + t.Run(tc.name, func(t *testing.T) { + in := NewInflights(tc.size, tc.maxBytes) + + addUntilFull := func(begin, end int) { + for i := begin; i < end; i++ { + if in.Full() { + t.Fatalf("full at %d, want %d", i, end) + } + in.Add(uint64(i), uint64(100+i)) + } + if !in.Full() { + t.Fatalf("not full at %d", end) + } + } + + addUntilFull(0, tc.fullAt) + in.FreeLE(tc.freeLE) + addUntilFull(tc.fullAt, tc.againAt) + + defer func() { + if r := recover(); r == nil { + t.Errorf("Add() did not panic") + } + }() + in.Add(100, 1024) + }) + } +} + +func inflightsBuffer(indices []uint64, sizes []uint64) []inflight { + if len(indices) != len(sizes) { + panic("len(indices) != len(sizes)") + } + buffer := make([]inflight, 0, len(indices)) + for i, idx := range indices { + buffer = append(buffer, inflight{index: idx, bytes: sizes[i]}) + } + return buffer +} diff --git a/raft/tracker/progress.go b/raft/tracker/progress.go index c6272d22d..adf9cb4ba 100644 --- a/raft/tracker/progress.go +++ b/raft/tracker/progress.go @@ -141,7 +141,7 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error { if entries > 0 { last := nextIndex + uint64(entries) - 1 pr.OptimisticUpdate(last) - pr.Inflights.Add(last) + pr.Inflights.Add(last, 0) // TODO: set bytes to sum(Entries[].Size()) } // If this message overflows the in-flights tracker, or it was already full, // consider this message being a probe, so that the flow is paused. diff --git a/raft/tracker/progress_test.go b/raft/tracker/progress_test.go index 974c383f0..ed80e0da4 100644 --- a/raft/tracker/progress_test.go +++ b/raft/tracker/progress_test.go @@ -21,8 +21,8 @@ import ( ) func TestProgressString(t *testing.T) { - ins := NewInflights(1) - ins.Add(123) + ins := NewInflights(1, 0) + ins.Add(123, 1) pr := &Progress{ Match: 1, Next: 2, @@ -53,9 +53,9 @@ func TestProgressIsPaused(t *testing.T) { } for i, tt := range tests { p := &Progress{ - State: tt.state, + State: tt.state, MsgAppFlowPaused: tt.paused, - Inflights: NewInflights(256), + Inflights: NewInflights(256, 0), } assert.Equal(t, tt.w, p.IsPaused(), i) } @@ -82,17 +82,17 @@ func TestProgressBecomeProbe(t *testing.T) { wnext uint64 }{ { - &Progress{State: StateReplicate, Match: match, Next: 5, Inflights: NewInflights(256)}, + &Progress{State: StateReplicate, Match: match, Next: 5, Inflights: NewInflights(256, 0)}, 2, }, { // snapshot finish - &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, Inflights: NewInflights(256)}, + &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, Inflights: NewInflights(256, 0)}, 11, }, { // snapshot failure - &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, Inflights: NewInflights(256)}, + &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, Inflights: NewInflights(256, 0)}, 2, }, } @@ -105,7 +105,7 @@ func TestProgressBecomeProbe(t *testing.T) { } func TestProgressBecomeReplicate(t *testing.T) { - p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256)} + p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256, 0)} p.BecomeReplicate() assert.Equal(t, StateReplicate, p.State) assert.Equal(t, uint64(1), p.Match) @@ -113,7 +113,7 @@ func TestProgressBecomeReplicate(t *testing.T) { } func TestProgressBecomeSnapshot(t *testing.T) { - p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256)} + p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256, 0)} p.BecomeSnapshot(10) assert.Equal(t, StateSnapshot, p.State) assert.Equal(t, uint64(1), p.Match) From 7bda0d7773e4017853343b219790f9cc7b28ace1 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 20 Oct 2022 16:36:44 +0100 Subject: [PATCH 2/5] raft/tracker: add MaxInflightBytes to ProgressTracker This commit plumbs the max total byte size of the Inflights type higher up the stack to the ProgressTracker. Signed-off-by: Pavel Kalinnikov --- raft/confchange/confchange.go | 2 +- raft/confchange/datadriven_test.go | 2 +- raft/confchange/quick_test.go | 2 +- raft/confchange/restore_test.go | 2 +- raft/raft.go | 9 +++++---- raft/raft_test.go | 2 +- raft/tracker/progress.go | 7 ++++--- raft/tracker/tracker.go | 8 +++++--- 8 files changed, 19 insertions(+), 15 deletions(-) diff --git a/raft/confchange/confchange.go b/raft/confchange/confchange.go index 4e54e30f2..bc60abf7f 100644 --- a/raft/confchange/confchange.go +++ b/raft/confchange/confchange.go @@ -265,7 +265,7 @@ func (c Changer) initProgress(cfg *tracker.Config, prs tracker.ProgressMap, id u // making the first index the better choice). Next: c.LastIndex, Match: 0, - Inflights: tracker.NewInflights(c.Tracker.MaxInflight, 0), // TODO: set maxBytes + Inflights: tracker.NewInflights(c.Tracker.MaxInflight, c.Tracker.MaxInflightBytes), IsLearner: isLearner, // When a node is first added, we should mark it as recently active. // Otherwise, CheckQuorum may cause us to step down if it is invoked diff --git a/raft/confchange/datadriven_test.go b/raft/confchange/datadriven_test.go index ab1524091..f179f1f43 100644 --- a/raft/confchange/datadriven_test.go +++ b/raft/confchange/datadriven_test.go @@ -28,7 +28,7 @@ import ( func TestConfChangeDataDriven(t *testing.T) { datadriven.Walk(t, "testdata", func(t *testing.T, path string) { - tr := tracker.MakeProgressTracker(10) + tr := tracker.MakeProgressTracker(10, 0) c := Changer{ Tracker: tr, LastIndex: 0, // incremented in this test with each cmd diff --git a/raft/confchange/quick_test.go b/raft/confchange/quick_test.go index 16d72c199..76018f634 100644 --- a/raft/confchange/quick_test.go +++ b/raft/confchange/quick_test.go @@ -89,7 +89,7 @@ func TestConfChangeQuick(t *testing.T) { wrapper := func(invoke testFunc) func(setup initialChanges, ccs confChanges) (*Changer, error) { return func(setup initialChanges, ccs confChanges) (*Changer, error) { - tr := tracker.MakeProgressTracker(10) + tr := tracker.MakeProgressTracker(10, 0) c := &Changer{ Tracker: tr, LastIndex: 10, diff --git a/raft/confchange/restore_test.go b/raft/confchange/restore_test.go index 50712c794..ec45e5144 100644 --- a/raft/confchange/restore_test.go +++ b/raft/confchange/restore_test.go @@ -86,7 +86,7 @@ func TestRestore(t *testing.T) { f := func(cs pb.ConfState) bool { chg := Changer{ - Tracker: tracker.MakeProgressTracker(20), + Tracker: tracker.MakeProgressTracker(20, 0), LastIndex: 10, } cfg, prs, err := Restore(chg, cs) diff --git a/raft/raft.go b/raft/raft.go index 33d608d41..38d02ae27 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -332,7 +332,7 @@ func newRaft(c *Config) *raft { raftLog: raftlog, maxMsgSize: c.MaxSizePerMsg, maxUncommittedSize: c.MaxUncommittedEntriesSize, - prs: tracker.MakeProgressTracker(c.MaxInflightMsgs), + prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, 0), // TODO: set maxBytes electionTimeout: c.ElectionTick, heartbeatTimeout: c.HeartbeatTick, logger: c.Logger, @@ -484,7 +484,8 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { // Send the actual MsgApp otherwise, and update the progress accordingly. next := pr.Next // save Next for later, as the progress update can change it - if err := pr.UpdateOnEntriesSend(len(ents), next); err != nil { + // TODO(pavelkalinnikov): set bytes to sum(Entries[].Size()) + if err := pr.UpdateOnEntriesSend(len(ents), 0 /* bytes */, next); err != nil { r.logger.Panicf("%x: %v", r.id, err) } r.send(pb.Message{ @@ -629,7 +630,7 @@ func (r *raft) reset(term uint64) { *pr = tracker.Progress{ Match: 0, Next: r.raftLog.lastIndex() + 1, - Inflights: tracker.NewInflights(r.prs.MaxInflight, 0), // TODO: set maxBytes + Inflights: tracker.NewInflights(r.prs.MaxInflight, r.prs.MaxInflightBytes), IsLearner: pr.IsLearner, } if id == r.id { @@ -1618,7 +1619,7 @@ func (r *raft) restore(s pb.Snapshot) bool { r.raftLog.restore(s) // Reset the configuration and add the (potentially updated) peers in anew. - r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight) + r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight, r.prs.MaxInflightBytes) cfg, prs, err := confchange.Restore(confchange.Changer{ Tracker: r.prs, LastIndex: r.raftLog.lastIndex(), diff --git a/raft/raft_test.go b/raft/raft_test.go index 95408976b..29eec28c1 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -4706,7 +4706,7 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw learners[i] = true } v.id = id - v.prs = tracker.MakeProgressTracker(v.prs.MaxInflight) + v.prs = tracker.MakeProgressTracker(v.prs.MaxInflight, v.prs.MaxInflightBytes) if len(learners) > 0 { v.prs.Learners = map[uint64]struct{}{} } diff --git a/raft/tracker/progress.go b/raft/tracker/progress.go index adf9cb4ba..f4e1e07d8 100644 --- a/raft/tracker/progress.go +++ b/raft/tracker/progress.go @@ -134,14 +134,15 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) { } // UpdateOnEntriesSend updates the progress on the given number of consecutive -// entries being sent in a MsgApp, appended at and after the given log index. -func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error { +// entries being sent in a MsgApp, with the given total bytes size, appended at +// and after the given log index. +func (pr *Progress) UpdateOnEntriesSend(entries int, bytes, nextIndex uint64) error { switch pr.State { case StateReplicate: if entries > 0 { last := nextIndex + uint64(entries) - 1 pr.OptimisticUpdate(last) - pr.Inflights.Add(last, 0) // TODO: set bytes to sum(Entries[].Size()) + pr.Inflights.Add(last, bytes) } // If this message overflows the in-flights tracker, or it was already full, // consider this message being a probe, so that the flow is paused. diff --git a/raft/tracker/tracker.go b/raft/tracker/tracker.go index 72dcc73b8..938b7878c 100644 --- a/raft/tracker/tracker.go +++ b/raft/tracker/tracker.go @@ -121,13 +121,15 @@ type ProgressTracker struct { Votes map[uint64]bool - MaxInflight int + MaxInflight int + MaxInflightBytes uint64 } // MakeProgressTracker initializes a ProgressTracker. -func MakeProgressTracker(maxInflight int) ProgressTracker { +func MakeProgressTracker(maxInflight int, maxBytes uint64) ProgressTracker { p := ProgressTracker{ - MaxInflight: maxInflight, + MaxInflight: maxInflight, + MaxInflightBytes: maxBytes, Config: Config{ Voters: quorum.JointConfig{ quorum.MajorityConfig{}, From 8c9c557d85fa22d99a38a5255cdf175aaf63adf8 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 27 Oct 2022 14:52:55 +0100 Subject: [PATCH 3/5] raft: factor out payloadsSize helper Signed-off-by: Pavel Kalinnikov --- raft/raft.go | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 38d02ae27..880d23c60 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -1790,11 +1790,7 @@ func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Messa // Empty payloads are never refused. This is used both for appending an empty // entry at a new leader's term, as well as leaving a joint configuration. func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool { - var s uint64 - for _, e := range ents { - s += uint64(PayloadSize(e)) - } - + s := payloadsSize(ents) if r.uncommittedSize > 0 && s > 0 && r.uncommittedSize+s > r.maxUncommittedSize { // If the uncommitted tail of the Raft log is empty, allow any size // proposal. Otherwise, limit the size of the uncommitted tail of the @@ -1816,12 +1812,7 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) { // Fast-path for followers, who do not track or enforce the limit. return } - - var s uint64 - for _, e := range ents { - s += uint64(PayloadSize(e)) - } - if s > r.uncommittedSize { + if s := payloadsSize(ents); s > r.uncommittedSize { // uncommittedSize may underestimate the size of the uncommitted Raft // log tail but will never overestimate it. Saturate at 0 instead of // allowing overflow. @@ -1831,6 +1822,14 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) { } } +func payloadsSize(ents []pb.Entry) uint64 { + var s uint64 + for _, e := range ents { + s += uint64(PayloadSize(e)) + } + return s +} + func numOfPendingConf(ents []pb.Entry) int { n := 0 for i := range ents { From 68af01ca6e47d31509b6104a4392007b1fd561da Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 20 Oct 2022 16:39:49 +0100 Subject: [PATCH 4/5] raft: add MaxInflightBytes to Config This commit introduces the max inflight bytes setting at the Config level, and tests that raft flow control honours it. Signed-off-by: Pavel Kalinnikov --- raft/raft.go | 20 +++++++++-- raft/raft_test.go | 68 +++++++++++++++++++---------------- raft/tracker/progress_test.go | 4 +-- 3 files changed, 56 insertions(+), 36 deletions(-) diff --git a/raft/raft.go b/raft/raft.go index 880d23c60..180a96e93 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -160,6 +160,16 @@ type Config struct { // overflowing that sending buffer. TODO (xiangli): feedback to application to // limit the proposal rate? MaxInflightMsgs int + // MaxInflightBytes limits the number of in-flight bytes in append messages. + // Complements MaxInflightMsgs. Ignored if zero. + // + // This effectively bounds the bandwidth-delay product. Note that especially + // in high-latency deployments setting this too low can lead to a dramatic + // reduction in throughput. For example, with a peer that has a round-trip + // latency of 100ms to the leader and this setting is set to 1 MB, there is a + // throughput limit of 10 MB/s for this group. With RTT of 400ms, this drops + // to 2.5 MB/s. See Little's law to understand the maths behind. + MaxInflightBytes uint64 // CheckQuorum specifies if the leader should check quorum activity. Leader // steps down when quorum is not active for an electionTimeout. @@ -228,6 +238,11 @@ func (c *Config) validate() error { if c.MaxInflightMsgs <= 0 { return errors.New("max inflight messages must be greater than 0") } + if c.MaxInflightBytes == 0 { + c.MaxInflightBytes = noLimit + } else if c.MaxInflightBytes < c.MaxSizePerMsg { + return errors.New("max inflight bytes must be >= max message size") + } if c.Logger == nil { c.Logger = getLogger() @@ -332,7 +347,7 @@ func newRaft(c *Config) *raft { raftLog: raftlog, maxMsgSize: c.MaxSizePerMsg, maxUncommittedSize: c.MaxUncommittedEntriesSize, - prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, 0), // TODO: set maxBytes + prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes), electionTimeout: c.ElectionTick, heartbeatTimeout: c.HeartbeatTick, logger: c.Logger, @@ -484,8 +499,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { // Send the actual MsgApp otherwise, and update the progress accordingly. next := pr.Next // save Next for later, as the progress update can change it - // TODO(pavelkalinnikov): set bytes to sum(Entries[].Size()) - if err := pr.UpdateOnEntriesSend(len(ents), 0 /* bytes */, next); err != nil { + if err := pr.UpdateOnEntriesSend(len(ents), payloadsSize(ents), next); err != nil { r.logger.Panicf("%x: %v", r.id, err) } r.send(pb.Message{ diff --git a/raft/raft_test.go b/raft/raft_test.go index 29eec28c1..6563b1748 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -130,6 +130,7 @@ func TestProgressFlowControl(t *testing.T) { cfg := newTestConfig(1, 5, 1, newTestMemoryStorage(withPeers(1, 2))) cfg.MaxInflightMsgs = 3 cfg.MaxSizePerMsg = 2048 + cfg.MaxInflightBytes = 9000 // A little over MaxInflightMsgs * MaxSizePerMsg. r := newRaft(cfg) r.becomeCandidate() r.becomeLeader() @@ -140,7 +141,12 @@ func TestProgressFlowControl(t *testing.T) { // While node 2 is in probe state, propose a bunch of entries. r.prs.Progress[2].BecomeProbe() blob := []byte(strings.Repeat("a", 1000)) - for i := 0; i < 10; i++ { + large := []byte(strings.Repeat("b", 5000)) + for i := 0; i < 22; i++ { + blob := blob + if i >= 10 && i < 16 { // Temporarily send large messages. + blob = large + } r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}}) } @@ -158,40 +164,40 @@ func TestProgressFlowControl(t *testing.T) { t.Fatalf("unexpected entry sizes: %v", ms[0].Entries) } - // When this append is acked, we change to replicate state and can - // send multiple messages at once. - r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[0].Entries[1].Index}) - ms = r.readMessages() - if len(ms) != 3 { - t.Fatalf("expected 3 messages, got %d", len(ms)) - } - for i, m := range ms { - if m.Type != pb.MsgApp { - t.Errorf("%d: expected MsgApp, got %s", i, m.Type) + ackAndVerify := func(index uint64, expEntries ...int) uint64 { + r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: index}) + ms := r.readMessages() + if got, want := len(ms), len(expEntries); got != want { + t.Fatalf("expected %d messages, got %d", want, got) } - if len(m.Entries) != 2 { - t.Errorf("%d: expected 2 entries, got %d", i, len(m.Entries)) + for i, m := range ms { + if got, want := m.Type, pb.MsgApp; got != want { + t.Errorf("%d: expected MsgApp, got %s", i, got) + } + if got, want := len(m.Entries), expEntries[i]; got != want { + t.Errorf("%d: expected %d entries, got %d", i, want, got) + } } + last := ms[len(ms)-1].Entries + if len(last) == 0 { + return index + } + return last[len(last)-1].Index } - // Ack all three of those messages together and get the last two - // messages (containing three entries). - r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[2].Entries[1].Index}) - ms = r.readMessages() - if len(ms) != 2 { - t.Fatalf("expected 2 messages, got %d", len(ms)) - } - for i, m := range ms { - if m.Type != pb.MsgApp { - t.Errorf("%d: expected MsgApp, got %s", i, m.Type) - } - } - if len(ms[0].Entries) != 2 { - t.Errorf("%d: expected 2 entries, got %d", 0, len(ms[0].Entries)) - } - if len(ms[1].Entries) != 1 { - t.Errorf("%d: expected 1 entry, got %d", 1, len(ms[1].Entries)) - } + // When this append is acked, we change to replicate state and can + // send multiple messages at once. + index := ackAndVerify(ms[0].Entries[1].Index, 2, 2, 2) + // Ack all three of those messages together and get another 3 messages. The + // third message contains a single large entry, in contrast to 2 before. + index = ackAndVerify(index, 2, 1, 1) + // All subsequent messages contain one large entry, and we cap at 2 messages + // because it overflows MaxInflightBytes. + index = ackAndVerify(index, 1, 1) + index = ackAndVerify(index, 1, 1) + // Start getting small messages again. + index = ackAndVerify(index, 1, 2, 2) + ackAndVerify(index, 2) } func TestUncommittedEntryLimit(t *testing.T) { diff --git a/raft/tracker/progress_test.go b/raft/tracker/progress_test.go index ed80e0da4..49dedb536 100644 --- a/raft/tracker/progress_test.go +++ b/raft/tracker/progress_test.go @@ -53,9 +53,9 @@ func TestProgressIsPaused(t *testing.T) { } for i, tt := range tests { p := &Progress{ - State: tt.state, + State: tt.state, MsgAppFlowPaused: tt.paused, - Inflights: NewInflights(256, 0), + Inflights: NewInflights(256, 0), } assert.Equal(t, tt.w, p.IsPaused(), i) } From 0ef5df11a62c0bbd7ae8fcd0e24d8d619fca18f2 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 9 Nov 2022 18:36:32 +0000 Subject: [PATCH 5/5] raft: update changelog Signed-off-by: Pavel Kalinnikov --- CHANGELOG/CHANGELOG-3.6.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG/CHANGELOG-3.6.md b/CHANGELOG/CHANGELOG-3.6.md index 8d53c51cb..e51c81985 100644 --- a/CHANGELOG/CHANGELOG-3.6.md +++ b/CHANGELOG/CHANGELOG-3.6.md @@ -46,6 +46,10 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0). - Package `wal` was moved to `storage/wal` - Package `datadir` was moved to `storage/datadir` +### Package `raft` +- Send empty `MsgApp` when entry in-flight limits are exceeded. See [pull/14633](https://github.com/etcd-io/etcd/pull/14633). +- Add [MaxInflightBytes](https://github.com/etcd-io/etcd/pull/14624) setting in `raft.Config` for better flow control of entries. + ### etcd server - Add [`etcd --log-format`](https://github.com/etcd-io/etcd/pull/13339) flag to support log format.