Merge pull request #14413 from tbg/raft-single-voter

raft: don't emit unstable CommittedEntries
This commit is contained in:
Benjamin Wang 2022-09-22 08:43:37 +08:00 committed by GitHub
commit 31d9664cb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 716 additions and 376 deletions

View File

@ -55,7 +55,7 @@ func mustTemp(pre, body string) string {
}
func ltoa(l *raftLog) string {
s := fmt.Sprintf("committed: %d\n", l.committed)
s := fmt.Sprintf("lastIndex: %d\n", l.lastIndex())
s += fmt.Sprintf("applied: %d\n", l.applied)
for i, e := range l.allEntries() {
s += fmt.Sprintf("#%d: %+v\n", i, e)

View File

@ -211,11 +211,7 @@ type Peer struct {
Context []byte
}
// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
//
// Peers must not be zero length; call RestartNode in that case.
func StartNode(c *Config, peers []Peer) Node {
func setupNode(c *Config, peers []Peer) *node {
if len(peers) == 0 {
panic("no peers given; use RestartNode instead")
}
@ -229,11 +225,19 @@ func StartNode(c *Config, peers []Peer) Node {
}
n := newNode(rn)
go n.run()
return &n
}
// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
//
// Peers must not be zero length; call RestartNode in that case.
func StartNode(c *Config, peers []Peer) Node {
n := setupNode(c, peers)
go n.run()
return n
}
// RestartNode is similar to StartNode but does not take a list of peers.
// The current membership of the cluster will be restored from the Storage.
// If the caller has an existing state machine, pass in the last log index that

View File

@ -35,6 +35,12 @@ import (
func readyWithTimeout(n Node) Ready {
select {
case rd := <-n.Ready():
if nn, ok := n.(*nodeTestHarness); ok {
n = nn.node
}
if nn, ok := n.(*node); ok {
nn.rn.raft.logger.Infof("emitted ready: %s", DescribeReady(rd, nil))
}
return rd
case <-time.After(time.Second):
panic("timed out waiting for ready")
@ -126,6 +132,10 @@ func TestNodeStepUnblock(t *testing.T) {
func TestNodePropose(t *testing.T) {
var msgs []raftpb.Message
appendStep := func(r *raft, m raftpb.Message) error {
t.Log(DescribeMessage(m, nil))
if m.Type == raftpb.MsgAppResp {
return nil // injected by (*raft).advance
}
msgs = append(msgs, m)
return nil
}
@ -163,55 +173,6 @@ func TestNodePropose(t *testing.T) {
}
}
// TestNodeReadIndex ensures that node.ReadIndex sends the MsgReadIndex message to the underlying raft.
// It also ensures that ReadState can be read out through ready chan.
func TestNodeReadIndex(t *testing.T) {
var msgs []raftpb.Message
appendStep := func(r *raft, m raftpb.Message) error {
msgs = append(msgs, m)
return nil
}
wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
s := newTestMemoryStorage(withPeers(1))
rn := newTestRawNode(1, 10, 1, s)
n := newNode(rn)
r := rn.raft
r.readStates = wrs
go n.run()
n.Campaign(context.TODO())
for {
rd := <-n.Ready()
if !reflect.DeepEqual(rd.ReadStates, wrs) {
t.Errorf("ReadStates = %v, want %v", rd.ReadStates, wrs)
}
s.Append(rd.Entries)
if rd.SoftState.Lead == r.id {
n.Advance()
break
}
n.Advance()
}
r.step = appendStep
wrequestCtx := []byte("somedata2")
n.ReadIndex(context.TODO(), wrequestCtx)
n.Stop()
if len(msgs) != 1 {
t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
}
if msgs[0].Type != raftpb.MsgReadIndex {
t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgReadIndex)
}
if !bytes.Equal(msgs[0].Entries[0].Data, wrequestCtx) {
t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx)
}
}
// TestDisableProposalForwarding ensures that proposals are not forwarded to
// the leader when DisableProposalForwarding is true.
func TestDisableProposalForwarding(t *testing.T) {
@ -308,6 +269,9 @@ func TestNodeReadIndexToOldLeader(t *testing.T) {
func TestNodeProposeConfig(t *testing.T) {
var msgs []raftpb.Message
appendStep := func(r *raft, m raftpb.Message) error {
if m.Type == raftpb.MsgAppResp {
return nil // injected by (*raft).advance
}
msgs = append(msgs, m)
return nil
}
@ -352,30 +316,34 @@ func TestNodeProposeConfig(t *testing.T) {
// not affect the later propose to add new node.
func TestNodeProposeAddDuplicateNode(t *testing.T) {
s := newTestMemoryStorage(withPeers(1))
rn := newTestRawNode(1, 10, 1, s)
n := newNode(rn)
go n.run()
n.Campaign(context.TODO())
rdyEntries := make([]raftpb.Entry, 0)
cfg := newTestConfig(1, 10, 1, s)
ctx, cancel, n := newNodeTestHarness(context.Background(), t, cfg)
defer cancel()
n.Campaign(ctx)
allCommittedEntries := make([]raftpb.Entry, 0)
ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()
done := make(chan struct{})
stop := make(chan struct{})
goroutineStopped := make(chan struct{})
applyConfChan := make(chan struct{})
rd := readyWithTimeout(n)
s.Append(rd.Entries)
n.Advance()
go func() {
defer close(done)
defer close(goroutineStopped)
for {
select {
case <-stop:
case <-ctx.Done():
return
case <-ticker.C:
n.Tick()
case rd := <-n.Ready():
t.Log(DescribeReady(rd, nil))
s.Append(rd.Entries)
applied := false
for _, e := range rd.Entries {
rdyEntries = append(rdyEntries, e)
for _, e := range rd.CommittedEntries {
allCommittedEntries = append(allCommittedEntries, e)
switch e.Type {
case raftpb.EntryNormal:
case raftpb.EntryConfChange:
@ -395,32 +363,31 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) {
cc1 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
ccdata1, _ := cc1.Marshal()
n.ProposeConfChange(context.TODO(), cc1)
n.ProposeConfChange(ctx, cc1)
<-applyConfChan
// try add the same node again
n.ProposeConfChange(context.TODO(), cc1)
n.ProposeConfChange(ctx, cc1)
<-applyConfChan
// the new node join should be ok
cc2 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2}
ccdata2, _ := cc2.Marshal()
n.ProposeConfChange(context.TODO(), cc2)
n.ProposeConfChange(ctx, cc2)
<-applyConfChan
close(stop)
<-done
cancel()
<-goroutineStopped
if len(rdyEntries) != 4 {
t.Errorf("len(entry) = %d, want %d, %v\n", len(rdyEntries), 4, rdyEntries)
if len(allCommittedEntries) != 4 {
t.Errorf("len(entry) = %d, want %d, %v\n", len(allCommittedEntries), 4, allCommittedEntries)
}
if !bytes.Equal(rdyEntries[1].Data, ccdata1) {
t.Errorf("data = %v, want %v", rdyEntries[1].Data, ccdata1)
if !bytes.Equal(allCommittedEntries[1].Data, ccdata1) {
t.Errorf("data = %v, want %v", allCommittedEntries[1].Data, ccdata1)
}
if !bytes.Equal(rdyEntries[3].Data, ccdata2) {
t.Errorf("data = %v, want %v", rdyEntries[3].Data, ccdata2)
if !bytes.Equal(allCommittedEntries[3].Data, ccdata2) {
t.Errorf("data = %v, want %v", allCommittedEntries[3].Data, ccdata2)
}
n.Stop()
}
// TestBlockProposal ensures that node will block proposal when it does not
@ -463,6 +430,10 @@ func TestNodeProposeWaitDropped(t *testing.T) {
t.Logf("dropping message: %v", m.String())
return ErrProposalDropped
}
if m.Type == raftpb.MsgAppResp {
// This is produced by raft internally, see (*raft).advance.
return nil
}
msgs = append(msgs, m)
return nil
}
@ -495,7 +466,7 @@ func TestNodeProposeWaitDropped(t *testing.T) {
n.Stop()
if len(msgs) != 0 {
t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
t.Fatalf("len(msgs) = %d, want %d", len(msgs), 0)
}
}
@ -580,9 +551,6 @@ func TestReadyContainUpdates(t *testing.T) {
// start with correct configuration change entries, and can accept and commit
// proposals.
func TestNodeStart(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
ccdata, err := cc.Marshal()
if err != nil {
@ -600,11 +568,17 @@ func TestNodeStart(t *testing.T) {
MustSync: true,
},
{
HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1},
Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 2, Data: nil}},
MustSync: true,
},
{
HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
Entries: nil,
CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
MustSync: false,
},
}
storage := NewMemoryStorage()
c := &Config{
@ -616,27 +590,44 @@ func TestNodeStart(t *testing.T) {
MaxInflightMsgs: 256,
}
n := StartNode(c, []Peer{{ID: 1}})
defer n.Stop()
g := <-n.Ready()
if !reflect.DeepEqual(g, wants[0]) {
t.Fatalf("#%d: g = %+v,\n w %+v", 1, g, wants[0])
} else {
storage.Append(g.Entries)
ctx, cancel, n := newNodeTestHarness(context.Background(), t, c, Peer{ID: 1})
defer cancel()
{
rd := <-n.Ready()
if !reflect.DeepEqual(rd, wants[0]) {
t.Fatalf("#1: rd = %+v,\n w %+v", rd, wants[0])
}
storage.Append(rd.Entries)
n.Advance()
}
if err := n.Campaign(ctx); err != nil {
t.Fatal(err)
}
rd := <-n.Ready()
storage.Append(rd.Entries)
n.Advance()
{
rd := <-n.Ready()
storage.Append(rd.Entries)
n.Advance()
}
n.Propose(ctx, []byte("foo"))
if g2 := <-n.Ready(); !reflect.DeepEqual(g2, wants[1]) {
t.Errorf("#%d: g = %+v,\n w %+v", 2, g2, wants[1])
} else {
storage.Append(g2.Entries)
{
rd := <-n.Ready()
if !reflect.DeepEqual(rd, wants[1]) {
t.Errorf("#2: rd = %+v,\n w %+v", rd, wants[1])
}
storage.Append(rd.Entries)
n.Advance()
}
{
rd := <-n.Ready()
if !reflect.DeepEqual(rd, wants[2]) {
t.Errorf("#3: rd = %+v,\n w %+v", rd, wants[2])
}
storage.Append(rd.Entries)
n.Advance()
}
@ -740,10 +731,7 @@ func TestNodeRestartFromSnapshot(t *testing.T) {
}
func TestNodeAdvance(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
storage := NewMemoryStorage()
storage := newTestMemoryStorage(withPeers(1))
c := &Config{
ID: 1,
ElectionTick: 10,
@ -752,21 +740,17 @@ func TestNodeAdvance(t *testing.T) {
MaxSizePerMsg: noLimit,
MaxInflightMsgs: 256,
}
n := StartNode(c, []Peer{{ID: 1}})
defer n.Stop()
rd := <-n.Ready()
ctx, cancel, n := newNodeTestHarness(context.Background(), t, c)
defer cancel()
n.Campaign(ctx)
rd := readyWithTimeout(n)
// Commit empty entry.
storage.Append(rd.Entries)
n.Advance()
n.Campaign(ctx)
<-n.Ready()
n.Propose(ctx, []byte("foo"))
select {
case rd = <-n.Ready():
t.Fatalf("unexpected Ready before Advance: %+v", rd)
case <-time.After(time.Millisecond):
}
rd = readyWithTimeout(n)
storage.Append(rd.Entries)
n.Advance()
select {
@ -911,15 +895,14 @@ func TestCommitPagination(t *testing.T) {
s := newTestMemoryStorage(withPeers(1))
cfg := newTestConfig(1, 10, 1, s)
cfg.MaxCommittedSizePerReady = 2048
rn, err := NewRawNode(cfg)
if err != nil {
t.Fatal(err)
}
n := newNode(rn)
go n.run()
n.Campaign(context.TODO())
ctx, cancel, n := newNodeTestHarness(context.Background(), t, cfg)
defer cancel()
n.Campaign(ctx)
rd := readyWithTimeout(&n)
rd := readyWithTimeout(n)
s.Append(rd.Entries)
n.Advance()
rd = readyWithTimeout(n)
if len(rd.CommittedEntries) != 1 {
t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries))
}
@ -928,25 +911,32 @@ func TestCommitPagination(t *testing.T) {
blob := []byte(strings.Repeat("a", 1000))
for i := 0; i < 3; i++ {
if err := n.Propose(context.TODO(), blob); err != nil {
if err := n.Propose(ctx, blob); err != nil {
t.Fatal(err)
}
}
// First the three proposals have to be appended.
rd = readyWithTimeout(n)
if len(rd.Entries) != 3 {
t.Fatal("expected to see three entries")
}
s.Append(rd.Entries)
n.Advance()
// The 3 proposals will commit in two batches.
rd = readyWithTimeout(&n)
rd = readyWithTimeout(n)
if len(rd.CommittedEntries) != 2 {
t.Fatalf("expected 2 entries in first batch, got %d", len(rd.CommittedEntries))
}
s.Append(rd.Entries)
n.Advance()
rd = readyWithTimeout(&n)
rd = readyWithTimeout(n)
if len(rd.CommittedEntries) != 1 {
t.Fatalf("expected 1 entry in second batch, got %d", len(rd.CommittedEntries))
}
s.Append(rd.Entries)
n.Advance()
n.Stop()
}
type ignoreSizeHintMemStorage struct {

110
raft/node_util_test.go Normal file
View File

@ -0,0 +1,110 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raft
import (
"context"
"fmt"
"testing"
"time"
)
type nodeTestHarness struct {
*node
t *testing.T
}
func (l *nodeTestHarness) Debug(v ...interface{}) {
l.t.Log(v...)
}
func (l *nodeTestHarness) Debugf(format string, v ...interface{}) {
l.t.Logf(format, v...)
}
func (l *nodeTestHarness) Error(v ...interface{}) {
l.t.Error(v...)
}
func (l *nodeTestHarness) Errorf(format string, v ...interface{}) {
l.t.Errorf(format, v...)
}
func (l *nodeTestHarness) Info(v ...interface{}) {
l.t.Log(v...)
}
func (l *nodeTestHarness) Infof(format string, v ...interface{}) {
l.t.Logf(format, v...)
}
func (l *nodeTestHarness) Warning(v ...interface{}) {
l.t.Log(v...)
}
func (l *nodeTestHarness) Warningf(format string, v ...interface{}) {
l.t.Logf(format, v...)
}
func (l *nodeTestHarness) Fatal(v ...interface{}) {
l.t.Error(v...)
panic(v)
}
func (l *nodeTestHarness) Fatalf(format string, v ...interface{}) {
l.t.Errorf(format, v...)
panic(fmt.Sprintf(format, v...))
}
func (l *nodeTestHarness) Panic(v ...interface{}) {
l.t.Log(v...)
panic(v)
}
func (l *nodeTestHarness) Panicf(format string, v ...interface{}) {
l.t.Errorf(format, v...)
panic(fmt.Sprintf(format, v...))
}
func newNodeTestHarness(ctx context.Context, t *testing.T, cfg *Config, peers ...Peer) (_ context.Context, cancel func(), _ *nodeTestHarness) {
// Wrap context in a 10s timeout to make tests more robust. Otherwise,
// it's likely that deadlock will occur unless Node behaves exactly as
// expected - when you expect a Ready and start waiting on the channel
// but no Ready ever shows up, for example.
ctx, cancel = context.WithTimeout(ctx, 10*time.Second)
var n *node
if len(peers) > 0 {
n = setupNode(cfg, peers)
} else {
rn, err := NewRawNode(cfg)
if err != nil {
t.Fatal(err)
}
nn := newNode(rn)
n = &nn
}
go func() {
defer func() {
if r := recover(); r != nil {
t.Error(r)
}
}()
defer cancel()
defer n.Stop()
n.run()
}()
t.Cleanup(n.Stop)
return ctx, cancel, &nodeTestHarness{node: n, t: t}
}

View File

@ -572,6 +572,19 @@ func (r *raft) advance(rd Ready) {
if len(rd.Entries) > 0 {
e := rd.Entries[len(rd.Entries)-1]
if r.id == r.lead {
// The leader needs to self-ack the entries just appended (since it doesn't
// send an MsgApp to itself). This is roughly equivalent to:
//
// r.prs.Progress[r.id].MaybeUpdate(e.Index)
// if r.maybeCommit() {
// r.bcastAppend()
// }
_ = r.Step(pb.Message{From: r.id, Type: pb.MsgAppResp, Index: e.Index})
}
// NB: it's important for performance that this call happens after
// r.Step above on the leader. This is because r.Step can then use
// a fast-path for `r.raftLog.term()`.
r.raftLog.stableTo(e.Index, e.Term)
}
if !IsEmptySnap(rd.Snapshot) {
@ -634,10 +647,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
return false
}
// use latest "last" index after truncate/append
li = r.raftLog.append(es...)
r.prs.Progress[r.id].MaybeUpdate(li)
// Regardless of maybeCommit's return, our caller will call bcastAppend.
r.maybeCommit()
r.raftLog.append(es...)
return true
}
@ -735,7 +745,11 @@ func (r *raft) becomeLeader() {
// (perhaps after having received a snapshot as a result). The leader is
// trivially in this state. Note that r.reset() has initialized this
// progress with the last index already.
r.prs.Progress[r.id].BecomeReplicate()
pr := r.prs.Progress[r.id]
pr.BecomeReplicate()
// The leader always has RecentActive == true; MsgCheckQuorum makes sure to
// preserve this.
pr.RecentActive = true
// Conservatively set the pendingConfIndex to the last index in the
// log. There may or may not be a pending config change, but it's
@ -995,15 +1009,6 @@ func stepLeader(r *raft, m pb.Message) error {
r.bcastHeartbeat()
return nil
case pb.MsgCheckQuorum:
// The leader should always see itself as active. As a precaution, handle
// the case in which the leader isn't in the configuration any more (for
// example if it just removed itself).
//
// TODO(tbg): I added a TODO in removeNode, it doesn't seem that the
// leader steps down when removing itself. I might be missing something.
if pr := r.prs.Progress[r.id]; pr != nil {
pr.RecentActive = true
}
if !r.prs.QuorumActive() {
r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
r.becomeFollower(r.Term, None)
@ -1104,6 +1109,9 @@ func stepLeader(r *raft, m pb.Message) error {
}
switch m.Type {
case pb.MsgAppResp:
// NB: this code path is also hit from (*raft).advance, where the leader steps
// an MsgAppResp to acknowledge the appended entries in the last Ready.
pr.RecentActive = true
if m.Reject {
@ -1272,7 +1280,9 @@ func stepLeader(r *raft, m pb.Message) error {
// replicate, or when freeTo() covers multiple messages). If
// we have more entries to send, send as many messages as we
// can (without sending empty messages for the commit index)
for r.maybeSendAppend(m.From, false) {
if r.id != m.From {
for r.maybeSendAppend(m.From, false) {
}
}
// Transfer leadership is in progress.
if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
@ -1811,6 +1821,11 @@ func numOfPendingConf(ents []pb.Entry) int {
}
func releasePendingReadIndexMessages(r *raft) {
if len(r.pendingReadIndexMessages) == 0 {
// Fast path for the common case to avoid a call to storage.LastIndex()
// via committedEntryInCurrentTerm.
return
}
if !r.committedEntryInCurrentTerm() {
r.logger.Error("pending MsgReadIndex should be released only after first commit in current term")
return

View File

@ -473,9 +473,9 @@ func TestLeaderCommitEntry(t *testing.T) {
// Reference: section 5.3
func TestLeaderAcknowledgeCommit(t *testing.T) {
tests := []struct {
size int
acceptors map[uint64]bool
wack bool
size int
nonLeaderAcceptors map[uint64]bool
wack bool
}{
{1, nil, true},
{3, nil, false},
@ -496,8 +496,11 @@ func TestLeaderAcknowledgeCommit(t *testing.T) {
li := r.raftLog.lastIndex()
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
for _, m := range r.readMessages() {
if tt.acceptors[m.To] {
rd := newReady(r, &SoftState{}, pb.HardState{})
s.Append(rd.Entries)
r.advance(rd) // simulate having appended entry on leader
for _, m := range rd.Messages {
if tt.nonLeaderAcceptors[m.To] {
r.Step(acceptAndReply(m))
}
}
@ -891,6 +894,9 @@ func TestLeaderOnlyCommitsLogFromCurrentTerm(t *testing.T) {
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Term: r.Term, Index: tt.index})
rd := newReady(r, &SoftState{}, pb.HardState{})
storage.Append(rd.Entries)
r.advance(rd)
if r.raftLog.committed != tt.wcommit {
t.Errorf("#%d: commit = %d, want %d", i, r.raftLog.committed, tt.wcommit)
}

View File

@ -29,13 +29,15 @@ import (
// nextEnts returns the appliable entries and updates the applied index
func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
// Transfer all unstable entries to "stable" storage.
s.Append(r.raftLog.unstableEntries())
r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm())
ents = r.raftLog.nextEnts()
r.raftLog.appliedTo(r.raftLog.committed)
return ents
for {
rd := newReady(r, &SoftState{}, pb.HardState{})
s.Append(rd.Entries)
r.advance(rd)
if len(rd.Entries)+len(rd.CommittedEntries) == 0 {
return ents
}
ents = append(ents, rd.CommittedEntries...)
}
}
func mustAppendEntry(r *raft, ents ...pb.Entry) {
@ -57,21 +59,33 @@ func (r *raft) readMessages() []pb.Message {
}
func TestProgressLeader(t *testing.T) {
r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
s := newTestMemoryStorage(withPeers(1, 2))
r := newTestRaft(1, 5, 1, s)
r.becomeCandidate()
r.becomeLeader()
r.prs.Progress[2].BecomeReplicate()
// Send proposals to r1. The first 5 entries should be appended to the log.
// Send proposals to r1. The first 5 entries should be queued in the unstable log.
propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}}
for i := 0; i < 5; i++ {
if pr := r.prs.Progress[r.id]; pr.State != tracker.StateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 {
t.Errorf("unexpected progress %v", pr)
}
if err := r.Step(propMsg); err != nil {
t.Fatalf("proposal resulted in error: %v", err)
}
}
if m := r.prs.Progress[1].Match; m != 0 {
t.Fatalf("expected zero match, got %d", m)
}
rd := newReady(r, &SoftState{}, pb.HardState{})
if len(rd.Entries) != 6 || len(rd.Entries[0].Data) > 0 || string(rd.Entries[5].Data) != "foo" {
t.Fatalf("unexpected Entries: %s", DescribeReady(rd, nil))
}
r.advance(rd)
if m := r.prs.Progress[1].Match; m != 6 {
t.Fatalf("unexpected Match %d", m)
}
if m := r.prs.Progress[1].Next; m != 7 {
t.Fatalf("unexpected Next %d", m)
}
}
// TestProgressResumeByHeartbeatResp ensures raft.heartbeat reset progress.paused by heartbeat response.
@ -663,10 +677,12 @@ func TestLogReplication(t *testing.T) {
// TestLearnerLogReplication tests that a learner can receive entries from the leader.
func TestLearnerLogReplication(t *testing.T) {
n1 := newTestLearnerRaft(1, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
s1 := newTestMemoryStorage(withPeers(1), withLearners(2))
n1 := newTestLearnerRaft(1, 10, 1, s1)
n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
nt := newNetwork(n1, n2)
nt.t = t
n1.becomeFollower(1, None)
n2.becomeFollower(1, None)
@ -686,12 +702,23 @@ func TestLearnerLogReplication(t *testing.T) {
t.Error("peer 2 state: not learner, want yes")
}
nextCommitted := n1.raftLog.committed + 1
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
nextCommitted := uint64(2)
{
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
rd := newReady(n1, &SoftState{}, pb.HardState{})
nt.send(rd.Messages...)
s1.Append(rd.Entries)
n1.advance(rd)
}
if n1.raftLog.committed != nextCommitted {
t.Errorf("peer 1 wants committed to %d, but still %d", nextCommitted, n1.raftLog.committed)
}
{
rd := newReady(n1, &SoftState{}, pb.HardState{})
nt.send(rd.Messages...)
}
if n1.raftLog.committed != n2.raftLog.committed {
t.Errorf("peer 2 wants committed to %d, but still %d", n1.raftLog.committed, n2.raftLog.committed)
}
@ -703,11 +730,18 @@ func TestLearnerLogReplication(t *testing.T) {
}
func TestSingleNodeCommit(t *testing.T) {
tt := newNetwork(nil)
s := newTestMemoryStorage(withPeers(1))
cfg := newTestConfig(1, 10, 1, s)
r := newRaft(cfg)
tt := newNetwork(r)
tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
rd := newReady(r, &SoftState{}, pb.HardState{})
s.Append(rd.Entries)
r.advance(rd)
sm := tt.peers[1].(*raft)
if sm.raftLog.committed != 3 {
t.Errorf("committed = %d, want %d", sm.raftLog.committed, 3)
@ -792,9 +826,12 @@ func TestCommitWithoutNewTermEntry(t *testing.T) {
}
func TestDuelingCandidates(t *testing.T) {
a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
s1 := newTestMemoryStorage(withPeers(1, 2, 3))
s2 := newTestMemoryStorage(withPeers(1, 2, 3))
s3 := newTestMemoryStorage(withPeers(1, 2, 3))
a := newTestRaft(1, 10, 1, s1)
b := newTestRaft(2, 10, 1, s2)
c := newTestRaft(3, 10, 1, s3)
nt := newNetwork(a, b, c)
nt.cut(1, 3)
@ -820,21 +857,19 @@ func TestDuelingCandidates(t *testing.T) {
// we expect it to disrupt the leader 1 since it has a higher term
// 3 will be follower again since both 1 and 2 rejects its vote request since 3 does not have a long enough log
nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
wlog := &raftLog{
storage: &MemoryStorage{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}}},
committed: 1,
unstable: unstable{offset: 2},
if sm.state != StateFollower {
t.Errorf("state = %s, want %s", sm.state, StateFollower)
}
tests := []struct {
sm *raft
state StateType
term uint64
raftLog *raftLog
sm *raft
state StateType
term uint64
lastIndex uint64
}{
{a, StateFollower, 2, wlog},
{b, StateFollower, 2, wlog},
{c, StateFollower, 2, newLog(NewMemoryStorage(), raftLogger)},
{a, StateFollower, 2, 1},
{b, StateFollower, 2, 1},
{c, StateFollower, 2, 0},
}
for i, tt := range tests {
@ -844,14 +879,8 @@ func TestDuelingCandidates(t *testing.T) {
if g := tt.sm.Term; g != tt.term {
t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
}
base := ltoa(tt.raftLog)
if sm, ok := nt.peers[1+uint64(i)].(*raft); ok {
l := ltoa(sm.raftLog)
if g := diffu(base, l); g != "" {
t.Errorf("#%d: diff:\n%s", i, g)
}
} else {
t.Logf("#%d: empty log", i)
if exp, act := tt.lastIndex, tt.sm.raftLog.lastIndex(); exp != act {
t.Errorf("#%d: last index exp = %d, act = %d", i, exp, act)
}
}
}
@ -868,6 +897,7 @@ func TestDuelingPreCandidates(t *testing.T) {
c := newRaft(cfgC)
nt := newNetwork(a, b, c)
nt.t = t
nt.cut(1, 3)
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
@ -891,20 +921,15 @@ func TestDuelingPreCandidates(t *testing.T) {
// With PreVote, it does not disrupt the leader.
nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
wlog := &raftLog{
storage: &MemoryStorage{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}}},
committed: 1,
unstable: unstable{offset: 2},
}
tests := []struct {
sm *raft
state StateType
term uint64
raftLog *raftLog
sm *raft
state StateType
term uint64
lastIndex uint64
}{
{a, StateLeader, 1, wlog},
{b, StateFollower, 1, wlog},
{c, StateFollower, 1, newLog(NewMemoryStorage(), raftLogger)},
{a, StateLeader, 1, 1},
{b, StateFollower, 1, 1},
{c, StateFollower, 1, 0},
}
for i, tt := range tests {
@ -914,14 +939,8 @@ func TestDuelingPreCandidates(t *testing.T) {
if g := tt.sm.Term; g != tt.term {
t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
}
base := ltoa(tt.raftLog)
if sm, ok := nt.peers[1+uint64(i)].(*raft); ok {
l := ltoa(sm.raftLog)
if g := diffu(base, l); g != "" {
t.Errorf("#%d: diff:\n%s", i, g)
}
} else {
t.Logf("#%d: empty log", i)
if exp, act := tt.lastIndex, tt.sm.raftLog.lastIndex(); exp != act {
t.Errorf("#%d: last index is %d, exp %d", i, act, exp)
}
}
}
@ -1058,6 +1077,7 @@ func TestProposal(t *testing.T) {
// promote 1 to become leader
send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
r := tt.network.peers[1].(*raft)
wantLog := newLog(NewMemoryStorage(), raftLogger)
if tt.success {
@ -1065,8 +1085,8 @@ func TestProposal(t *testing.T) {
storage: &MemoryStorage{
ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
},
unstable: unstable{offset: 3},
committed: 2}
unstable: unstable{offset: 3},
}
}
base := ltoa(wantLog)
for i, p := range tt.peers {
@ -1079,8 +1099,7 @@ func TestProposal(t *testing.T) {
t.Logf("#%d: peer %d empty log", j, i)
}
}
sm := tt.network.peers[1].(*raft)
if g := sm.Term; g != 1 {
if g := r.Term; g != 1 {
t.Errorf("#%d: term = %d, want %d", j, g, 1)
}
}
@ -1405,14 +1424,14 @@ func TestRaftFreesReadOnlyMem(t *testing.T) {
// TestMsgAppRespWaitReset verifies the resume behavior of a leader
// MsgAppResp.
func TestMsgAppRespWaitReset(t *testing.T) {
sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
s := newTestMemoryStorage(withPeers(1, 2, 3))
sm := newTestRaft(1, 5, 1, s)
sm.becomeCandidate()
sm.becomeLeader()
// The new leader has just emitted a new Term 4 entry; consume those messages
// from the outgoing queue.
sm.bcastAppend()
sm.readMessages()
// Run n1 which includes sending a message like the below
// one to n2, but also appending to its own log.
nextEnts(sm, s)
// Node 2 acks the first entry, making it committed.
sm.Step(pb.Message{
@ -2228,7 +2247,8 @@ func TestReadOnlyOptionSafe(t *testing.T) {
}
func TestReadOnlyWithLearner(t *testing.T) {
a := newTestLearnerRaft(1, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
s := newTestMemoryStorage(withPeers(1), withLearners(2))
a := newTestLearnerRaft(1, 10, 1, s)
b := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
nt := newNetwork(a, b)
@ -2258,6 +2278,7 @@ func TestReadOnlyWithLearner(t *testing.T) {
for i, tt := range tests {
for j := 0; j < tt.proposals; j++ {
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
nextEnts(a, s) // append the entries on the leader
}
nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}})
@ -3634,13 +3655,17 @@ func TestLeaderTransferTimeout(t *testing.T) {
}
func TestLeaderTransferIgnoreProposal(t *testing.T) {
nt := newNetwork(nil, nil, nil)
s := newTestMemoryStorage(withPeers(1, 2, 3))
r := newTestRaft(1, 10, 1, s)
nt := newNetwork(r, nil, nil)
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
nt.isolate(3)
lead := nt.peers[1].(*raft)
nextEnts(r, s) // handle empty entry
// Transfer leadership to isolated node to let transfer pending, then send proposal.
nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
if lead.leadTransferee != 3 {
@ -4630,6 +4655,8 @@ func votedWithConfig(configFunc func(*Config), vote, term uint64) *raft {
}
type network struct {
t *testing.T // optional
peers map[uint64]stateMachine
storage map[uint64]*MemoryStorage
dropm map[connem]float64
@ -4713,6 +4740,9 @@ func (nw *network) send(msgs ...pb.Message) {
for len(msgs) > 0 {
m := msgs[0]
p := nw.peers[m.To]
if nw.t != nil {
nw.t.Log(DescribeMessage(m, nil))
}
p.Step(m)
msgs = append(msgs[1:], nw.filter(p.readMessages())...)
}

View File

@ -388,122 +388,125 @@ func TestRawNodeJointAutoLeave(t *testing.T) {
}
exp2Cs := pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}}
t.Run("", func(t *testing.T) {
s := newTestMemoryStorage(withPeers(1))
rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s))
if err != nil {
t.Fatal(err)
}
s := newTestMemoryStorage(withPeers(1))
rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s))
if err != nil {
t.Fatal(err)
}
rawNode.Campaign()
proposed := false
var (
lastIndex uint64
ccdata []byte
)
// Propose the ConfChange, wait until it applies, save the resulting
// ConfState.
var cs *pb.ConfState
for cs == nil {
rd := rawNode.Ready()
s.Append(rd.Entries)
for _, ent := range rd.CommittedEntries {
var cc pb.ConfChangeI
if ent.Type == pb.EntryConfChangeV2 {
var ccc pb.ConfChangeV2
if err = ccc.Unmarshal(ent.Data); err != nil {
t.Fatal(err)
}
cc = &ccc
}
if cc != nil {
// Force it step down.
rawNode.Step(pb.Message{Type: pb.MsgHeartbeatResp, From: 1, Term: rawNode.raft.Term + 1})
cs = rawNode.ApplyConfChange(cc)
}
}
rawNode.Advance(rd)
// Once we are the leader, propose a command and a ConfChange.
if !proposed && rd.SoftState.Lead == rawNode.raft.id {
if err = rawNode.Propose([]byte("somedata")); err != nil {
t.Fatal(err)
}
ccdata, err = testCc.Marshal()
if err != nil {
t.Fatal(err)
}
rawNode.ProposeConfChange(testCc)
proposed = true
}
}
// Check that the last index is exactly the conf change we put in,
// down to the bits. Note that this comes from the Storage, which
// will not reflect any unstable entries that we'll only be presented
// with in the next Ready.
lastIndex, err = s.LastIndex()
if err != nil {
t.Fatal(err)
}
entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit)
if err != nil {
t.Fatal(err)
}
if len(entries) != 2 {
t.Fatalf("len(entries) = %d, want %d", len(entries), 2)
}
if !bytes.Equal(entries[0].Data, []byte("somedata")) {
t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata"))
}
if entries[1].Type != pb.EntryConfChangeV2 {
t.Fatalf("type = %v, want %v", entries[1].Type, pb.EntryConfChangeV2)
}
if !bytes.Equal(entries[1].Data, ccdata) {
t.Errorf("data = %v, want %v", entries[1].Data, ccdata)
}
if !reflect.DeepEqual(&expCs, cs) {
t.Fatalf("exp:\n%+v\nact:\n%+v", expCs, cs)
}
if rawNode.raft.pendingConfIndex != 0 {
t.Fatalf("pendingConfIndex: expected %d, got %d", 0, rawNode.raft.pendingConfIndex)
}
// Move the RawNode along. It should not leave joint because it's follower.
rd := rawNode.readyWithoutAccept()
// Check that the right ConfChange comes out.
if len(rd.Entries) != 0 {
t.Fatalf("expected zero entry, got %+v", rd)
}
// Make it leader again. It should leave joint automatically after moving apply index.
rawNode.Campaign()
rd = rawNode.Ready()
rawNode.Campaign()
proposed := false
var (
lastIndex uint64
ccdata []byte
)
// Propose the ConfChange, wait until it applies, save the resulting
// ConfState.
var cs *pb.ConfState
for cs == nil {
rd := rawNode.Ready()
s.Append(rd.Entries)
for _, ent := range rd.CommittedEntries {
var cc pb.ConfChangeI
if ent.Type == pb.EntryConfChangeV2 {
var ccc pb.ConfChangeV2
if err = ccc.Unmarshal(ent.Data); err != nil {
t.Fatal(err)
}
cc = &ccc
}
if cc != nil {
// Force it step down.
rawNode.Step(pb.Message{Type: pb.MsgHeartbeatResp, From: 1, Term: rawNode.raft.Term + 1})
cs = rawNode.ApplyConfChange(cc)
}
}
rawNode.Advance(rd)
rd = rawNode.Ready()
s.Append(rd.Entries)
// Once we are the leader, propose a command and a ConfChange.
if !proposed && rd.SoftState.Lead == rawNode.raft.id {
if err = rawNode.Propose([]byte("somedata")); err != nil {
t.Fatal(err)
}
ccdata, err = testCc.Marshal()
if err != nil {
t.Fatal(err)
}
rawNode.ProposeConfChange(testCc)
proposed = true
}
}
// Check that the right ConfChange comes out.
if len(rd.Entries) != 1 || rd.Entries[0].Type != pb.EntryConfChangeV2 {
t.Fatalf("expected exactly one more entry, got %+v", rd)
}
var cc pb.ConfChangeV2
if err := cc.Unmarshal(rd.Entries[0].Data); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(cc, pb.ConfChangeV2{Context: nil}) {
t.Fatalf("expected zero ConfChangeV2, got %+v", cc)
}
// Lie and pretend the ConfChange applied. It won't do so because now
// we require the joint quorum and we're only running one node.
cs = rawNode.ApplyConfChange(cc)
if exp := exp2Cs; !reflect.DeepEqual(&exp, cs) {
t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs)
}
})
// Check that the last index is exactly the conf change we put in,
// down to the bits. Note that this comes from the Storage, which
// will not reflect any unstable entries that we'll only be presented
// with in the next Ready.
lastIndex, err = s.LastIndex()
if err != nil {
t.Fatal(err)
}
entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit)
if err != nil {
t.Fatal(err)
}
if len(entries) != 2 {
t.Fatalf("len(entries) = %d, want %d", len(entries), 2)
}
if !bytes.Equal(entries[0].Data, []byte("somedata")) {
t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata"))
}
if entries[1].Type != pb.EntryConfChangeV2 {
t.Fatalf("type = %v, want %v", entries[1].Type, pb.EntryConfChangeV2)
}
if !bytes.Equal(entries[1].Data, ccdata) {
t.Errorf("data = %v, want %v", entries[1].Data, ccdata)
}
if !reflect.DeepEqual(&expCs, cs) {
t.Fatalf("exp:\n%+v\nact:\n%+v", expCs, cs)
}
if rawNode.raft.pendingConfIndex != 0 {
t.Fatalf("pendingConfIndex: expected %d, got %d", 0, rawNode.raft.pendingConfIndex)
}
// Move the RawNode along. It should not leave joint because it's follower.
rd := rawNode.readyWithoutAccept()
// Check that the right ConfChange comes out.
if len(rd.Entries) != 0 {
t.Fatalf("expected zero entry, got %+v", rd)
}
// Make it leader again. It should leave joint automatically after moving apply index.
rawNode.Campaign()
rd = rawNode.Ready()
t.Log(DescribeReady(rd, nil))
s.Append(rd.Entries)
rawNode.Advance(rd)
rd = rawNode.Ready()
t.Log(DescribeReady(rd, nil))
s.Append(rd.Entries)
rawNode.Advance(rd)
rd = rawNode.Ready()
t.Log(DescribeReady(rd, nil))
s.Append(rd.Entries)
// Check that the right ConfChange comes out.
if len(rd.Entries) != 1 || rd.Entries[0].Type != pb.EntryConfChangeV2 {
t.Fatalf("expected exactly one more entry, got %+v", rd)
}
var cc pb.ConfChangeV2
if err := cc.Unmarshal(rd.Entries[0].Data); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(cc, pb.ConfChangeV2{Context: nil}) {
t.Fatalf("expected zero ConfChangeV2, got %+v", cc)
}
// Lie and pretend the ConfChange applied. It won't do so because now
// we require the joint quorum and we're only running one node.
cs = rawNode.ApplyConfChange(cc)
if exp := exp2Cs; !reflect.DeepEqual(&exp, cs) {
t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs)
}
}
// TestRawNodeProposeAddDuplicateNode ensures that two proposes to add the same node should
@ -656,18 +659,16 @@ func TestRawNodeReadIndex(t *testing.T) {
// requires the application to bootstrap the state, i.e. it does not accept peers
// and will not create faux configuration change entries.
func TestRawNodeStart(t *testing.T) {
entries := []pb.Entry{
{Term: 1, Index: 2, Data: nil}, // empty entry
{Term: 1, Index: 3, Data: []byte("foo")}, // empty entry
}
want := Ready{
SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
HardState: pb.HardState{Term: 1, Commit: 3, Vote: 1},
Entries: []pb.Entry{
{Term: 1, Index: 2, Data: nil}, // empty entry
{Term: 1, Index: 3, Data: []byte("foo")}, // empty entry
},
CommittedEntries: []pb.Entry{
{Term: 1, Index: 2, Data: nil}, // empty entry
{Term: 1, Index: 3, Data: []byte("foo")}, // empty entry
},
MustSync: true,
SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
HardState: pb.HardState{Term: 1, Commit: 3, Vote: 1},
Entries: nil, // emitted & checked in intermediate Ready cycle
CommittedEntries: entries,
MustSync: false, // since we're only applying, not appending
}
storage := NewMemoryStorage()
@ -747,9 +748,24 @@ func TestRawNodeStart(t *testing.T) {
t.Fatal("expected a Ready")
}
rd := rawNode.Ready()
if !reflect.DeepEqual(entries, rd.Entries) {
t.Fatalf("expected to see entries\n%s, not\n%s", DescribeEntries(entries, nil), DescribeEntries(rd.Entries, nil))
}
storage.Append(rd.Entries)
rawNode.Advance(rd)
if !rawNode.HasReady() {
t.Fatal("expected a Ready")
}
rd = rawNode.Ready()
if len(rd.Entries) != 0 {
t.Fatalf("unexpected entries: %s", DescribeEntries(rd.Entries, nil))
}
if rd.MustSync {
t.Fatalf("should not need to sync")
}
rawNode.Advance(rd)
rd.SoftState, want.SoftState = nil, nil
if !reflect.DeepEqual(rd, want) {
@ -868,17 +884,17 @@ func TestRawNodeStatus(t *testing.T) {
// TestNodeCommitPaginationAfterRestart. The anomaly here was even worse as the
// Raft group would forget to apply entries:
//
// - node learns that index 11 is committed
// - nextEnts returns index 1..10 in CommittedEntries (but index 10 already
// exceeds maxBytes), which isn't noticed internally by Raft
// - Commit index gets bumped to 10
// - the node persists the HardState, but crashes before applying the entries
// - upon restart, the storage returns the same entries, but `slice` takes a
// different code path and removes the last entry.
// - Raft does not emit a HardState, but when the app calls Advance(), it bumps
// its internal applied index cursor to 10 (when it should be 9)
// - the next Ready asks the app to apply index 11 (omitting index 10), losing a
// write.
// - node learns that index 11 is committed
// - nextEnts returns index 1..10 in CommittedEntries (but index 10 already
// exceeds maxBytes), which isn't noticed internally by Raft
// - Commit index gets bumped to 10
// - the node persists the HardState, but crashes before applying the entries
// - upon restart, the storage returns the same entries, but `slice` takes a
// different code path and removes the last entry.
// - Raft does not emit a HardState, but when the app calls Advance(), it bumps
// its internal applied index cursor to 10 (when it should be 9)
// - the next Ready asks the app to apply index 11 (omitting index 10), losing a
// write.
func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
s := &ignoreSizeHintMemStorage{
MemoryStorage: newTestMemoryStorage(withPeers(1)),
@ -952,6 +968,7 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) {
data := []byte("testdata")
testEntry := pb.Entry{Data: data}
maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))
t.Log("maxEntrySize", maxEntrySize)
s := newTestMemoryStorage(withPeers(1))
cfg := newTestConfig(1, 10, 1, s)
@ -960,20 +977,16 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) {
if err != nil {
t.Fatal(err)
}
rd := rawNode.Ready()
s.Append(rd.Entries)
rawNode.Advance(rd)
// Become the leader.
// Become the leader and apply empty entry.
rawNode.Campaign()
for {
rd = rawNode.Ready()
rd := rawNode.Ready()
s.Append(rd.Entries)
if rd.SoftState.Lead == rawNode.raft.id {
rawNode.Advance(rd)
rawNode.Advance(rd)
if len(rd.CommittedEntries) > 0 {
break
}
rawNode.Advance(rd)
}
// Simulate a network partition while we make our proposals by never
@ -995,12 +1008,25 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) {
// Recover from the partition. The uncommitted tail of the Raft log should
// disappear as entries are committed.
rd = rawNode.Ready()
if len(rd.CommittedEntries) != maxEntries {
t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries))
rd := rawNode.Ready()
if len(rd.Entries) != maxEntries {
t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.Entries))
}
s.Append(rd.Entries)
rawNode.Advance(rd)
// Entries are appended, but not applied.
checkUncommitted(maxEntrySize)
rd = rawNode.Ready()
if len(rd.Entries) != 0 {
t.Fatalf("unexpected entries: %s", DescribeEntries(rd.Entries, nil))
}
if len(rd.CommittedEntries) != maxEntries {
t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries))
}
rawNode.Advance(rd)
checkUncommitted(0)
}
@ -1105,3 +1131,104 @@ func TestRawNodeConsumeReady(t *testing.T) {
t.Fatalf("expected only m2 in raft.msgs, got %+v", rn.raft.msgs)
}
}
func BenchmarkRawNode(b *testing.B) {
cases := []struct {
name string
peers []uint64
}{
{
name: "single-voter",
peers: []uint64{1},
},
{
name: "two-voters",
peers: []uint64{1, 2},
},
// You can easily add more cases here.
}
for _, tc := range cases {
b.Run(tc.name, func(b *testing.B) {
benchmarkRawNodeImpl(b, tc.peers...)
})
}
}
func benchmarkRawNodeImpl(b *testing.B, peers ...uint64) {
const debug = false
s := newTestMemoryStorage(withPeers(peers...))
cfg := newTestConfig(1, 10, 1, s)
if !debug {
cfg.Logger = discardLogger // avoid distorting benchmark output
}
rn, err := NewRawNode(cfg)
if err != nil {
b.Fatal(err)
}
run := make(chan struct{}, 1)
defer close(run)
var numReady uint64
stabilize := func() (applied uint64) {
for rn.HasReady() {
numReady++
rd := rn.Ready()
if debug {
b.Log(DescribeReady(rd, nil))
}
if n := len(rd.CommittedEntries); n > 0 {
applied = rd.CommittedEntries[n-1].Index
}
s.Append(rd.Entries)
for _, m := range rd.Messages {
if m.Type == pb.MsgVote {
resp := pb.Message{To: m.From, From: m.To, Term: m.Term, Type: pb.MsgVoteResp}
if debug {
b.Log(DescribeMessage(resp, nil))
}
rn.Step(resp)
}
if m.Type == pb.MsgApp {
idx := m.Index
if n := len(m.Entries); n > 0 {
idx = m.Entries[n-1].Index
}
resp := pb.Message{To: m.From, From: m.To, Type: pb.MsgAppResp, Term: m.Term, Index: idx}
if debug {
b.Log(DescribeMessage(resp, nil))
}
rn.Step(resp)
}
}
rn.Advance(rd)
}
return applied
}
rn.Campaign()
stabilize()
if debug {
b.N = 1
}
var applied uint64
for i := 0; i < b.N; i++ {
if err := rn.Propose([]byte("foo")); err != nil {
b.Fatal(err)
}
applied = stabilize()
}
if applied < uint64(b.N) {
b.Fatalf("did not apply everything: %d < %d", applied, b.N)
}
b.ReportMetric(float64(s.callStats.firstIndex)/float64(b.N), "firstIndex/op")
b.ReportMetric(float64(s.callStats.lastIndex)/float64(b.N), "lastIndex/op")
b.ReportMetric(float64(s.callStats.term)/float64(b.N), "term/op")
b.ReportMetric(float64(numReady)/float64(b.N), "ready/op")
b.Logf("storage access stats: %+v", s.callStats)
}

View File

@ -71,6 +71,10 @@ type Storage interface {
Snapshot() (pb.Snapshot, error)
}
type inMemStorageCallStats struct {
initialState, firstIndex, lastIndex, entries, term, snapshot int
}
// MemoryStorage implements the Storage interface backed by an
// in-memory array.
type MemoryStorage struct {
@ -83,6 +87,8 @@ type MemoryStorage struct {
snapshot pb.Snapshot
// ents[i] has raft log position i+snapshot.Metadata.Index
ents []pb.Entry
callStats inMemStorageCallStats
}
// NewMemoryStorage creates an empty MemoryStorage.
@ -95,6 +101,7 @@ func NewMemoryStorage() *MemoryStorage {
// InitialState implements the Storage interface.
func (ms *MemoryStorage) InitialState() (pb.HardState, pb.ConfState, error) {
ms.callStats.initialState++
return ms.hardState, ms.snapshot.Metadata.ConfState, nil
}
@ -110,6 +117,7 @@ func (ms *MemoryStorage) SetHardState(st pb.HardState) error {
func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) {
ms.Lock()
defer ms.Unlock()
ms.callStats.entries++
offset := ms.ents[0].Index
if lo <= offset {
return nil, ErrCompacted
@ -130,6 +138,7 @@ func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) {
func (ms *MemoryStorage) Term(i uint64) (uint64, error) {
ms.Lock()
defer ms.Unlock()
ms.callStats.term++
offset := ms.ents[0].Index
if i < offset {
return 0, ErrCompacted
@ -144,6 +153,7 @@ func (ms *MemoryStorage) Term(i uint64) (uint64, error) {
func (ms *MemoryStorage) LastIndex() (uint64, error) {
ms.Lock()
defer ms.Unlock()
ms.callStats.lastIndex++
return ms.lastIndex(), nil
}
@ -155,6 +165,7 @@ func (ms *MemoryStorage) lastIndex() uint64 {
func (ms *MemoryStorage) FirstIndex() (uint64, error) {
ms.Lock()
defer ms.Unlock()
ms.callStats.firstIndex++
return ms.firstIndex(), nil
}
@ -166,6 +177,7 @@ func (ms *MemoryStorage) firstIndex() uint64 {
func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error) {
ms.Lock()
defer ms.Unlock()
ms.callStats.snapshot++
return ms.snapshot, nil
}

View File

@ -35,10 +35,13 @@ stabilize
> 1 handling Ready
Ready MustSync=true:
Lead:1 State:StateLeader
HardState Term:1 Vote:1 Commit:4
HardState Term:1 Vote:1 Commit:2
Entries:
1/3 EntryNormal ""
1/4 EntryConfChange v2
> 1 handling Ready
Ready MustSync=false:
HardState Term:1 Vote:1 Commit:4
CommittedEntries:
1/3 EntryNormal ""
1/4 EntryConfChange v2

View File

@ -31,19 +31,24 @@ INFO 3 switched to configuration voters=()
INFO 3 became follower at term 0
INFO newRaft 3 [peers: [], term: 0, commit: 0, applied: 0, lastindex: 0, lastterm: 0]
# n1 immediately gets to commit & apply the conf change using only itself. We see that
# it starts transitioning out of that joint configuration (though we will only see that
# proposal in the next ready handling loop, when it is emitted). We also see that this
# is using joint consensus, which it has to since we're carrying out two additions at
# once.
# Process n1 once, so that it can append the entry.
process-ready 1
----
Ready MustSync=true:
Lead:1 State:StateLeader
HardState Term:1 Vote:1 Commit:4
HardState Term:1 Vote:1 Commit:2
Entries:
1/3 EntryNormal ""
1/4 EntryConfChangeV2 v2 v3
# Now n1 applies the conf change. We see that it starts transitioning out of that joint
# configuration (though we will only see that proposal in the next ready handling
# loop, when it is emitted). We also see that this is using joint consensus, which
# it has to since we're carrying out two additions at once.
process-ready 1
----
Ready MustSync=false:
HardState Term:1 Vote:1 Commit:4
CommittedEntries:
1/3 EntryNormal ""
1/4 EntryConfChangeV2 v2 v3

View File

@ -38,10 +38,13 @@ stabilize 1 2
> 1 handling Ready
Ready MustSync=true:
Lead:1 State:StateLeader
HardState Term:1 Vote:1 Commit:4
HardState Term:1 Vote:1 Commit:2
Entries:
1/3 EntryNormal ""
1/4 EntryConfChangeV2 v2
> 1 handling Ready
Ready MustSync=false:
HardState Term:1 Vote:1 Commit:4
CommittedEntries:
1/3 EntryNormal ""
1/4 EntryConfChangeV2 v2

View File

@ -36,10 +36,13 @@ stabilize
> 1 handling Ready
Ready MustSync=true:
Lead:1 State:StateLeader
HardState Term:1 Vote:1 Commit:4
HardState Term:1 Vote:1 Commit:2
Entries:
1/3 EntryNormal ""
1/4 EntryConfChangeV2 v2
> 1 handling Ready
Ready MustSync=false:
HardState Term:1 Vote:1 Commit:4
CommittedEntries:
1/3 EntryNormal ""
1/4 EntryConfChangeV2 v2

View File

@ -36,10 +36,13 @@ stabilize 1 2
> 1 handling Ready
Ready MustSync=true:
Lead:1 State:StateLeader
HardState Term:1 Vote:1 Commit:4
HardState Term:1 Vote:1 Commit:2
Entries:
1/3 EntryNormal ""
1/4 EntryConfChangeV2 v2
> 1 handling Ready
Ready MustSync=false:
HardState Term:1 Vote:1 Commit:4
CommittedEntries:
1/3 EntryNormal ""
1/4 EntryConfChangeV2 v2

30
raft/testdata/single_node.txt vendored Normal file
View File

@ -0,0 +1,30 @@
log-level info
----
ok
add-nodes 1 voters=(1) index=3
----
INFO 1 switched to configuration voters=(1)
INFO 1 became follower at term 0
INFO newRaft 1 [peers: [1], term: 0, commit: 3, applied: 3, lastindex: 3, lastterm: 1]
campaign 1
----
INFO 1 is starting a new election at term 0
INFO 1 became candidate at term 1
INFO 1 received MsgVoteResp from 1 at term 1
INFO 1 became leader at term 1
stabilize
----
> 1 handling Ready
Ready MustSync=true:
Lead:1 State:StateLeader
HardState Term:1 Vote:1 Commit:3
Entries:
1/4 EntryNormal ""
> 1 handling Ready
Ready MustSync=false:
HardState Term:1 Vote:1 Commit:4
CommittedEntries:
1/4 EntryNormal ""

View File

@ -41,7 +41,7 @@ ok
status 1
----
1: StateReplicate match=11 next=12 inactive
1: StateReplicate match=11 next=12
2: StateReplicate match=11 next=12
3: StateProbe match=0 next=11 paused inactive
@ -95,7 +95,7 @@ stabilize 1
status 1
----
1: StateReplicate match=11 next=12 inactive
1: StateReplicate match=11 next=12
2: StateReplicate match=11 next=12
3: StateSnapshot match=0 next=11 paused pendingSnap=11
@ -132,7 +132,7 @@ stabilize 1
status 1
----
1: StateReplicate match=11 next=12 inactive
1: StateReplicate match=11 next=12
2: StateReplicate match=11 next=12
3: StateReplicate match=11 next=12

View File

@ -52,8 +52,7 @@ type Progress struct {
// RecentActive is true if the progress is recently active. Receiving any messages
// from the corresponding follower indicates the progress is active.
// RecentActive can be reset to false after an election timeout.
//
// TODO(tbg): the leader should always have this set to true.
// This is always true on the leader.
RecentActive bool
// ProbeSent is used while this follower is in StateProbe. When ProbeSent is