raft/tracker: add MaxInflightBytes to ProgressTracker

This commit plumbs the max total byte size of the Inflights type higher up the
stack to the ProgressTracker.

Signed-off-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
This commit is contained in:
Pavel Kalinnikov 2022-10-20 16:36:44 +01:00 committed by Tobias Grieger
parent bfb7b16f4f
commit 7bda0d7773
8 changed files with 19 additions and 15 deletions

View File

@ -265,7 +265,7 @@ func (c Changer) initProgress(cfg *tracker.Config, prs tracker.ProgressMap, id u
// making the first index the better choice). // making the first index the better choice).
Next: c.LastIndex, Next: c.LastIndex,
Match: 0, Match: 0,
Inflights: tracker.NewInflights(c.Tracker.MaxInflight, 0), // TODO: set maxBytes Inflights: tracker.NewInflights(c.Tracker.MaxInflight, c.Tracker.MaxInflightBytes),
IsLearner: isLearner, IsLearner: isLearner,
// When a node is first added, we should mark it as recently active. // When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked // Otherwise, CheckQuorum may cause us to step down if it is invoked

View File

@ -28,7 +28,7 @@ import (
func TestConfChangeDataDriven(t *testing.T) { func TestConfChangeDataDriven(t *testing.T) {
datadriven.Walk(t, "testdata", func(t *testing.T, path string) { datadriven.Walk(t, "testdata", func(t *testing.T, path string) {
tr := tracker.MakeProgressTracker(10) tr := tracker.MakeProgressTracker(10, 0)
c := Changer{ c := Changer{
Tracker: tr, Tracker: tr,
LastIndex: 0, // incremented in this test with each cmd LastIndex: 0, // incremented in this test with each cmd

View File

@ -89,7 +89,7 @@ func TestConfChangeQuick(t *testing.T) {
wrapper := func(invoke testFunc) func(setup initialChanges, ccs confChanges) (*Changer, error) { wrapper := func(invoke testFunc) func(setup initialChanges, ccs confChanges) (*Changer, error) {
return func(setup initialChanges, ccs confChanges) (*Changer, error) { return func(setup initialChanges, ccs confChanges) (*Changer, error) {
tr := tracker.MakeProgressTracker(10) tr := tracker.MakeProgressTracker(10, 0)
c := &Changer{ c := &Changer{
Tracker: tr, Tracker: tr,
LastIndex: 10, LastIndex: 10,

View File

@ -86,7 +86,7 @@ func TestRestore(t *testing.T) {
f := func(cs pb.ConfState) bool { f := func(cs pb.ConfState) bool {
chg := Changer{ chg := Changer{
Tracker: tracker.MakeProgressTracker(20), Tracker: tracker.MakeProgressTracker(20, 0),
LastIndex: 10, LastIndex: 10,
} }
cfg, prs, err := Restore(chg, cs) cfg, prs, err := Restore(chg, cs)

View File

@ -332,7 +332,7 @@ func newRaft(c *Config) *raft {
raftLog: raftlog, raftLog: raftlog,
maxMsgSize: c.MaxSizePerMsg, maxMsgSize: c.MaxSizePerMsg,
maxUncommittedSize: c.MaxUncommittedEntriesSize, maxUncommittedSize: c.MaxUncommittedEntriesSize,
prs: tracker.MakeProgressTracker(c.MaxInflightMsgs), prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, 0), // TODO: set maxBytes
electionTimeout: c.ElectionTick, electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick, heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger, logger: c.Logger,
@ -484,7 +484,8 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
// Send the actual MsgApp otherwise, and update the progress accordingly. // Send the actual MsgApp otherwise, and update the progress accordingly.
next := pr.Next // save Next for later, as the progress update can change it next := pr.Next // save Next for later, as the progress update can change it
if err := pr.UpdateOnEntriesSend(len(ents), next); err != nil { // TODO(pavelkalinnikov): set bytes to sum(Entries[].Size())
if err := pr.UpdateOnEntriesSend(len(ents), 0 /* bytes */, next); err != nil {
r.logger.Panicf("%x: %v", r.id, err) r.logger.Panicf("%x: %v", r.id, err)
} }
r.send(pb.Message{ r.send(pb.Message{
@ -629,7 +630,7 @@ func (r *raft) reset(term uint64) {
*pr = tracker.Progress{ *pr = tracker.Progress{
Match: 0, Match: 0,
Next: r.raftLog.lastIndex() + 1, Next: r.raftLog.lastIndex() + 1,
Inflights: tracker.NewInflights(r.prs.MaxInflight, 0), // TODO: set maxBytes Inflights: tracker.NewInflights(r.prs.MaxInflight, r.prs.MaxInflightBytes),
IsLearner: pr.IsLearner, IsLearner: pr.IsLearner,
} }
if id == r.id { if id == r.id {
@ -1618,7 +1619,7 @@ func (r *raft) restore(s pb.Snapshot) bool {
r.raftLog.restore(s) r.raftLog.restore(s)
// Reset the configuration and add the (potentially updated) peers in anew. // Reset the configuration and add the (potentially updated) peers in anew.
r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight) r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight, r.prs.MaxInflightBytes)
cfg, prs, err := confchange.Restore(confchange.Changer{ cfg, prs, err := confchange.Restore(confchange.Changer{
Tracker: r.prs, Tracker: r.prs,
LastIndex: r.raftLog.lastIndex(), LastIndex: r.raftLog.lastIndex(),

View File

@ -4706,7 +4706,7 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw
learners[i] = true learners[i] = true
} }
v.id = id v.id = id
v.prs = tracker.MakeProgressTracker(v.prs.MaxInflight) v.prs = tracker.MakeProgressTracker(v.prs.MaxInflight, v.prs.MaxInflightBytes)
if len(learners) > 0 { if len(learners) > 0 {
v.prs.Learners = map[uint64]struct{}{} v.prs.Learners = map[uint64]struct{}{}
} }

View File

@ -134,14 +134,15 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) {
} }
// UpdateOnEntriesSend updates the progress on the given number of consecutive // UpdateOnEntriesSend updates the progress on the given number of consecutive
// entries being sent in a MsgApp, appended at and after the given log index. // entries being sent in a MsgApp, with the given total bytes size, appended at
func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error { // and after the given log index.
func (pr *Progress) UpdateOnEntriesSend(entries int, bytes, nextIndex uint64) error {
switch pr.State { switch pr.State {
case StateReplicate: case StateReplicate:
if entries > 0 { if entries > 0 {
last := nextIndex + uint64(entries) - 1 last := nextIndex + uint64(entries) - 1
pr.OptimisticUpdate(last) pr.OptimisticUpdate(last)
pr.Inflights.Add(last, 0) // TODO: set bytes to sum(Entries[].Size()) pr.Inflights.Add(last, bytes)
} }
// If this message overflows the in-flights tracker, or it was already full, // If this message overflows the in-flights tracker, or it was already full,
// consider this message being a probe, so that the flow is paused. // consider this message being a probe, so that the flow is paused.

View File

@ -121,13 +121,15 @@ type ProgressTracker struct {
Votes map[uint64]bool Votes map[uint64]bool
MaxInflight int MaxInflight int
MaxInflightBytes uint64
} }
// MakeProgressTracker initializes a ProgressTracker. // MakeProgressTracker initializes a ProgressTracker.
func MakeProgressTracker(maxInflight int) ProgressTracker { func MakeProgressTracker(maxInflight int, maxBytes uint64) ProgressTracker {
p := ProgressTracker{ p := ProgressTracker{
MaxInflight: maxInflight, MaxInflight: maxInflight,
MaxInflightBytes: maxBytes,
Config: Config{ Config: Config{
Voters: quorum.JointConfig{ Voters: quorum.JointConfig{
quorum.MajorityConfig{}, quorum.MajorityConfig{},