mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #14633 from pavelkalinnikov/send_empty_append
raft: send empty appends when replication is paused
This commit is contained in:
commit
49ecea5dae
64
raft/raft.go
64
raft/raft.go
@ -437,11 +437,20 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
|
|||||||
if pr.IsPaused() {
|
if pr.IsPaused() {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
m := pb.Message{}
|
|
||||||
m.To = to
|
|
||||||
|
|
||||||
term, errt := r.raftLog.term(pr.Next - 1)
|
term, errt := r.raftLog.term(pr.Next - 1)
|
||||||
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
|
var ents []pb.Entry
|
||||||
|
var erre error
|
||||||
|
// In a throttled StateReplicate only send empty MsgApp, to ensure progress.
|
||||||
|
// Otherwise, if we had a full Inflights and all inflight messages were in
|
||||||
|
// fact dropped, replication to that follower would stall. Instead, an empty
|
||||||
|
// MsgApp will eventually reach the follower (heartbeats responses prompt the
|
||||||
|
// leader to send an append), allowing it to be acked or rejected, both of
|
||||||
|
// which will clear out Inflights.
|
||||||
|
if pr.State != tracker.StateReplicate || !pr.Inflights.Full() {
|
||||||
|
ents, erre = r.raftLog.entries(pr.Next, r.maxMsgSize)
|
||||||
|
}
|
||||||
|
|
||||||
if len(ents) == 0 && !sendIfEmpty {
|
if len(ents) == 0 && !sendIfEmpty {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -452,7 +461,6 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
m.Type = pb.MsgSnap
|
|
||||||
snapshot, err := r.raftLog.snapshot()
|
snapshot, err := r.raftLog.snapshot()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == ErrSnapshotTemporarilyUnavailable {
|
if err == ErrSnapshotTemporarilyUnavailable {
|
||||||
@ -464,33 +472,29 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
|
|||||||
if IsEmptySnap(snapshot) {
|
if IsEmptySnap(snapshot) {
|
||||||
panic("need non-empty snapshot")
|
panic("need non-empty snapshot")
|
||||||
}
|
}
|
||||||
m.Snapshot = snapshot
|
|
||||||
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
|
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||||
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
|
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
|
||||||
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
|
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
|
||||||
pr.BecomeSnapshot(sindex)
|
pr.BecomeSnapshot(sindex)
|
||||||
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
|
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
|
||||||
} else {
|
|
||||||
m.Type = pb.MsgApp
|
r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: snapshot})
|
||||||
m.Index = pr.Next - 1
|
return true
|
||||||
m.LogTerm = term
|
|
||||||
m.Entries = ents
|
|
||||||
m.Commit = r.raftLog.committed
|
|
||||||
if n := len(m.Entries); n != 0 {
|
|
||||||
switch pr.State {
|
|
||||||
// optimistically increase the next when in StateReplicate
|
|
||||||
case tracker.StateReplicate:
|
|
||||||
last := m.Entries[n-1].Index
|
|
||||||
pr.OptimisticUpdate(last)
|
|
||||||
pr.Inflights.Add(last)
|
|
||||||
case tracker.StateProbe:
|
|
||||||
pr.ProbeSent = true
|
|
||||||
default:
|
|
||||||
r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send the actual MsgApp otherwise, and update the progress accordingly.
|
||||||
|
next := pr.Next // save Next for later, as the progress update can change it
|
||||||
|
if err := pr.UpdateOnEntriesSend(len(ents), next); err != nil {
|
||||||
|
r.logger.Panicf("%x: %v", r.id, err)
|
||||||
}
|
}
|
||||||
}
|
r.send(pb.Message{
|
||||||
r.send(m)
|
To: to,
|
||||||
|
Type: pb.MsgApp,
|
||||||
|
Index: next - 1,
|
||||||
|
LogTerm: term,
|
||||||
|
Entries: ents,
|
||||||
|
Commit: r.raftLog.committed,
|
||||||
|
})
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1300,12 +1304,12 @@ func stepLeader(r *raft, m pb.Message) error {
|
|||||||
}
|
}
|
||||||
case pb.MsgHeartbeatResp:
|
case pb.MsgHeartbeatResp:
|
||||||
pr.RecentActive = true
|
pr.RecentActive = true
|
||||||
pr.ProbeSent = false
|
pr.MsgAppFlowPaused = false
|
||||||
|
|
||||||
// free one slot for the full inflights window to allow progress.
|
// NB: if the follower is paused (full Inflights), this will still send an
|
||||||
if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
|
// empty append, allowing it to recover from situations in which all the
|
||||||
pr.Inflights.FreeFirstOne()
|
// messages that filled up Inflights in the first place were dropped. Note
|
||||||
}
|
// also that the outgoing heartbeat already communicated the commit index.
|
||||||
if pr.Match < r.raftLog.lastIndex() {
|
if pr.Match < r.raftLog.lastIndex() {
|
||||||
r.sendAppend(m.From)
|
r.sendAppend(m.From)
|
||||||
}
|
}
|
||||||
@ -1345,7 +1349,7 @@ func stepLeader(r *raft, m pb.Message) error {
|
|||||||
// If snapshot finish, wait for the MsgAppResp from the remote node before sending
|
// If snapshot finish, wait for the MsgAppResp from the remote node before sending
|
||||||
// out the next MsgApp.
|
// out the next MsgApp.
|
||||||
// If snapshot failure, wait for a heartbeat interval before next try
|
// If snapshot failure, wait for a heartbeat interval before next try
|
||||||
pr.ProbeSent = true
|
pr.MsgAppFlowPaused = true
|
||||||
case pb.MsgUnreachable:
|
case pb.MsgUnreachable:
|
||||||
// During optimistic replication, if the remote becomes unreachable,
|
// During optimistic replication, if the remote becomes unreachable,
|
||||||
// there is huge probability that a MsgApp is lost.
|
// there is huge probability that a MsgApp is lost.
|
||||||
|
@ -36,14 +36,14 @@ func TestMsgAppFlowControlFull(t *testing.T) {
|
|||||||
for i := 0; i < r.prs.MaxInflight; i++ {
|
for i := 0; i < r.prs.MaxInflight; i++ {
|
||||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
||||||
ms := r.readMessages()
|
ms := r.readMessages()
|
||||||
if len(ms) != 1 {
|
if len(ms) != 1 || ms[0].Type != pb.MsgApp {
|
||||||
t.Fatalf("#%d: len(ms) = %d, want 1", i, len(ms))
|
t.Fatalf("#%d: len(ms) = %d, want 1 MsgApp", i, len(ms))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensure 1
|
// ensure 1
|
||||||
if !pr2.Inflights.Full() {
|
if !pr2.IsPaused() {
|
||||||
t.Fatalf("inflights.full = %t, want %t", pr2.Inflights.Full(), true)
|
t.Fatal("paused = false, want true")
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensure 2
|
// ensure 2
|
||||||
@ -84,20 +84,20 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
|
|||||||
// fill in the inflights window again
|
// fill in the inflights window again
|
||||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
||||||
ms := r.readMessages()
|
ms := r.readMessages()
|
||||||
if len(ms) != 1 {
|
if len(ms) != 1 || ms[0].Type != pb.MsgApp {
|
||||||
t.Fatalf("#%d: len(ms) = %d, want 1", tt, len(ms))
|
t.Fatalf("#%d: len(ms) = %d, want 1 MsgApp", tt, len(ms))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensure 1
|
// ensure 1
|
||||||
if !pr2.Inflights.Full() {
|
if !pr2.IsPaused() {
|
||||||
t.Fatalf("inflights.full = %t, want %t", pr2.Inflights.Full(), true)
|
t.Fatalf("#%d: paused = false, want true", tt)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensure 2
|
// ensure 2
|
||||||
for i := 0; i < tt; i++ {
|
for i := 0; i < tt; i++ {
|
||||||
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(i)})
|
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(i)})
|
||||||
if !pr2.Inflights.Full() {
|
if !pr2.IsPaused() {
|
||||||
t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.Inflights.Full(), true)
|
t.Fatalf("#%d.%d: paused = false, want true", tt, i)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -120,32 +120,28 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for tt := 1; tt < 5; tt++ {
|
for tt := 1; tt < 5; tt++ {
|
||||||
if !pr2.Inflights.Full() {
|
|
||||||
t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.Inflights.Full(), true)
|
|
||||||
}
|
|
||||||
|
|
||||||
// recv tt msgHeartbeatResp and expect one free slot
|
// recv tt msgHeartbeatResp and expect one free slot
|
||||||
for i := 0; i < tt; i++ {
|
for i := 0; i < tt; i++ {
|
||||||
|
if !pr2.IsPaused() {
|
||||||
|
t.Fatalf("#%d.%d: paused = false, want true", tt, i)
|
||||||
|
}
|
||||||
|
// Unpauses the progress, sends an empty MsgApp, and pauses it again.
|
||||||
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
|
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
|
||||||
r.readMessages()
|
ms := r.readMessages()
|
||||||
if pr2.Inflights.Full() {
|
if len(ms) != 1 || ms[0].Type != pb.MsgApp || len(ms[0].Entries) != 0 {
|
||||||
t.Fatalf("#%d.%d: inflights.full = %t, want %t", tt, i, pr2.Inflights.Full(), false)
|
t.Fatalf("#%d.%d: len(ms) == %d, want 1 empty MsgApp", tt, i, len(ms))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// one slot
|
// No more appends are sent if there are no heartbeats.
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
if !pr2.IsPaused() {
|
||||||
|
t.Fatalf("#%d.%d: paused = false, want true", tt, i)
|
||||||
|
}
|
||||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
||||||
ms := r.readMessages()
|
ms := r.readMessages()
|
||||||
if len(ms) != 1 {
|
if len(ms) != 0 {
|
||||||
t.Fatalf("#%d: free slot = 0, want 1", tt)
|
t.Fatalf("#%d.%d: len(ms) = %d, want 0", tt, i, len(ms))
|
||||||
}
|
|
||||||
|
|
||||||
// and just one slot
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
|
|
||||||
ms1 := r.readMessages()
|
|
||||||
if len(ms1) != 0 {
|
|
||||||
t.Fatalf("#%d.%d: len(ms) = %d, want 0", tt, i, len(ms1))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,8 +83,8 @@ func TestSnapshotFailure(t *testing.T) {
|
|||||||
if sm.prs.Progress[2].Next != 1 {
|
if sm.prs.Progress[2].Next != 1 {
|
||||||
t.Fatalf("Next = %d, want 1", sm.prs.Progress[2].Next)
|
t.Fatalf("Next = %d, want 1", sm.prs.Progress[2].Next)
|
||||||
}
|
}
|
||||||
if !sm.prs.Progress[2].ProbeSent {
|
if !sm.prs.Progress[2].MsgAppFlowPaused {
|
||||||
t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent)
|
t.Errorf("MsgAppFlowPaused = %v, want true", sm.prs.Progress[2].MsgAppFlowPaused)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -106,8 +106,8 @@ func TestSnapshotSucceed(t *testing.T) {
|
|||||||
if sm.prs.Progress[2].Next != 12 {
|
if sm.prs.Progress[2].Next != 12 {
|
||||||
t.Fatalf("Next = %d, want 12", sm.prs.Progress[2].Next)
|
t.Fatalf("Next = %d, want 12", sm.prs.Progress[2].Next)
|
||||||
}
|
}
|
||||||
if !sm.prs.Progress[2].ProbeSent {
|
if !sm.prs.Progress[2].MsgAppFlowPaused {
|
||||||
t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent)
|
t.Errorf("MsgAppFlowPaused = %v, want true", sm.prs.Progress[2].MsgAppFlowPaused)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,17 +94,21 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
|
|||||||
r.becomeCandidate()
|
r.becomeCandidate()
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
|
|
||||||
r.prs.Progress[2].ProbeSent = true
|
r.prs.Progress[2].MsgAppFlowPaused = true
|
||||||
|
|
||||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
|
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
|
||||||
if !r.prs.Progress[2].ProbeSent {
|
if !r.prs.Progress[2].MsgAppFlowPaused {
|
||||||
t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
|
t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused)
|
||||||
}
|
}
|
||||||
|
|
||||||
r.prs.Progress[2].BecomeReplicate()
|
r.prs.Progress[2].BecomeReplicate()
|
||||||
|
if r.prs.Progress[2].MsgAppFlowPaused {
|
||||||
|
t.Errorf("paused = %v, want false", r.prs.Progress[2].MsgAppFlowPaused)
|
||||||
|
}
|
||||||
|
r.prs.Progress[2].MsgAppFlowPaused = true
|
||||||
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
|
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
|
||||||
if r.prs.Progress[2].ProbeSent {
|
if r.prs.Progress[2].MsgAppFlowPaused {
|
||||||
t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent)
|
t.Errorf("paused = %v, want false", r.prs.Progress[2].MsgAppFlowPaused)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2654,8 +2658,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !r.prs.Progress[2].ProbeSent {
|
if !r.prs.Progress[2].MsgAppFlowPaused {
|
||||||
t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
|
t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused)
|
||||||
}
|
}
|
||||||
for j := 0; j < 10; j++ {
|
for j := 0; j < 10; j++ {
|
||||||
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
|
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
|
||||||
@ -2669,8 +2673,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
|
|||||||
for j := 0; j < r.heartbeatTimeout; j++ {
|
for j := 0; j < r.heartbeatTimeout; j++ {
|
||||||
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
|
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
|
||||||
}
|
}
|
||||||
if !r.prs.Progress[2].ProbeSent {
|
if !r.prs.Progress[2].MsgAppFlowPaused {
|
||||||
t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
|
t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused)
|
||||||
}
|
}
|
||||||
|
|
||||||
// consume the heartbeat
|
// consume the heartbeat
|
||||||
@ -2692,8 +2696,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
|
|||||||
if msg[0].Index != 0 {
|
if msg[0].Index != 0 {
|
||||||
t.Errorf("index = %d, want %d", msg[0].Index, 0)
|
t.Errorf("index = %d, want %d", msg[0].Index, 0)
|
||||||
}
|
}
|
||||||
if !r.prs.Progress[2].ProbeSent {
|
if !r.prs.Progress[2].MsgAppFlowPaused {
|
||||||
t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
|
t.Errorf("paused = %v, want true", r.prs.Progress[2].MsgAppFlowPaused)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,15 +84,13 @@ type Storage interface {
|
|||||||
Append([]pb.Entry) error
|
Append([]pb.Entry) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// defaultRaftConfig sets up a *raft.Config with reasonable testing defaults.
|
// raftConfigStub sets up a raft.Config stub with reasonable testing defaults.
|
||||||
// In particular, no limits are set.
|
// In particular, no limits are set. It is not a complete config: ID and Storage
|
||||||
func defaultRaftConfig(id uint64, applied uint64, s raft.Storage) *raft.Config {
|
// must be set for each node using the stub as a template.
|
||||||
return &raft.Config{
|
func raftConfigStub() raft.Config {
|
||||||
ID: id,
|
return raft.Config{
|
||||||
Applied: applied,
|
|
||||||
ElectionTick: 3,
|
ElectionTick: 3,
|
||||||
HeartbeatTick: 1,
|
HeartbeatTick: 1,
|
||||||
Storage: s,
|
|
||||||
MaxSizePerMsg: math.MaxUint64,
|
MaxSizePerMsg: math.MaxUint64,
|
||||||
MaxInflightMsgs: math.MaxInt32,
|
MaxInflightMsgs: math.MaxInt32,
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) error {
|
func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) error {
|
||||||
n := firstAsInt(t, d)
|
n := firstAsInt(t, d)
|
||||||
var snap pb.Snapshot
|
var snap pb.Snapshot
|
||||||
|
cfg := raftConfigStub()
|
||||||
for _, arg := range d.CmdArgs[1:] {
|
for _, arg := range d.CmdArgs[1:] {
|
||||||
for i := range arg.Vals {
|
for i := range arg.Vals {
|
||||||
switch arg.Key {
|
switch arg.Key {
|
||||||
@ -39,14 +40,17 @@ func (env *InteractionEnv) handleAddNodes(t *testing.T, d datadriven.TestData) e
|
|||||||
var id uint64
|
var id uint64
|
||||||
arg.Scan(t, i, &id)
|
arg.Scan(t, i, &id)
|
||||||
snap.Metadata.ConfState.Learners = append(snap.Metadata.ConfState.Learners, id)
|
snap.Metadata.ConfState.Learners = append(snap.Metadata.ConfState.Learners, id)
|
||||||
|
case "inflight":
|
||||||
|
arg.Scan(t, i, &cfg.MaxInflightMsgs)
|
||||||
case "index":
|
case "index":
|
||||||
arg.Scan(t, i, &snap.Metadata.Index)
|
arg.Scan(t, i, &snap.Metadata.Index)
|
||||||
|
cfg.Applied = snap.Metadata.Index
|
||||||
case "content":
|
case "content":
|
||||||
arg.Scan(t, i, &snap.Data)
|
arg.Scan(t, i, &snap.Data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return env.AddNodes(n, snap)
|
return env.AddNodes(n, cfg, snap)
|
||||||
}
|
}
|
||||||
|
|
||||||
type snapOverrideStorage struct {
|
type snapOverrideStorage struct {
|
||||||
@ -63,9 +67,9 @@ func (s snapOverrideStorage) Snapshot() (pb.Snapshot, error) {
|
|||||||
|
|
||||||
var _ raft.Storage = snapOverrideStorage{}
|
var _ raft.Storage = snapOverrideStorage{}
|
||||||
|
|
||||||
// AddNodes adds n new nodes initializes from the given snapshot (which may be
|
// AddNodes adds n new nodes initialized from the given snapshot (which may be
|
||||||
// empty). They will be assigned consecutive IDs.
|
// empty), and using the cfg as template. They will be assigned consecutive IDs.
|
||||||
func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error {
|
func (env *InteractionEnv) AddNodes(n int, cfg raft.Config, snap pb.Snapshot) error {
|
||||||
bootstrap := !reflect.DeepEqual(snap, pb.Snapshot{})
|
bootstrap := !reflect.DeepEqual(snap, pb.Snapshot{})
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
id := uint64(1 + len(env.Nodes))
|
id := uint64(1 + len(env.Nodes))
|
||||||
@ -103,9 +107,10 @@ func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error {
|
|||||||
return fmt.Errorf("failed to establish first index %d; got %d", exp, fi)
|
return fmt.Errorf("failed to establish first index %d; got %d", exp, fi)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cfg := defaultRaftConfig(id, snap.Metadata.Index, s)
|
cfg := cfg // fork the config stub
|
||||||
|
cfg.ID, cfg.Storage = id, s
|
||||||
if env.Options.OnConfig != nil {
|
if env.Options.OnConfig != nil {
|
||||||
env.Options.OnConfig(cfg)
|
env.Options.OnConfig(&cfg)
|
||||||
if cfg.ID != id {
|
if cfg.ID != id {
|
||||||
// This could be supported but then we need to do more work
|
// This could be supported but then we need to do more work
|
||||||
// translating back and forth -- not worth it.
|
// translating back and forth -- not worth it.
|
||||||
@ -117,7 +122,7 @@ func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error {
|
|||||||
}
|
}
|
||||||
cfg.Logger = env.Output
|
cfg.Logger = env.Output
|
||||||
|
|
||||||
rn, err := raft.NewRawNode(cfg)
|
rn, err := raft.NewRawNode(&cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -127,7 +132,7 @@ func (env *InteractionEnv) AddNodes(n int, snap pb.Snapshot) error {
|
|||||||
// TODO(tbg): allow a more general Storage, as long as it also allows
|
// TODO(tbg): allow a more general Storage, as long as it also allows
|
||||||
// us to apply snapshots, append entries, and update the HardState.
|
// us to apply snapshots, append entries, and update the HardState.
|
||||||
Storage: s,
|
Storage: s,
|
||||||
Config: cfg,
|
Config: &cfg,
|
||||||
History: []pb.Snapshot{snap},
|
History: []pb.Snapshot{snap},
|
||||||
}
|
}
|
||||||
env.Nodes = append(env.Nodes, node)
|
env.Nodes = append(env.Nodes, node)
|
||||||
|
190
raft/testdata/replicate_pause.txt
vendored
Normal file
190
raft/testdata/replicate_pause.txt
vendored
Normal file
@ -0,0 +1,190 @@
|
|||||||
|
# This test ensures that MsgApp stream to a follower is paused when the
|
||||||
|
# in-flight state exceeds the configured limits. This is a regression test for
|
||||||
|
# the issue fixed by https://github.com/etcd-io/etcd/pull/14633.
|
||||||
|
|
||||||
|
# Turn off output during the setup of the test.
|
||||||
|
log-level none
|
||||||
|
----
|
||||||
|
ok
|
||||||
|
|
||||||
|
# Start with 3 nodes, with a limited in-flight capacity.
|
||||||
|
add-nodes 3 voters=(1,2,3) index=10 inflight=3
|
||||||
|
----
|
||||||
|
ok
|
||||||
|
|
||||||
|
campaign 1
|
||||||
|
----
|
||||||
|
ok
|
||||||
|
|
||||||
|
stabilize
|
||||||
|
----
|
||||||
|
ok (quiet)
|
||||||
|
|
||||||
|
# Propose 3 entries.
|
||||||
|
propose 1 prop_1_12
|
||||||
|
----
|
||||||
|
ok
|
||||||
|
|
||||||
|
propose 1 prop_1_13
|
||||||
|
----
|
||||||
|
ok
|
||||||
|
|
||||||
|
propose 1 prop_1_14
|
||||||
|
----
|
||||||
|
ok
|
||||||
|
|
||||||
|
# Store entries and send proposals.
|
||||||
|
process-ready 1
|
||||||
|
----
|
||||||
|
ok (quiet)
|
||||||
|
|
||||||
|
# Re-enable log messages.
|
||||||
|
log-level debug
|
||||||
|
----
|
||||||
|
ok
|
||||||
|
|
||||||
|
# Expect that in-flight tracking to nodes 2 and 3 is saturated.
|
||||||
|
status 1
|
||||||
|
----
|
||||||
|
1: StateReplicate match=14 next=15
|
||||||
|
2: StateReplicate match=11 next=15 paused inflight=3[full]
|
||||||
|
3: StateReplicate match=11 next=15 paused inflight=3[full]
|
||||||
|
|
||||||
|
log-level none
|
||||||
|
----
|
||||||
|
ok
|
||||||
|
|
||||||
|
# Commit entries between nodes 1 and 2.
|
||||||
|
stabilize 1 2
|
||||||
|
----
|
||||||
|
ok (quiet)
|
||||||
|
|
||||||
|
log-level debug
|
||||||
|
----
|
||||||
|
ok
|
||||||
|
|
||||||
|
# Expect that the entries are committed and stored on nodes 1 and 2.
|
||||||
|
status 1
|
||||||
|
----
|
||||||
|
1: StateReplicate match=14 next=15
|
||||||
|
2: StateReplicate match=14 next=15
|
||||||
|
3: StateReplicate match=11 next=15 paused inflight=3[full]
|
||||||
|
|
||||||
|
# Drop append messages to node 3.
|
||||||
|
deliver-msgs drop=3
|
||||||
|
----
|
||||||
|
dropped: 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1_12"]
|
||||||
|
dropped: 1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_1_13"]
|
||||||
|
dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:11 Entries:[1/14 EntryNormal "prop_1_14"]
|
||||||
|
|
||||||
|
|
||||||
|
# Repeat committing 3 entries.
|
||||||
|
propose 1 prop_1_15
|
||||||
|
----
|
||||||
|
ok
|
||||||
|
|
||||||
|
propose 1 prop_1_16
|
||||||
|
----
|
||||||
|
ok
|
||||||
|
|
||||||
|
propose 1 prop_1_17
|
||||||
|
----
|
||||||
|
ok
|
||||||
|
|
||||||
|
# In-flight tracking to nodes 2 and 3 is saturated, but node 3 is behind.
|
||||||
|
status 1
|
||||||
|
----
|
||||||
|
1: StateReplicate match=14 next=15
|
||||||
|
2: StateReplicate match=14 next=18 paused inflight=3[full]
|
||||||
|
3: StateReplicate match=11 next=15 paused inflight=3[full]
|
||||||
|
|
||||||
|
log-level none
|
||||||
|
----
|
||||||
|
ok
|
||||||
|
|
||||||
|
# Commit entries between nodes 1 and 2 again.
|
||||||
|
stabilize 1 2
|
||||||
|
----
|
||||||
|
ok (quiet)
|
||||||
|
|
||||||
|
log-level debug
|
||||||
|
----
|
||||||
|
ok
|
||||||
|
|
||||||
|
# Expect that the entries are committed and stored only on nodes 1 and 2.
|
||||||
|
status 1
|
||||||
|
----
|
||||||
|
1: StateReplicate match=17 next=18
|
||||||
|
2: StateReplicate match=17 next=18
|
||||||
|
3: StateReplicate match=11 next=15 paused inflight=3[full]
|
||||||
|
|
||||||
|
# Make a heartbeat roundtrip.
|
||||||
|
tick-heartbeat 1
|
||||||
|
----
|
||||||
|
ok
|
||||||
|
|
||||||
|
stabilize 1
|
||||||
|
----
|
||||||
|
> 1 handling Ready
|
||||||
|
Ready MustSync=false:
|
||||||
|
Messages:
|
||||||
|
1->2 MsgHeartbeat Term:1 Log:0/0 Commit:17
|
||||||
|
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11
|
||||||
|
|
||||||
|
stabilize 2 3
|
||||||
|
----
|
||||||
|
> 2 receiving messages
|
||||||
|
1->2 MsgHeartbeat Term:1 Log:0/0 Commit:17
|
||||||
|
> 3 receiving messages
|
||||||
|
1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11
|
||||||
|
> 2 handling Ready
|
||||||
|
Ready MustSync=false:
|
||||||
|
Messages:
|
||||||
|
2->1 MsgHeartbeatResp Term:1 Log:0/0
|
||||||
|
> 3 handling Ready
|
||||||
|
Ready MustSync=false:
|
||||||
|
Messages:
|
||||||
|
3->1 MsgHeartbeatResp Term:1 Log:0/0
|
||||||
|
|
||||||
|
# After handling heartbeat responses, node 1 sends an empty MsgApp to a
|
||||||
|
# throttled node 3 because it hasn't yet replied to a single MsgApp, and the
|
||||||
|
# in-flight tracker is still saturated.
|
||||||
|
stabilize 1
|
||||||
|
----
|
||||||
|
> 1 receiving messages
|
||||||
|
2->1 MsgHeartbeatResp Term:1 Log:0/0
|
||||||
|
3->1 MsgHeartbeatResp Term:1 Log:0/0
|
||||||
|
> 1 handling Ready
|
||||||
|
Ready MustSync=false:
|
||||||
|
Messages:
|
||||||
|
1->3 MsgApp Term:1 Log:1/14 Commit:17
|
||||||
|
|
||||||
|
# Node 3 finally receives a MsgApp, but there was a gap, so it rejects it.
|
||||||
|
stabilize 3
|
||||||
|
----
|
||||||
|
> 3 receiving messages
|
||||||
|
1->3 MsgApp Term:1 Log:1/14 Commit:17
|
||||||
|
DEBUG 3 [logterm: 0, index: 14] rejected MsgApp [logterm: 1, index: 14] from 1
|
||||||
|
> 3 handling Ready
|
||||||
|
Ready MustSync=false:
|
||||||
|
Messages:
|
||||||
|
3->1 MsgAppResp Term:1 Log:1/14 Rejected (Hint: 11)
|
||||||
|
|
||||||
|
log-level none
|
||||||
|
----
|
||||||
|
ok
|
||||||
|
|
||||||
|
stabilize
|
||||||
|
----
|
||||||
|
ok (quiet)
|
||||||
|
|
||||||
|
log-level debug
|
||||||
|
----
|
||||||
|
ok
|
||||||
|
|
||||||
|
# Eventually all nodes catch up on the committed state.
|
||||||
|
status 1
|
||||||
|
----
|
||||||
|
1: StateReplicate match=17 next=18
|
||||||
|
2: StateReplicate match=17 next=18
|
||||||
|
3: StateReplicate match=17 next=18
|
@ -113,10 +113,6 @@ func (in *Inflights) FreeLE(to uint64) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// FreeFirstOne releases the first inflight. This is a no-op if nothing is
|
|
||||||
// inflight.
|
|
||||||
func (in *Inflights) FreeFirstOne() { in.FreeLE(in.buffer[in.start]) }
|
|
||||||
|
|
||||||
// Full returns true if no more messages can be sent at the moment.
|
// Full returns true if no more messages can be sent at the moment.
|
||||||
func (in *Inflights) Full() bool {
|
func (in *Inflights) Full() bool {
|
||||||
return in.count == in.size
|
return in.count == in.size
|
||||||
|
@ -105,6 +105,20 @@ func TestInflightFreeTo(t *testing.T) {
|
|||||||
in.Add(uint64(i))
|
in.Add(uint64(i))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
in.FreeLE(0)
|
||||||
|
|
||||||
|
wantIn0 := &Inflights{
|
||||||
|
start: 1,
|
||||||
|
count: 9,
|
||||||
|
size: 10,
|
||||||
|
// ↓------------------------
|
||||||
|
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(in, wantIn0) {
|
||||||
|
t.Fatalf("in = %+v, want %+v", in, wantIn0)
|
||||||
|
}
|
||||||
|
|
||||||
in.FreeLE(4)
|
in.FreeLE(4)
|
||||||
|
|
||||||
wantIn := &Inflights{
|
wantIn := &Inflights{
|
||||||
@ -166,24 +180,3 @@ func TestInflightFreeTo(t *testing.T) {
|
|||||||
t.Fatalf("in = %+v, want %+v", in, wantIn4)
|
t.Fatalf("in = %+v, want %+v", in, wantIn4)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestInflightFreeFirstOne(t *testing.T) {
|
|
||||||
in := NewInflights(10)
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
in.Add(uint64(i))
|
|
||||||
}
|
|
||||||
|
|
||||||
in.FreeFirstOne()
|
|
||||||
|
|
||||||
wantIn := &Inflights{
|
|
||||||
start: 1,
|
|
||||||
count: 9,
|
|
||||||
size: 10,
|
|
||||||
// ↓------------------------
|
|
||||||
buffer: []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
|
|
||||||
}
|
|
||||||
|
|
||||||
if !reflect.DeepEqual(in, wantIn) {
|
|
||||||
t.Fatalf("in = %+v, want %+v", in, wantIn)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -55,10 +55,13 @@ type Progress struct {
|
|||||||
// This is always true on the leader.
|
// This is always true on the leader.
|
||||||
RecentActive bool
|
RecentActive bool
|
||||||
|
|
||||||
// ProbeSent is used while this follower is in StateProbe. When ProbeSent is
|
// MsgAppFlowPaused is used when the MsgApp flow to a node is throttled. This
|
||||||
// true, raft should pause sending replication message to this peer until
|
// happens in StateProbe, or StateReplicate with saturated Inflights. In both
|
||||||
// ProbeSent is reset. See ProbeAcked() and IsPaused().
|
// cases, we need to continue sending MsgApp once in a while to guarantee
|
||||||
ProbeSent bool
|
// progress, but we only do so when MsgAppFlowPaused is false (it is reset on
|
||||||
|
// receiving a heartbeat response), to not overflow the receiver. See
|
||||||
|
// IsPaused().
|
||||||
|
MsgAppFlowPaused bool
|
||||||
|
|
||||||
// Inflights is a sliding window for the inflight messages.
|
// Inflights is a sliding window for the inflight messages.
|
||||||
// Each inflight message contains one or more log entries.
|
// Each inflight message contains one or more log entries.
|
||||||
@ -78,10 +81,10 @@ type Progress struct {
|
|||||||
IsLearner bool
|
IsLearner bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResetState moves the Progress into the specified State, resetting ProbeSent,
|
// ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused,
|
||||||
// PendingSnapshot, and Inflights.
|
// PendingSnapshot, and Inflights.
|
||||||
func (pr *Progress) ResetState(state StateType) {
|
func (pr *Progress) ResetState(state StateType) {
|
||||||
pr.ProbeSent = false
|
pr.MsgAppFlowPaused = false
|
||||||
pr.PendingSnapshot = 0
|
pr.PendingSnapshot = 0
|
||||||
pr.State = state
|
pr.State = state
|
||||||
pr.Inflights.reset()
|
pr.Inflights.reset()
|
||||||
@ -101,13 +104,6 @@ func min(a, b uint64) uint64 {
|
|||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProbeAcked is called when this peer has accepted an append. It resets
|
|
||||||
// ProbeSent to signal that additional append messages should be sent without
|
|
||||||
// further delay.
|
|
||||||
func (pr *Progress) ProbeAcked() {
|
|
||||||
pr.ProbeSent = false
|
|
||||||
}
|
|
||||||
|
|
||||||
// BecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
|
// BecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
|
||||||
// optionally and if larger, the index of the pending snapshot.
|
// optionally and if larger, the index of the pending snapshot.
|
||||||
func (pr *Progress) BecomeProbe() {
|
func (pr *Progress) BecomeProbe() {
|
||||||
@ -137,6 +133,32 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) {
|
|||||||
pr.PendingSnapshot = snapshoti
|
pr.PendingSnapshot = snapshoti
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdateOnEntriesSend updates the progress on the given number of consecutive
|
||||||
|
// entries being sent in a MsgApp, appended at and after the given log index.
|
||||||
|
func (pr *Progress) UpdateOnEntriesSend(entries int, nextIndex uint64) error {
|
||||||
|
switch pr.State {
|
||||||
|
case StateReplicate:
|
||||||
|
if entries > 0 {
|
||||||
|
last := nextIndex + uint64(entries) - 1
|
||||||
|
pr.OptimisticUpdate(last)
|
||||||
|
pr.Inflights.Add(last)
|
||||||
|
}
|
||||||
|
// If this message overflows the in-flights tracker, or it was already full,
|
||||||
|
// consider this message being a probe, so that the flow is paused.
|
||||||
|
pr.MsgAppFlowPaused = pr.Inflights.Full()
|
||||||
|
case StateProbe:
|
||||||
|
// TODO(pavelkalinnikov): this condition captures the previous behaviour,
|
||||||
|
// but we should set MsgAppFlowPaused unconditionally for simplicity, because any
|
||||||
|
// MsgApp in StateProbe is a probe, not only non-empty ones.
|
||||||
|
if entries > 0 {
|
||||||
|
pr.MsgAppFlowPaused = true
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("sending append in unhandled state %s", pr.State)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
|
// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
|
||||||
// index acked by it. The method returns false if the given n index comes from
|
// index acked by it. The method returns false if the given n index comes from
|
||||||
// an outdated message. Otherwise it updates the progress and returns true.
|
// an outdated message. Otherwise it updates the progress and returns true.
|
||||||
@ -145,7 +167,7 @@ func (pr *Progress) MaybeUpdate(n uint64) bool {
|
|||||||
if pr.Match < n {
|
if pr.Match < n {
|
||||||
pr.Match = n
|
pr.Match = n
|
||||||
updated = true
|
updated = true
|
||||||
pr.ProbeAcked()
|
pr.MsgAppFlowPaused = false
|
||||||
}
|
}
|
||||||
pr.Next = max(pr.Next, n+1)
|
pr.Next = max(pr.Next, n+1)
|
||||||
return updated
|
return updated
|
||||||
@ -187,7 +209,7 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pr.Next = max(min(rejected, matchHint+1), 1)
|
pr.Next = max(min(rejected, matchHint+1), 1)
|
||||||
pr.ProbeSent = false
|
pr.MsgAppFlowPaused = false
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -200,9 +222,9 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
|
|||||||
func (pr *Progress) IsPaused() bool {
|
func (pr *Progress) IsPaused() bool {
|
||||||
switch pr.State {
|
switch pr.State {
|
||||||
case StateProbe:
|
case StateProbe:
|
||||||
return pr.ProbeSent
|
return pr.MsgAppFlowPaused
|
||||||
case StateReplicate:
|
case StateReplicate:
|
||||||
return pr.Inflights.Full()
|
return pr.MsgAppFlowPaused
|
||||||
case StateSnapshot:
|
case StateSnapshot:
|
||||||
return true
|
return true
|
||||||
default:
|
default:
|
||||||
|
@ -27,7 +27,7 @@ func TestProgressString(t *testing.T) {
|
|||||||
State: StateSnapshot,
|
State: StateSnapshot,
|
||||||
PendingSnapshot: 123,
|
PendingSnapshot: 123,
|
||||||
RecentActive: false,
|
RecentActive: false,
|
||||||
ProbeSent: true,
|
MsgAppFlowPaused: true,
|
||||||
IsLearner: true,
|
IsLearner: true,
|
||||||
Inflights: ins,
|
Inflights: ins,
|
||||||
}
|
}
|
||||||
@ -47,14 +47,14 @@ func TestProgressIsPaused(t *testing.T) {
|
|||||||
{StateProbe, false, false},
|
{StateProbe, false, false},
|
||||||
{StateProbe, true, true},
|
{StateProbe, true, true},
|
||||||
{StateReplicate, false, false},
|
{StateReplicate, false, false},
|
||||||
{StateReplicate, true, false},
|
{StateReplicate, true, true},
|
||||||
{StateSnapshot, false, true},
|
{StateSnapshot, false, true},
|
||||||
{StateSnapshot, true, true},
|
{StateSnapshot, true, true},
|
||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
p := &Progress{
|
p := &Progress{
|
||||||
State: tt.state,
|
State: tt.state,
|
||||||
ProbeSent: tt.paused,
|
MsgAppFlowPaused: tt.paused,
|
||||||
Inflights: NewInflights(256),
|
Inflights: NewInflights(256),
|
||||||
}
|
}
|
||||||
if g := p.IsPaused(); g != tt.w {
|
if g := p.IsPaused(); g != tt.w {
|
||||||
@ -64,20 +64,20 @@ func TestProgressIsPaused(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TestProgressResume ensures that MaybeUpdate and MaybeDecrTo will reset
|
// TestProgressResume ensures that MaybeUpdate and MaybeDecrTo will reset
|
||||||
// ProbeSent.
|
// MsgAppFlowPaused.
|
||||||
func TestProgressResume(t *testing.T) {
|
func TestProgressResume(t *testing.T) {
|
||||||
p := &Progress{
|
p := &Progress{
|
||||||
Next: 2,
|
Next: 2,
|
||||||
ProbeSent: true,
|
MsgAppFlowPaused: true,
|
||||||
}
|
}
|
||||||
p.MaybeDecrTo(1, 1)
|
p.MaybeDecrTo(1, 1)
|
||||||
if p.ProbeSent {
|
if p.MsgAppFlowPaused {
|
||||||
t.Errorf("paused= %v, want false", p.ProbeSent)
|
t.Errorf("paused= %v, want false", p.MsgAppFlowPaused)
|
||||||
}
|
}
|
||||||
p.ProbeSent = true
|
p.MsgAppFlowPaused = true
|
||||||
p.MaybeUpdate(2)
|
p.MaybeUpdate(2)
|
||||||
if p.ProbeSent {
|
if p.MsgAppFlowPaused {
|
||||||
t.Errorf("paused= %v, want false", p.ProbeSent)
|
t.Errorf("paused= %v, want false", p.MsgAppFlowPaused)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user