raft/tracker: add byte size limit to Inflights type

The Inflights type has limits on the message size and the number of inflight
messages. However, a single large entry that exceeds the size limit can still
be sent. In combination with the max messages count limit, many large messages
can be sent in a row and overflow the receiver. In effect, the "max" values act
as "target" rather than hard limits.

This commit adds an additional soft limit on the total size of inflight
messages, which catches such situations and prevents the receiver overflow.

Signed-off-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
This commit is contained in:
Pavel Kalinnikov 2022-10-20 16:29:40 +01:00 committed by Tobias Grieger
parent 0e4bf4ac4e
commit bfb7b16f4f
6 changed files with 157 additions and 60 deletions

View File

@ -265,7 +265,7 @@ func (c Changer) initProgress(cfg *tracker.Config, prs tracker.ProgressMap, id u
// making the first index the better choice).
Next: c.LastIndex,
Match: 0,
Inflights: tracker.NewInflights(c.Tracker.MaxInflight),
Inflights: tracker.NewInflights(c.Tracker.MaxInflight, 0), // TODO: set maxBytes
IsLearner: isLearner,
// When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked

View File

@ -629,7 +629,7 @@ func (r *raft) reset(term uint64) {
*pr = tracker.Progress{
Match: 0,
Next: r.raftLog.lastIndex() + 1,
Inflights: tracker.NewInflights(r.prs.MaxInflight),
Inflights: tracker.NewInflights(r.prs.MaxInflight, 0), // TODO: set maxBytes
IsLearner: pr.IsLearner,
}
if id == r.id {

View File

@ -14,6 +14,12 @@
package tracker
// inflight is the record kept for a single in-flight MsgApp message.
type inflight struct {
	// index is the index of the last entry inside the message.
	index uint64
	// bytes is the total byte size of the entries in the message.
	bytes uint64
}
// Inflights limits the number of MsgApp (represented by the largest index
// contained within) sent to followers but not yet acknowledged by them. Callers
// use Full() to check whether more messages can be sent, call Add() whenever
@ -22,21 +28,25 @@ package tracker
type Inflights struct {
// the starting index in the buffer
start int
// number of inflights in the buffer
count int
// the size of the buffer
size int
count int // number of inflight messages in the buffer
bytes uint64 // number of inflight bytes
// buffer contains the index of the last entry
// inside one message.
buffer []uint64
size int // the max number of inflight messages
maxBytes uint64 // the max total byte size of inflight messages
// buffer is a ring buffer containing info about all in-flight messages.
buffer []inflight
}
// NewInflights sets up an Inflights that allows up to 'size' inflight messages.
func NewInflights(size int) *Inflights {
// NewInflights sets up an Inflights that allows up to size inflight messages,
// with the total byte size up to maxBytes. If maxBytes is 0 then there is no
// byte size limit. The maxBytes limit is soft, i.e. we accept a single message
// that brings it from size < maxBytes to size >= maxBytes.
func NewInflights(size int, maxBytes uint64) *Inflights {
return &Inflights{
size: size,
size: size,
maxBytes: maxBytes,
}
}
@ -44,15 +54,15 @@ func NewInflights(size int) *Inflights {
// the receiver.
func (in *Inflights) Clone() *Inflights {
ins := *in
ins.buffer = append([]uint64(nil), in.buffer...)
ins.buffer = append([]inflight(nil), in.buffer...)
return &ins
}
// Add notifies the Inflights that a new message with the given index is being
// dispatched. Full() must be called prior to Add() to verify that there is room
// for one more message, and consecutive calls to add Add() must provide a
// monotonic sequence of indexes.
func (in *Inflights) Add(inflight uint64) {
// Add notifies the Inflights that a new message with the given index and byte
// size is being dispatched. Full() must be called prior to Add() to verify that
// there is room for one more message, and consecutive calls to Add() must
// provide a monotonic sequence of indexes.
func (in *Inflights) Add(index, bytes uint64) {
if in.Full() {
panic("cannot add into a Full inflights")
}
@ -64,8 +74,9 @@ func (in *Inflights) Add(inflight uint64) {
if next >= len(in.buffer) {
in.grow()
}
in.buffer[next] = inflight
in.buffer[next] = inflight{index: index, bytes: bytes}
in.count++
in.bytes += bytes
}
// grow the inflight buffer by doubling up to inflights.size. We grow on demand
@ -78,24 +89,26 @@ func (in *Inflights) grow() {
} else if newSize > in.size {
newSize = in.size
}
newBuffer := make([]uint64, newSize)
newBuffer := make([]inflight, newSize)
copy(newBuffer, in.buffer)
in.buffer = newBuffer
}
// FreeLE frees the inflights smaller or equal to the given `to` flight.
func (in *Inflights) FreeLE(to uint64) {
if in.count == 0 || to < in.buffer[in.start] {
if in.count == 0 || to < in.buffer[in.start].index {
// out of the left side of the window
return
}
idx := in.start
var i int
var bytes uint64
for i = 0; i < in.count; i++ {
if to < in.buffer[idx] { // found the first large inflight
if to < in.buffer[idx].index { // found the first large inflight
break
}
bytes += in.buffer[idx].bytes
// increase index and maybe rotate
size := in.size
@ -105,6 +118,7 @@ func (in *Inflights) FreeLE(to uint64) {
}
// free i inflights and set new start index
in.count -= i
in.bytes -= bytes
in.start = idx
if in.count == 0 {
// inflights is empty, reset the start index so that we don't grow the
@ -115,7 +129,7 @@ func (in *Inflights) FreeLE(to uint64) {
// Full returns true if no more messages can be sent at the moment.
func (in *Inflights) Full() bool {
return in.count == in.size
return in.count == in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes)
}
// Count returns the number of inflight messages.

View File

@ -24,32 +24,38 @@ func TestInflightsAdd(t *testing.T) {
// no rotating case
in := &Inflights{
size: 10,
buffer: make([]uint64, 10),
buffer: make([]inflight, 10),
}
for i := 0; i < 5; i++ {
in.Add(uint64(i))
in.Add(uint64(i), uint64(100+i))
}
wantIn := &Inflights{
start: 0,
count: 5,
bytes: 510,
size: 10,
// ↓------------
buffer: []uint64{0, 1, 2, 3, 4, 0, 0, 0, 0, 0},
buffer: inflightsBuffer(
// ↓------------
[]uint64{0, 1, 2, 3, 4, 0, 0, 0, 0, 0},
[]uint64{100, 101, 102, 103, 104, 0, 0, 0, 0, 0}),
}
require.Equal(t, wantIn, in)
for i := 5; i < 10; i++ {
in.Add(uint64(i))
in.Add(uint64(i), uint64(100+i))
}
wantIn2 := &Inflights{
start: 0,
count: 10,
bytes: 1045,
size: 10,
// ↓---------------------------
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
buffer: inflightsBuffer(
// ↓---------------------------
[]uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
[]uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}),
}
require.Equal(t, wantIn2, in)
@ -57,41 +63,47 @@ func TestInflightsAdd(t *testing.T) {
in2 := &Inflights{
start: 5,
size: 10,
buffer: make([]uint64, 10),
buffer: make([]inflight, 10),
}
for i := 0; i < 5; i++ {
in2.Add(uint64(i))
in2.Add(uint64(i), uint64(100+i))
}
wantIn21 := &Inflights{
start: 5,
count: 5,
bytes: 510,
size: 10,
// ↓------------
buffer: []uint64{0, 0, 0, 0, 0, 0, 1, 2, 3, 4},
buffer: inflightsBuffer(
// ↓------------
[]uint64{0, 0, 0, 0, 0, 0, 1, 2, 3, 4},
[]uint64{0, 0, 0, 0, 0, 100, 101, 102, 103, 104}),
}
require.Equal(t, wantIn21, in2)
for i := 5; i < 10; i++ {
in2.Add(uint64(i))
in2.Add(uint64(i), uint64(100+i))
}
wantIn22 := &Inflights{
start: 5,
count: 10,
bytes: 1045,
size: 10,
// -------------- ↓------------
buffer: []uint64{5, 6, 7, 8, 9, 0, 1, 2, 3, 4},
buffer: inflightsBuffer(
// -------------- ↓------------
[]uint64{5, 6, 7, 8, 9, 0, 1, 2, 3, 4},
[]uint64{105, 106, 107, 108, 109, 100, 101, 102, 103, 104}),
}
require.Equal(t, wantIn22, in2)
}
func TestInflightFreeTo(t *testing.T) {
// no rotating case
in := NewInflights(10)
in := NewInflights(10, 0)
for i := 0; i < 10; i++ {
in.Add(uint64(i))
in.Add(uint64(i), uint64(100+i))
}
in.FreeLE(0)
@ -99,9 +111,12 @@ func TestInflightFreeTo(t *testing.T) {
wantIn0 := &Inflights{
start: 1,
count: 9,
bytes: 945,
size: 10,
// ↓------------------------
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
buffer: inflightsBuffer(
// ↓------------------------
[]uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
[]uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}),
}
require.Equal(t, wantIn0, in)
@ -110,9 +125,12 @@ func TestInflightFreeTo(t *testing.T) {
wantIn := &Inflights{
start: 5,
count: 5,
bytes: 535,
size: 10,
// ↓------------
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
buffer: inflightsBuffer(
// ↓------------
[]uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
[]uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}),
}
require.Equal(t, wantIn, in)
@ -121,15 +139,18 @@ func TestInflightFreeTo(t *testing.T) {
wantIn2 := &Inflights{
start: 9,
count: 1,
bytes: 109,
size: 10,
// ↓
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
buffer: inflightsBuffer(
// ↓
[]uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
[]uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}),
}
require.Equal(t, wantIn2, in)
// rotating case
for i := 10; i < 15; i++ {
in.Add(uint64(i))
in.Add(uint64(i), uint64(100+i))
}
in.FreeLE(12)
@ -137,9 +158,12 @@ func TestInflightFreeTo(t *testing.T) {
wantIn3 := &Inflights{
start: 3,
count: 2,
bytes: 227,
size: 10,
// ↓-----
buffer: []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9},
buffer: inflightsBuffer(
// ↓-----
[]uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9},
[]uint64{110, 111, 112, 113, 114, 105, 106, 107, 108, 109}),
}
require.Equal(t, wantIn3, in)
@ -149,8 +173,67 @@ func TestInflightFreeTo(t *testing.T) {
start: 0,
count: 0,
size: 10,
// ↓
buffer: []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9},
buffer: inflightsBuffer(
// ↓
[]uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9},
[]uint64{110, 111, 112, 113, 114, 105, 106, 107, 108, 109}),
}
require.Equal(t, wantIn4, in)
}
// TestInflightsFull verifies when Full() starts reporting true, both for the
// message-count limit and for the byte-size limit. Message i is added with a
// byte size of 100+i. Each case fills the tracker until it is expected to be
// full, frees a prefix with FreeLE, fills it again, and finally checks that
// adding to a full tracker panics.
func TestInflightsFull(t *testing.T) {
	type testCase struct {
		name     string
		size     int    // max number of in-flight messages
		maxBytes uint64 // max in-flight bytes; 0 leaves it unset
		fullAt   int    // number of adds after which Full() must first be true
		freeLE   uint64 // index passed to FreeLE after the first fill
		againAt  int    // next index at which Full() must be true again
	}
	cases := []testCase{
		{name: "always-full", size: 0, fullAt: 0},
		{name: "single-entry", size: 1, fullAt: 1, freeLE: 1, againAt: 2},
		{name: "single-entry-overflow", size: 1, maxBytes: 10, fullAt: 1, freeLE: 1, againAt: 2},
		{name: "multi-entry", size: 15, fullAt: 15, freeLE: 6, againAt: 22},
		{name: "slight-overflow", size: 8, maxBytes: 400, fullAt: 4, freeLE: 2, againAt: 7},
		{name: "exact-max-bytes", size: 8, maxBytes: 406, fullAt: 4, freeLE: 3, againAt: 8},
		{name: "larger-overflow", size: 15, maxBytes: 408, fullAt: 5, freeLE: 1, againAt: 6},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			in := NewInflights(tc.size, tc.maxBytes)

			// fill adds messages with indexes in [from, to), failing if the
			// tracker becomes full early or is not full once the range is done.
			fill := func(from, to int) {
				for idx := from; idx < to; idx++ {
					if in.Full() {
						t.Fatalf("full at %d, want %d", idx, to)
					}
					in.Add(uint64(idx), uint64(100+idx))
				}
				if !in.Full() {
					t.Fatalf("not full at %d", to)
				}
			}

			fill(0, tc.fullAt)
			in.FreeLE(tc.freeLE)
			fill(tc.fullAt, tc.againAt)

			// The tracker is full at this point, so one more Add must panic.
			defer func() {
				if recover() == nil {
					t.Errorf("Add() did not panic")
				}
			}()
			in.Add(100, 1024)
		})
	}
}
// inflightsBuffer builds a buffer of inflight records from two parallel
// slices: indices[i] becomes the index and sizes[i] the byte size of the i-th
// record. It panics if the two slices differ in length.
func inflightsBuffer(indices []uint64, sizes []uint64) []inflight {
	n := len(indices)
	if n != len(sizes) {
		panic("len(indices) != len(sizes)")
	}
	out := make([]inflight, n)
	for pos := range out {
		out[pos] = inflight{index: indices[pos], bytes: sizes[pos]}
	}
	return out
}

View File

@ -141,7 +141,7 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error {
if entries > 0 {
last := nextIndex + uint64(entries) - 1
pr.OptimisticUpdate(last)
pr.Inflights.Add(last)
pr.Inflights.Add(last, 0) // TODO: set bytes to sum(Entries[].Size())
}
// If this message overflows the in-flights tracker, or it was already full,
// consider this message being a probe, so that the flow is paused.

View File

@ -21,8 +21,8 @@ import (
)
func TestProgressString(t *testing.T) {
ins := NewInflights(1)
ins.Add(123)
ins := NewInflights(1, 0)
ins.Add(123, 1)
pr := &Progress{
Match: 1,
Next: 2,
@ -53,9 +53,9 @@ func TestProgressIsPaused(t *testing.T) {
}
for i, tt := range tests {
p := &Progress{
State: tt.state,
State: tt.state,
MsgAppFlowPaused: tt.paused,
Inflights: NewInflights(256),
Inflights: NewInflights(256, 0),
}
assert.Equal(t, tt.w, p.IsPaused(), i)
}
@ -82,17 +82,17 @@ func TestProgressBecomeProbe(t *testing.T) {
wnext uint64
}{
{
&Progress{State: StateReplicate, Match: match, Next: 5, Inflights: NewInflights(256)},
&Progress{State: StateReplicate, Match: match, Next: 5, Inflights: NewInflights(256, 0)},
2,
},
{
// snapshot finish
&Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, Inflights: NewInflights(256)},
&Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, Inflights: NewInflights(256, 0)},
11,
},
{
// snapshot failure
&Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, Inflights: NewInflights(256)},
&Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, Inflights: NewInflights(256, 0)},
2,
},
}
@ -105,7 +105,7 @@ func TestProgressBecomeProbe(t *testing.T) {
}
func TestProgressBecomeReplicate(t *testing.T) {
p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256)}
p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256, 0)}
p.BecomeReplicate()
assert.Equal(t, StateReplicate, p.State)
assert.Equal(t, uint64(1), p.Match)
@ -113,7 +113,7 @@ func TestProgressBecomeReplicate(t *testing.T) {
}
func TestProgressBecomeSnapshot(t *testing.T) {
p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256)}
p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256, 0)}
p.BecomeSnapshot(10)
assert.Equal(t, StateSnapshot, p.State)
assert.Equal(t, uint64(1), p.Match)