diff --git a/raft/node_test.go b/raft/node_test.go index bf38e486d..adb50e0a2 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -39,7 +39,7 @@ func TestTickMsgBeat(t *testing.T) { n.Add(i, "") for _, m := range n.Msgs() { if m.Type == msgApp { - n.Step(Message{From: m.To, Type: msgAppResp, Index: i + 1}) + n.Step(Message{From: m.To, Type: msgAppResp, Index: m.Index + len(m.Entries)}) } } // ignore commit index update messages @@ -129,7 +129,7 @@ func TestRemove(t *testing.T) { n.Add(1, "") n.Next() n.Remove(0) - n.Step(Message{Type: msgAppResp, From: 1, Term: 1, Index: 3}) + n.Step(Message{Type: msgAppResp, From: 1, Term: 1, Index: 4}) n.Next() if len(n.sm.ins) != 1 { diff --git a/raft/raft.go b/raft/raft.go index 91a7c101f..603f9e408 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -198,6 +198,13 @@ func (sm *stateMachine) q() int { return len(sm.ins)/2 + 1 } +func (sm *stateMachine) appendEntry(e Entry) { + e.Term = sm.term + sm.log.append(sm.log.lastIndex(), e) + sm.ins[sm.id].update(sm.log.lastIndex()) + sm.maybeCommit() +} + // promotable indicates whether state machine could be promoted. // New machine has to wait for the first log entry to be committed, or it will // always start as a one-node cluster. @@ -236,6 +243,8 @@ func (sm *stateMachine) becomeLeader() { sm.pendingConf = true } } + + sm.appendEntry(Entry{Type: Normal, Data: nil}) } func (sm *stateMachine) Msgs() []Message { @@ -310,11 +319,7 @@ func stepLeader(sm *stateMachine, m Message) bool { } sm.pendingConf = true } - e.Term = sm.term - - sm.log.append(sm.log.lastIndex(), e) - sm.ins[sm.id].update(sm.log.lastIndex()) - sm.maybeCommit() + sm.appendEntry(e) sm.bcastAppend() case msgAppResp: if m.Index < 0 { diff --git a/raft/raft_test.go b/raft/raft_test.go index abfd87a52..5745e031b 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -47,7 +47,7 @@ func TestLogReplication(t *testing.T) { []Message{ {To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}, }, - 1, + 2, }, { newNetwork(nil, nil, nil), @@ -57,7 +57,7 @@ func TestLogReplication(t *testing.T) { {To: 1, Type: msgHup}, {To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}, }, - 2, + 4, }, } @@ -75,7 +75,12 @@ func TestLogReplication(t *testing.T) { t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.log.committed, tt.wcommitted) } - ents := sm.nextEnts() + ents := make([]Entry, 0) + for _, e := range sm.nextEnts() { + if e.Data != nil { + ents = append(ents, e) + } + } props := make([]Message, 0) for _, m := range tt.msgs { if m.Type == msgProp { @@ -98,11 +103,14 @@ func TestSingleNodeCommit(t *testing.T) { tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) sm := tt.peers[0].(*stateMachine) - if sm.log.committed != 2 { - t.Errorf("committed = %d, want %d", sm.log.committed, 2) + if sm.log.committed != 3 { + t.Errorf("committed = %d, want %d", sm.log.committed, 3) } } +// TestCannotCommitWithoutNewTermEntry tests the entries cannot be committed +// when leader changes, no new proposal comes in and ChangeTerm proposal is +// filtered. func TestCannotCommitWithoutNewTermEntry(t *testing.T) { tt := newNetwork(nil, nil, nil, nil, nil) tt.send(Message{To: 0, Type: msgHup}) @@ -116,52 +124,99 @@ func TestCannotCommitWithoutNewTermEntry(t *testing.T) { tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) sm := tt.peers[0].(*stateMachine) - if sm.log.committed != 0 { - t.Errorf("committed = %d, want %d", sm.log.committed, 0) + if sm.log.committed != 1 { + t.Errorf("committed = %d, want %d", sm.log.committed, 1) + } + + // network recovery + tt.recover() + // avoid committing ChangeTerm proposal + tt.ignore(msgApp) + + // elect 1 as the new leader with term 2 + tt.send(Message{To: 1, Type: msgHup}) + + // no log entries from previous term should be committed + sm = tt.peers[1].(*stateMachine) + if sm.log.committed != 1 { + t.Errorf("committed = %d, want %d", sm.log.committed, 1) + } + + tt.recover() + + // send out a heartbeat + // after append a ChangeTerm entry from the current term, all entries + // should be committed + tt.send(Message{To: 1, Type: msgBeat}) + + if sm.log.committed != 4 { + t.Errorf("committed = %d, want %d", sm.log.committed, 4) + } + + // still be able to append a entry + tt.send(Message{To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) + + if sm.log.committed != 5 { + t.Errorf("committed = %d, want %d", sm.log.committed, 5) + } +} + +// TestCommitWithoutNewTermEntry tests the entries could be committed +// when leader changes, no new proposal comes in. +func TestCommitWithoutNewTermEntry(t *testing.T) { + tt := newNetwork(nil, nil, nil, nil, nil) + tt.send(Message{To: 0, Type: msgHup}) + + // 0 cannot reach 2,3,4 + tt.cut(0, 2) + tt.cut(0, 3) + tt.cut(0, 4) + + tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) + tt.send(Message{To: 0, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) + + sm := tt.peers[0].(*stateMachine) + if sm.log.committed != 1 { + t.Errorf("committed = %d, want %d", sm.log.committed, 1) } // network recovery tt.recover() // elect 1 as the new leader with term 2 - tt.send(Message{To: 1, Type: msgHup}) - // send out a heartbeat - tt.send(Message{To: 1, Type: msgBeat}) - - // no log entries from previous term should be committed - sm = tt.peers[1].(*stateMachine) - if sm.log.committed != 0 { - t.Errorf("committed = %d, want %d", sm.log.committed, 0) - } - - // after append a entry from the current term, all entries + // after append a ChangeTerm entry from the current term, all entries // should be committed - tt.send(Message{To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("some data")}}}) - if sm.log.committed != 3 { - t.Errorf("committed = %d, want %d", sm.log.committed, 3) + tt.send(Message{To: 1, Type: msgHup}) + + if sm.log.committed != 4 { + t.Errorf("committed = %d, want %d", sm.log.committed, 4) } } func TestDuelingCandidates(t *testing.T) { a := newStateMachine(0, nil) // k, id are set later + b := newStateMachine(0, nil) c := newStateMachine(0, nil) - tt := newNetwork(a, nil, c) - tt.cut(0, 2) + nt := newNetwork(a, b, c) + nt.cut(0, 2) - tt.send(Message{To: 0, Type: msgHup}) - tt.send(Message{To: 2, Type: msgHup}) + nt.send(Message{To: 0, Type: msgHup}) + nt.send(Message{To: 2, Type: msgHup}) - tt.recover() - tt.send(Message{To: 2, Type: msgHup}) + nt.recover() + nt.send(Message{To: 2, Type: msgHup}) + wlog := &log{ents: []Entry{{}, Entry{Type: Normal, Data: nil, Term: 1}}, committed: 1} tests := []struct { sm *stateMachine state stateType term int + log *log }{ - {a, stateFollower, 2}, - {c, stateLeader, 2}, + {a, stateFollower, 2, wlog}, + {b, stateFollower, 2, wlog}, + {c, stateFollower, 2, newLog()}, } for i, tt := range tests { @@ -171,11 +226,8 @@ func TestDuelingCandidates(t *testing.T) { if g := tt.sm.term; g != tt.term { t.Errorf("#%d: term = %d, want %d", i, g, tt.term) } - } - - base := ltoa(newLog()) - for i, p := range tt.peers { - if sm, ok := p.(*stateMachine); ok { + base := ltoa(tt.log) + if sm, ok := nt.peers[i].(*stateMachine); ok { l := ltoa(sm.log) if g := diffu(base, l); g != "" { t.Errorf("#%d: diff:\n%s", i, g) @@ -207,7 +259,7 @@ func TestCandidateConcede(t *testing.T) { if g := a.term; g != 1 { t.Errorf("term = %d, want %d", g, 1) } - wantLog := ltoa(&log{ents: []Entry{{}, {Term: 1, Data: data}}, committed: 1}) + wantLog := ltoa(&log{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1}, {Term: 1, Data: data}}, committed: 2}) for i, p := range tt.peers { if sm, ok := p.(*stateMachine); ok { l := ltoa(sm.log) @@ -239,7 +291,14 @@ func TestOldMessages(t *testing.T) { // pretend we're an old leader trying to make progress tt.send(Message{To: 0, Type: msgApp, Term: 1, Entries: []Entry{{Term: 1}}}) - base := ltoa(newLog()) + l := &log{ + ents: []Entry{ + {}, {Type: Normal, Data: nil, Term: 1}, + {Type: Normal, Data: nil, Term: 2}, {Type: Normal, Data: nil, Term: 3}, + }, + committed: 3, + } + base := ltoa(l) for i, p := range tt.peers { if sm, ok := p.(*stateMachine); ok { l := ltoa(sm.log) @@ -289,7 +348,7 @@ func TestProposal(t *testing.T) { wantLog := newLog() if tt.success { - wantLog = &log{ents: []Entry{{}, {Term: 1, Data: data}}, committed: 1} + wantLog = &log{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1}, {Term: 1, Data: data}}, committed: 2} } base := ltoa(wantLog) for i, p := range tt.peers { @@ -323,7 +382,7 @@ func TestProposalByProxy(t *testing.T) { // propose via follower tt.send(Message{To: 1, Type: msgProp, Entries: []Entry{{Data: []byte("somedata")}}}) - wantLog := &log{ents: []Entry{{}, {Term: 1, Data: data}}, committed: 1} + wantLog := &log{ents: []Entry{{}, {Type: Normal, Data: nil, Term: 1}, {Term: 1, Data: data}}, committed: 2} base := ltoa(wantLog) for i, p := range tt.peers { if sm, ok := p.(*stateMachine); ok { @@ -496,19 +555,19 @@ func TestConf(t *testing.T) { sm.becomeLeader() sm.Step(Message{Type: msgProp, Entries: []Entry{{Type: AddNode}}}) - if sm.log.lastIndex() != 1 { + if sm.log.lastIndex() != 2 { t.Errorf("lastindex = %d, want %d", sm.log.lastIndex(), 1) } if !sm.pendingConf { t.Errorf("pendingConf = %v, want %v", sm.pendingConf, true) } - if sm.log.ents[1].Type != AddNode { + if sm.log.ents[2].Type != AddNode { t.Errorf("type = %d, want %d", sm.log.ents[1].Type, AddNode) } // deny the second configuration change request if there is a pending one sm.Step(Message{Type: msgProp, Entries: []Entry{{Type: AddNode}}}) - if sm.log.lastIndex() != 1 { + if sm.log.lastIndex() != 2 { t.Errorf("lastindex = %d, want %d", sm.log.lastIndex(), 1) } } @@ -539,20 +598,24 @@ func TestConfChangeLeader(t *testing.T) { } func TestAllServerStepdown(t *testing.T) { - tests := []stateType{stateFollower, stateCandidate, stateLeader} - - want := struct { + tests := []struct { state stateType - term int - index int - }{stateFollower, 3, 1} + + wstate stateType + wterm int + windex int + }{ + {stateFollower, stateFollower, 3, 1}, + {stateCandidate, stateFollower, 3, 1}, + {stateLeader, stateFollower, 3, 2}, + } tmsgTypes := [...]messageType{msgVote, msgApp} tterm := 3 for i, tt := range tests { sm := newStateMachine(0, []int{0, 1, 2}) - switch tt { + switch tt.state { case stateFollower: sm.becomeFollower(1, 0) case stateCandidate: @@ -565,14 +628,14 @@ func TestAllServerStepdown(t *testing.T) { for j, msgType := range tmsgTypes { sm.Step(Message{Type: msgType, Term: tterm, LogTerm: tterm}) - if sm.state != want.state { - t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, want.state) + if sm.state != tt.wstate { + t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, tt.wstate) } - if sm.term != want.term { - t.Errorf("#%d.%d term = %v , want %v", i, j, sm.term, want.term) + if sm.term != tt.wterm { + t.Errorf("#%d.%d term = %v , want %v", i, j, sm.term, tt.wterm) } - if len(sm.log.ents) != want.index { - t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.log.ents), want.index) + if len(sm.log.ents) != tt.windex { + t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.log.ents), tt.windex) } } } @@ -596,6 +659,7 @@ func TestLeaderAppResp(t *testing.T) { sm.log = &log{ents: []Entry{{}, {Term: 0}, {Term: 1}}} sm.becomeCandidate() sm.becomeLeader() + sm.Msgs() sm.Step(Message{From: 1, Type: msgAppResp, Index: tt.index, Term: sm.term}) msgs := sm.Msgs() @@ -656,8 +720,9 @@ func ents(terms ...int) *stateMachine { } type network struct { - peers map[int]Interface - dropm map[connem]float64 + peers map[int]Interface + dropm map[connem]float64 + ignorem map[messageType]bool } // newNetwork initializes a network from peers. @@ -692,7 +757,11 @@ func newNetwork(peers ...Interface) *network { npeers[id] = v } } - return &network{peers: npeers, dropm: make(map[connem]float64)} + return &network{ + peers: npeers, + dropm: make(map[connem]float64), + ignorem: make(map[messageType]bool), + } } func (nw *network) send(msgs ...Message) { @@ -722,13 +791,21 @@ func (nw *network) isolate(id int) { } } +func (nw *network) ignore(t messageType) { + nw.ignorem[t] = true +} + func (nw *network) recover() { nw.dropm = make(map[connem]float64) + nw.ignorem = make(map[messageType]bool) } func (nw *network) filter(msgs []Message) []Message { mm := make([]Message, 0) for _, m := range msgs { + if nw.ignorem[m.Type] { + continue + } switch m.Type { case msgHup: // hups never go over the network, so don't drop them but panic