raft: extract Progress update on MsgApp to a method

Previously, Progress update on MsgApp send was scattered across raft.go and
tracker/progress.go. This commit better encapsulates this logic in the Progress
type.

Signed-off-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
This commit is contained in:
Pavel Kalinnikov 2022-11-01 21:52:26 +00:00
parent d5ac7b833f
commit 3bc3d2071e
2 changed files with 25 additions and 13 deletions

View File

@ -472,20 +472,9 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
}
// Send the actual MsgApp otherwise, and update the progress accordingly.
// TODO(pavelkalinnikov): factor out the Progress update to a method
next := pr.Next // save Next for later, as the progress update can change it
if n := len(ents); n != 0 {
switch pr.State {
// optimistically increase the next when in StateReplicate
case tracker.StateReplicate:
last := ents[n-1].Index
pr.OptimisticUpdate(last)
pr.Inflights.Add(last)
case tracker.StateProbe:
pr.ProbeSent = true
default:
r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
}
if err := pr.UpdateOnEntriesSend(len(ents), next); err != nil {
r.logger.Panicf("%x: %v", r.id, err)
}
r.send(pb.Message{
To: to,

View File

@ -137,6 +137,29 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) {
pr.PendingSnapshot = snapshoti
}
// UpdateOnEntriesSend updates the progress on the given number of consecutive
// entries being sent in a MsgApp, appended at and after the given log index.
func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error {
switch pr.State {
case StateReplicate:
if entries > 0 {
last := nextIndex + uint64(entries) - 1
pr.OptimisticUpdate(last)
pr.Inflights.Add(last)
}
case StateProbe:
// TODO(pavelkalinnikov): this condition captures the previous behaviour,
// but we should set ProbeSent unconditionally for simplicity, because any
// MsgApp in StateProbe is a probe, not only non-empty ones.
if entries > 0 {
pr.ProbeSent = true
}
default:
return fmt.Errorf("sending append in unhandled state %s", pr.State)
}
return nil
}
// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
// index acked by it. The method returns false if the given n index comes from
// an outdated message. Otherwise it updates the progress and returns true.