diff --git a/raft/confchange/confchange.go b/raft/confchange/confchange.go index 4e54e30f2..bc60abf7f 100644 --- a/raft/confchange/confchange.go +++ b/raft/confchange/confchange.go @@ -265,7 +265,7 @@ func (c Changer) initProgress(cfg *tracker.Config, prs tracker.ProgressMap, id u // making the first index the better choice). Next: c.LastIndex, Match: 0, - Inflights: tracker.NewInflights(c.Tracker.MaxInflight, 0), // TODO: set maxBytes + Inflights: tracker.NewInflights(c.Tracker.MaxInflight, c.Tracker.MaxInflightBytes), IsLearner: isLearner, // When a node is first added, we should mark it as recently active. // Otherwise, CheckQuorum may cause us to step down if it is invoked diff --git a/raft/confchange/datadriven_test.go b/raft/confchange/datadriven_test.go index ab1524091..f179f1f43 100644 --- a/raft/confchange/datadriven_test.go +++ b/raft/confchange/datadriven_test.go @@ -28,7 +28,7 @@ import ( func TestConfChangeDataDriven(t *testing.T) { datadriven.Walk(t, "testdata", func(t *testing.T, path string) { - tr := tracker.MakeProgressTracker(10) + tr := tracker.MakeProgressTracker(10, 0) c := Changer{ Tracker: tr, LastIndex: 0, // incremented in this test with each cmd diff --git a/raft/confchange/quick_test.go b/raft/confchange/quick_test.go index 16d72c199..76018f634 100644 --- a/raft/confchange/quick_test.go +++ b/raft/confchange/quick_test.go @@ -89,7 +89,7 @@ func TestConfChangeQuick(t *testing.T) { wrapper := func(invoke testFunc) func(setup initialChanges, ccs confChanges) (*Changer, error) { return func(setup initialChanges, ccs confChanges) (*Changer, error) { - tr := tracker.MakeProgressTracker(10) + tr := tracker.MakeProgressTracker(10, 0) c := &Changer{ Tracker: tr, LastIndex: 10, diff --git a/raft/confchange/restore_test.go b/raft/confchange/restore_test.go index 50712c794..ec45e5144 100644 --- a/raft/confchange/restore_test.go +++ b/raft/confchange/restore_test.go @@ -86,7 +86,7 @@ func TestRestore(t *testing.T) { f := func(cs pb.ConfState) bool { chg := Changer{ - Tracker: tracker.MakeProgressTracker(20), + Tracker: tracker.MakeProgressTracker(20, 0), LastIndex: 10, } cfg, prs, err := Restore(chg, cs) diff --git a/raft/raft.go b/raft/raft.go index 33d608d41..38d02ae27 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -332,7 +332,7 @@ func newRaft(c *Config) *raft { raftLog: raftlog, maxMsgSize: c.MaxSizePerMsg, maxUncommittedSize: c.MaxUncommittedEntriesSize, - prs: tracker.MakeProgressTracker(c.MaxInflightMsgs), + prs: tracker.MakeProgressTracker(c.MaxInflightMsgs, 0), // TODO: set maxBytes electionTimeout: c.ElectionTick, heartbeatTimeout: c.HeartbeatTick, logger: c.Logger, @@ -484,7 +484,8 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { // Send the actual MsgApp otherwise, and update the progress accordingly. next := pr.Next // save Next for later, as the progress update can change it - if err := pr.UpdateOnEntriesSend(len(ents), next); err != nil { + // TODO(pavelkalinnikov): set bytes to sum(Entries[].Size()) + if err := pr.UpdateOnEntriesSend(len(ents), 0 /* bytes */, next); err != nil { r.logger.Panicf("%x: %v", r.id, err) } r.send(pb.Message{ @@ -629,7 +630,7 @@ func (r *raft) reset(term uint64) { *pr = tracker.Progress{ Match: 0, Next: r.raftLog.lastIndex() + 1, - Inflights: tracker.NewInflights(r.prs.MaxInflight, 0), // TODO: set maxBytes + Inflights: tracker.NewInflights(r.prs.MaxInflight, r.prs.MaxInflightBytes), IsLearner: pr.IsLearner, } if id == r.id { @@ -1618,7 +1619,7 @@ func (r *raft) restore(s pb.Snapshot) bool { r.raftLog.restore(s) // Reset the configuration and add the (potentially updated) peers in anew. - r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight) + r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight, r.prs.MaxInflightBytes) cfg, prs, err := confchange.Restore(confchange.Changer{ Tracker: r.prs, LastIndex: r.raftLog.lastIndex(), diff --git a/raft/raft_test.go b/raft/raft_test.go index 95408976b..29eec28c1 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -4706,7 +4706,7 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw learners[i] = true } v.id = id - v.prs = tracker.MakeProgressTracker(v.prs.MaxInflight) + v.prs = tracker.MakeProgressTracker(v.prs.MaxInflight, v.prs.MaxInflightBytes) if len(learners) > 0 { v.prs.Learners = map[uint64]struct{}{} } diff --git a/raft/tracker/progress.go b/raft/tracker/progress.go index adf9cb4ba..f4e1e07d8 100644 --- a/raft/tracker/progress.go +++ b/raft/tracker/progress.go @@ -134,14 +134,15 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) { } // UpdateOnEntriesSend updates the progress on the given number of consecutive -// entries being sent in a MsgApp, appended at and after the given log index. -func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error { +// entries being sent in a MsgApp, with the given total bytes size, appended at +// and after the given log index. +func (pr *Progress) UpdateOnEntriesSend(entries int, bytes, nextIndex uint64) error { switch pr.State { case StateReplicate: if entries > 0 { last := nextIndex + uint64(entries) - 1 pr.OptimisticUpdate(last) - pr.Inflights.Add(last, 0) // TODO: set bytes to sum(Entries[].Size()) + pr.Inflights.Add(last, bytes) } // If this message overflows the in-flights tracker, or it was already full, // consider this message being a probe, so that the flow is paused. diff --git a/raft/tracker/tracker.go b/raft/tracker/tracker.go index 72dcc73b8..938b7878c 100644 --- a/raft/tracker/tracker.go +++ b/raft/tracker/tracker.go @@ -121,13 +121,15 @@ type ProgressTracker struct { Votes map[uint64]bool - MaxInflight int + MaxInflight int + MaxInflightBytes uint64 } // MakeProgressTracker initializes a ProgressTracker. -func MakeProgressTracker(maxInflight int) ProgressTracker { +func MakeProgressTracker(maxInflight int, maxBytes uint64) ProgressTracker { p := ProgressTracker{ - MaxInflight: maxInflight, + MaxInflight: maxInflight, + MaxInflightBytes: maxBytes, Config: Config{ Voters: quorum.JointConfig{ quorum.MajorityConfig{},