diff --git a/raft/confchange/confchange.go b/raft/confchange/confchange.go index dddbcc9d9..4e54e30f2 100644 --- a/raft/confchange/confchange.go +++ b/raft/confchange/confchange.go @@ -265,7 +265,7 @@ func (c Changer) initProgress(cfg *tracker.Config, prs tracker.ProgressMap, id u // making the first index the better choice). Next: c.LastIndex, Match: 0, - Inflights: tracker.NewInflights(c.Tracker.MaxInflight), + Inflights: tracker.NewInflights(c.Tracker.MaxInflight, 0), // TODO: set maxBytes IsLearner: isLearner, // When a node is first added, we should mark it as recently active. // Otherwise, CheckQuorum may cause us to step down if it is invoked diff --git a/raft/raft.go b/raft/raft.go index 5b3139196..33d608d41 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -629,7 +629,7 @@ func (r *raft) reset(term uint64) { *pr = tracker.Progress{ Match: 0, Next: r.raftLog.lastIndex() + 1, - Inflights: tracker.NewInflights(r.prs.MaxInflight), + Inflights: tracker.NewInflights(r.prs.MaxInflight, 0), // TODO: set maxBytes IsLearner: pr.IsLearner, } if id == r.id { diff --git a/raft/tracker/inflights.go b/raft/tracker/inflights.go index 242d1cab1..350728aec 100644 --- a/raft/tracker/inflights.go +++ b/raft/tracker/inflights.go @@ -14,6 +14,12 @@ package tracker +// inflight describes an in-flight MsgApp message. +type inflight struct { + index uint64 // the index of the last entry inside the message + bytes uint64 // the total byte size of the entries in the message +} + // Inflights limits the number of MsgApp (represented by the largest index // contained within) sent to followers but not yet acknowledged by them. Callers // use Full() to check whether more messages can be sent, call Add() whenever @@ -22,21 +28,25 @@ package tracker type Inflights struct { // the starting index in the buffer start int - // number of inflights in the buffer - count int - // the size of the buffer - size int + count int // number of inflight messages in the buffer + bytes uint64 // number of inflight bytes - // buffer contains the index of the last entry - // inside one message. - buffer []uint64 + size int // the max number of inflight messages + maxBytes uint64 // the max total byte size of inflight messages + + // buffer is a ring buffer containing info about all in-flight messages. + buffer []inflight } -// NewInflights sets up an Inflights that allows up to 'size' inflight messages. -func NewInflights(size int) *Inflights { +// NewInflights sets up an Inflights that allows up to size inflight messages, +// with the total byte size up to maxBytes. If maxBytes is 0 then there is no +// byte size limit. The maxBytes limit is soft, i.e. we accept a single message +// that brings it from size < maxBytes to size >= maxBytes. +func NewInflights(size int, maxBytes uint64) *Inflights { return &Inflights{ - size: size, + size: size, + maxBytes: maxBytes, } } @@ -44,15 +54,15 @@ func NewInflights(size int) *Inflights { // the receiver. func (in *Inflights) Clone() *Inflights { ins := *in - ins.buffer = append([]uint64(nil), in.buffer...) + ins.buffer = append([]inflight(nil), in.buffer...) return &ins } -// Add notifies the Inflights that a new message with the given index is being -// dispatched. Full() must be called prior to Add() to verify that there is room -// for one more message, and consecutive calls to add Add() must provide a -// monotonic sequence of indexes. -func (in *Inflights) Add(inflight uint64) { +// Add notifies the Inflights that a new message with the given index and byte +// size is being dispatched. Full() must be called prior to Add() to verify that +// there is room for one more message, and consecutive calls to Add() must +// provide a monotonic sequence of indexes. +func (in *Inflights) Add(index, bytes uint64) { if in.Full() { panic("cannot add into a Full inflights") } @@ -64,8 +74,9 @@ func (in *Inflights) Add(inflight uint64) { if next >= len(in.buffer) { in.grow() } - in.buffer[next] = inflight + in.buffer[next] = inflight{index: index, bytes: bytes} in.count++ + in.bytes += bytes } // grow the inflight buffer by doubling up to inflights.size. We grow on demand @@ -78,24 +89,26 @@ func (in *Inflights) grow() { } else if newSize > in.size { newSize = in.size } - newBuffer := make([]uint64, newSize) + newBuffer := make([]inflight, newSize) copy(newBuffer, in.buffer) in.buffer = newBuffer } // FreeLE frees the inflights smaller or equal to the given `to` flight. func (in *Inflights) FreeLE(to uint64) { - if in.count == 0 || to < in.buffer[in.start] { + if in.count == 0 || to < in.buffer[in.start].index { // out of the left side of the window return } idx := in.start var i int + var bytes uint64 for i = 0; i < in.count; i++ { - if to < in.buffer[idx] { // found the first large inflight + if to < in.buffer[idx].index { // found the first large inflight break } + bytes += in.buffer[idx].bytes // increase index and maybe rotate size := in.size @@ -105,6 +118,7 @@ func (in *Inflights) FreeLE(to uint64) { } // free i inflights and set new start index in.count -= i + in.bytes -= bytes in.start = idx if in.count == 0 { // inflights is empty, reset the start index so that we don't grow the @@ -115,7 +129,7 @@ func (in *Inflights) FreeLE(to uint64) { // Full returns true if no more messages can be sent at the moment. func (in *Inflights) Full() bool { - return in.count == in.size + return in.count == in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes) } // Count returns the number of inflight messages. diff --git a/raft/tracker/inflights_test.go b/raft/tracker/inflights_test.go index fe2a1b564..3514220df 100644 --- a/raft/tracker/inflights_test.go +++ b/raft/tracker/inflights_test.go @@ -24,32 +24,38 @@ func TestInflightsAdd(t *testing.T) { // no rotating case in := &Inflights{ size: 10, - buffer: make([]uint64, 10), + buffer: make([]inflight, 10), } for i := 0; i < 5; i++ { - in.Add(uint64(i)) + in.Add(uint64(i), uint64(100+i)) } wantIn := &Inflights{ start: 0, count: 5, + bytes: 510, size: 10, - // ↓------------ - buffer: []uint64{0, 1, 2, 3, 4, 0, 0, 0, 0, 0}, + buffer: inflightsBuffer( + // ↓------------ + []uint64{0, 1, 2, 3, 4, 0, 0, 0, 0, 0}, + []uint64{100, 101, 102, 103, 104, 0, 0, 0, 0, 0}), } require.Equal(t, wantIn, in) for i := 5; i < 10; i++ { - in.Add(uint64(i)) + in.Add(uint64(i), uint64(100+i)) } wantIn2 := &Inflights{ start: 0, count: 10, + bytes: 1045, size: 10, - // ↓--------------------------- - buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓--------------------------- + []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn2, in) @@ -57,41 +63,47 @@ func TestInflightsAdd(t *testing.T) { in2 := &Inflights{ start: 5, size: 10, - buffer: make([]uint64, 10), + buffer: make([]inflight, 10), } for i := 0; i < 5; i++ { - in2.Add(uint64(i)) + in2.Add(uint64(i), uint64(100+i)) } wantIn21 := &Inflights{ start: 5, count: 5, + bytes: 510, size: 10, - // ↓------------ - buffer: []uint64{0, 0, 0, 0, 0, 0, 1, 2, 3, 4}, + buffer: inflightsBuffer( + // ↓------------ + []uint64{0, 0, 0, 0, 0, 0, 1, 2, 3, 4}, + []uint64{0, 0, 0, 0, 0, 100, 101, 102, 103, 104}), } require.Equal(t, wantIn21, in2) for i := 5; i < 10; i++ { - in2.Add(uint64(i)) + in2.Add(uint64(i), uint64(100+i)) } wantIn22 := &Inflights{ start: 5, count: 10, + bytes: 1045, size: 10, - // -------------- ↓------------ - buffer: []uint64{5, 6, 7, 8, 9, 0, 1, 2, 3, 4}, + buffer: inflightsBuffer( + // -------------- ↓------------ + []uint64{5, 6, 7, 8, 9, 0, 1, 2, 3, 4}, + []uint64{105, 106, 107, 108, 109, 100, 101, 102, 103, 104}), } require.Equal(t, wantIn22, in2) } func TestInflightFreeTo(t *testing.T) { // no rotating case - in := NewInflights(10) + in := NewInflights(10, 0) for i := 0; i < 10; i++ { - in.Add(uint64(i)) + in.Add(uint64(i), uint64(100+i)) } in.FreeLE(0) @@ -99,9 +111,12 @@ func TestInflightFreeTo(t *testing.T) { wantIn0 := &Inflights{ start: 1, count: 9, + bytes: 945, size: 10, - // ↓------------------------ - buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓------------------------ + []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn0, in) @@ -110,9 +125,12 @@ func TestInflightFreeTo(t *testing.T) { wantIn := &Inflights{ start: 5, count: 5, + bytes: 535, size: 10, - // ↓------------ - buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓------------ + []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn, in) @@ -121,15 +139,18 @@ func TestInflightFreeTo(t *testing.T) { wantIn2 := &Inflights{ start: 9, count: 1, + bytes: 109, size: 10, - // ↓ - buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓ + []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn2, in) // rotating case for i := 10; i < 15; i++ { - in.Add(uint64(i)) + in.Add(uint64(i), uint64(100+i)) } in.FreeLE(12) @@ -137,9 +158,12 @@ func TestInflightFreeTo(t *testing.T) { wantIn3 := &Inflights{ start: 3, count: 2, + bytes: 227, size: 10, - // ↓----- - buffer: []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓----- + []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, + []uint64{110, 111, 112, 113, 114, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn3, in) @@ -149,8 +173,67 @@ func TestInflightFreeTo(t *testing.T) { start: 0, count: 0, size: 10, - // ↓ - buffer: []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, + buffer: inflightsBuffer( + // ↓ + []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, + []uint64{110, 111, 112, 113, 114, 105, 106, 107, 108, 109}), } require.Equal(t, wantIn4, in) } + +func TestInflightsFull(t *testing.T) { + for _, tc := range []struct { + name string + size int + maxBytes uint64 + fullAt int + freeLE uint64 + againAt int + }{ + {name: "always-full", size: 0, fullAt: 0}, + {name: "single-entry", size: 1, fullAt: 1, freeLE: 1, againAt: 2}, + {name: "single-entry-overflow", size: 1, maxBytes: 10, fullAt: 1, freeLE: 1, againAt: 2}, + {name: "multi-entry", size: 15, fullAt: 15, freeLE: 6, againAt: 22}, + {name: "slight-overflow", size: 8, maxBytes: 400, fullAt: 4, freeLE: 2, againAt: 7}, + {name: "exact-max-bytes", size: 8, maxBytes: 406, fullAt: 4, freeLE: 3, againAt: 8}, + {name: "larger-overflow", size: 15, maxBytes: 408, fullAt: 5, freeLE: 1, againAt: 6}, + } { + t.Run(tc.name, func(t *testing.T) { + in := NewInflights(tc.size, tc.maxBytes) + + addUntilFull := func(begin, end int) { + for i := begin; i < end; i++ { + if in.Full() { + t.Fatalf("full at %d, want %d", i, end) + } + in.Add(uint64(i), uint64(100+i)) + } + if !in.Full() { + t.Fatalf("not full at %d", end) + } + } + + addUntilFull(0, tc.fullAt) + in.FreeLE(tc.freeLE) + addUntilFull(tc.fullAt, tc.againAt) + + defer func() { + if r := recover(); r == nil { + t.Errorf("Add() did not panic") + } + }() + in.Add(100, 1024) + }) + } +} + +func inflightsBuffer(indices []uint64, sizes []uint64) []inflight { + if len(indices) != len(sizes) { + panic("len(indices) != len(sizes)") + } + buffer := make([]inflight, 0, len(indices)) + for i, idx := range indices { + buffer = append(buffer, inflight{index: idx, bytes: sizes[i]}) + } + return buffer +} diff --git a/raft/tracker/progress.go b/raft/tracker/progress.go index c6272d22d..adf9cb4ba 100644 --- a/raft/tracker/progress.go +++ b/raft/tracker/progress.go @@ -141,7 +141,7 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error { if entries > 0 { last := nextIndex + uint64(entries) - 1 pr.OptimisticUpdate(last) - pr.Inflights.Add(last) + pr.Inflights.Add(last, 0) // TODO: set bytes to sum(Entries[].Size()) } // If this message overflows the in-flights tracker, or it was already full, // consider this message being a probe, so that the flow is paused. diff --git a/raft/tracker/progress_test.go b/raft/tracker/progress_test.go index 974c383f0..ed80e0da4 100644 --- a/raft/tracker/progress_test.go +++ b/raft/tracker/progress_test.go @@ -21,8 +21,8 @@ import ( ) func TestProgressString(t *testing.T) { - ins := NewInflights(1) - ins.Add(123) + ins := NewInflights(1, 0) + ins.Add(123, 1) pr := &Progress{ Match: 1, Next: 2, @@ -53,9 +53,9 @@ func TestProgressIsPaused(t *testing.T) { } for i, tt := range tests { p := &Progress{ - State: tt.state, + State: tt.state, MsgAppFlowPaused: tt.paused, - Inflights: NewInflights(256), + Inflights: NewInflights(256, 0), } assert.Equal(t, tt.w, p.IsPaused(), i) } @@ -82,17 +82,17 @@ func TestProgressBecomeProbe(t *testing.T) { wnext uint64 }{ { - &Progress{State: StateReplicate, Match: match, Next: 5, Inflights: NewInflights(256)}, + &Progress{State: StateReplicate, Match: match, Next: 5, Inflights: NewInflights(256, 0)}, 2, }, { // snapshot finish - &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, Inflights: NewInflights(256)}, + &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, Inflights: NewInflights(256, 0)}, 11, }, { // snapshot failure - &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, Inflights: NewInflights(256)}, + &Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, Inflights: NewInflights(256, 0)}, 2, }, } @@ -105,7 +105,7 @@ func TestProgressBecomeProbe(t *testing.T) { } func TestProgressBecomeReplicate(t *testing.T) { - p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256)} + p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256, 0)} p.BecomeReplicate() assert.Equal(t, StateReplicate, p.State) assert.Equal(t, uint64(1), p.Match) @@ -113,7 +113,7 @@ func TestProgressBecomeReplicate(t *testing.T) { } func TestProgressBecomeSnapshot(t *testing.T) { - p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256)} + p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256, 0)} p.BecomeSnapshot(10) assert.Equal(t, StateSnapshot, p.State) assert.Equal(t, uint64(1), p.Match)