diff --git a/raft/raft.go b/raft/raft.go index 56dd20810..211a3b0e2 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -1078,7 +1078,13 @@ func stepLeader(r *raft, m pb.Message) error { pr.becomeReplicate() case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort(): r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr) + // Transition back to replicating state via probing state + // (which takes the snapshot into account). If we didn't + // move to replicating state, that would only happen with + // the next round of appends (but there may not be a next + // round for a while, exposing an inconsistent RaftStatus). pr.becomeProbe() + pr.becomeReplicate() case pr.State == ProgressStateReplicate: pr.ins.freeTo(m.Index) } diff --git a/raft/raft_snap_test.go b/raft/raft_snap_test.go index a80ed4da8..14cedc23c 100644 --- a/raft/raft_snap_test.go +++ b/raft/raft_snap_test.go @@ -111,6 +111,114 @@ func TestSnapshotSucceed(t *testing.T) { } } +// TestSnapshotSucceedViaAppResp regression tests the situation in which a snap- +// shot is sent to a follower at the most recent index (i.e. the snapshot index +// is both the leader's last index and the committed index). In that situation, a bug +// in the past left the follower in probing status until the next log entry was +// committed. +func TestSnapshotSucceedViaAppResp(t *testing.T) { + snap := pb.Snapshot{ + Metadata: pb.SnapshotMetadata{ + Index: 11, // magic number + Term: 11, // magic number + ConfState: pb.ConfState{Nodes: []uint64{1, 2, 3}}, + }, + } + + s1 := NewMemoryStorage() + n1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, s1) + + // Become follower because otherwise the way this test sets things up the + // leadership term will be 1 (which is stale). We want it to match the snap- + // shot term in this test. + n1.becomeFollower(snap.Metadata.Term-1, 2) + n1.becomeCandidate() + n1.becomeLeader() + + // Apply a snapshot on the leader. 
This is a workaround against the fact that + // the leader will always append an empty entry, but that empty entry works + // against what we're trying to exercise in this test, namely the past bug in + // which a snapshot at the latest committed index left the follower in probing + // state. With the snapshot, the empty entry is fully committed. + n1.restore(snap) + + noMessage := pb.MessageType(-1) + mustSend := func(from, to *raft, typ pb.MessageType) pb.Message { + t.Helper() + for i, msg := range from.msgs { + if msg.From != from.id || msg.To != to.id || msg.Type != typ { + continue + } + t.Log(DescribeMessage(msg, func([]byte) string { return "" })) + if err := to.Step(msg); err != nil { + t.Fatalf("%v: %s", msg, err) + } + from.msgs = append(from.msgs[:i], from.msgs[i+1:]...) + return msg + } + if typ == noMessage { + if len(from.msgs) == 0 { + return pb.Message{} + } + t.Fatalf("expected no more messages, but got %d->%d %v", from.id, to.id, from.msgs) + } + t.Fatalf("message %d->%d %s not found in %v", from.id, to.id, typ, from.msgs) + return pb.Message{} // unreachable + } + + // Create the follower that will receive the snapshot. + s2 := NewMemoryStorage() + n2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, s2) + + // Let the leader probe the follower. + if !n1.maybeSendAppend(2, true /* sendIfEmpty */) { + t.Fatalf("expected message to be sent") + } + if msg := mustSend(n1, n2, pb.MsgApp); len(msg.Entries) > 0 { + // For this test to work, the leader must not have anything to append + // to the follower right now. + t.Fatalf("unexpectedly appending entries %v", msg.Entries) + } + + // Follower rejects the append (because it doesn't have any log entries) + if msg := mustSend(n2, n1, pb.MsgAppResp); !msg.Reject { + t.Fatalf("expected a rejection with zero hint, got reject=%t hint=%d", msg.Reject, msg.RejectHint) + } + + expIdx := snap.Metadata.Index + // Leader sends snapshot due to RejectHint of zero (the storage we use here + // has index zero compacted). 
+ if msg := mustSend(n1, n2, pb.MsgSnap); msg.Snapshot.Metadata.Index != expIdx { + t.Fatalf("expected snapshot at index %d, got %d", expIdx, msg.Snapshot.Metadata.Index) + } + + // n2 reacts to snapshot with MsgAppResp. + if msg := mustSend(n2, n1, pb.MsgAppResp); msg.Index != expIdx { + t.Fatalf("expected AppResp at index %d, got %d", expIdx, msg.Index) + } + + // Leader sends MsgApp to communicate commit index. + if msg := mustSend(n1, n2, pb.MsgApp); msg.Commit != expIdx { + t.Fatalf("expected commit index %d, got %d", expIdx, msg.Commit) + } + + // Follower responds. + mustSend(n2, n1, pb.MsgAppResp) + + // Leader has correct state for follower. + pr := n1.prs[2] + if pr.State != ProgressStateReplicate { + t.Fatalf("unexpected state %v", pr) + } + if pr.Match != expIdx || pr.Next != expIdx+1 { + t.Fatalf("expected match = %d, next = %d; got match = %d and next = %d", expIdx, expIdx+1, pr.Match, pr.Next) + } + + // Leader and follower are done. + mustSend(n1, n2, noMessage) + mustSend(n2, n1, noMessage) +} + func TestSnapshotAbort(t *testing.T) { storage := NewMemoryStorage() sm := newTestRaft(1, []uint64{1, 2}, 10, 1, storage) @@ -128,7 +236,14 @@ func TestSnapshotAbort(t *testing.T) { if sm.prs[2].PendingSnapshot != 0 { t.Fatalf("PendingSnapshot = %d, want 0", sm.prs[2].PendingSnapshot) } - if sm.prs[2].Next != 12 { - t.Fatalf("Next = %d, want 12", sm.prs[2].Next) + // The follower entered ProgressStateReplicate and the leader sent an append + // and optimistically updated the progress (so we see 13 instead of 12). + // There is something to append because the leader appended an empty entry + // to the log at index 12 when it assumed leadership. + if sm.prs[2].Next != 13 { + t.Fatalf("Next = %d, want 13", sm.prs[2].Next) + } + if n := sm.prs[2].ins.count; n != 1 { + t.Fatalf("expected an inflight message, got %d", n) } }