diff --git a/raft/raft.go b/raft/raft.go index f9f38155d..aca9d7ff9 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -437,8 +437,6 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { if pr.IsPaused() { return false } - m := pb.Message{} - m.To = to term, errt := r.raftLog.term(pr.Next - 1) ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize) @@ -452,7 +450,6 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { return false } - m.Type = pb.MsgSnap snapshot, err := r.raftLog.snapshot() if err != nil { if err == ErrSnapshotTemporarilyUnavailable { @@ -464,33 +461,40 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { if IsEmptySnap(snapshot) { panic("need non-empty snapshot") } - m.Snapshot = snapshot sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr) pr.BecomeSnapshot(sindex) r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr) - } else { - m.Type = pb.MsgApp - m.Index = pr.Next - 1 - m.LogTerm = term - m.Entries = ents - m.Commit = r.raftLog.committed - if n := len(m.Entries); n != 0 { - switch pr.State { - // optimistically increase the next when in StateReplicate - case tracker.StateReplicate: - last := m.Entries[n-1].Index - pr.OptimisticUpdate(last) - pr.Inflights.Add(last) - case tracker.StateProbe: - pr.ProbeSent = true - default: - r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State) - } + + r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: snapshot}) + return true + } + + // Send the actual MsgApp otherwise, and update the progress accordingly. + // TODO(pavelkalinnikov): factor out the Progress update to a method + next := pr.Next // save Next for later, as the progress update can change it + if n := len(ents); n != 0 { + switch pr.State { + // optimistically increase the next when in StateReplicate + case tracker.StateReplicate: + last := ents[n-1].Index + pr.OptimisticUpdate(last) + pr.Inflights.Add(last) + case tracker.StateProbe: + pr.ProbeSent = true + default: + r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State) } } - r.send(m) + r.send(pb.Message{ + To: to, + Type: pb.MsgApp, + Index: next - 1, + LogTerm: term, + Entries: ents, + Commit: r.raftLog.committed, + }) return true }