diff --git a/raft/node.go b/raft/node.go index 56b3bee19..4c2a8968f 100644 --- a/raft/node.go +++ b/raft/node.go @@ -136,21 +136,7 @@ type Node interface { // However, as an optimization, the application may call Advance while it is applying the // commands. For example. when the last Ready contains a snapshot, the application might take // a long time to apply the snapshot data. To continue receiving Ready without blocking raft - // progress, it can call Advance before finishing applying the last ready. To make this optimization - // work safely, when the application receives a Ready with softState.RaftState equal to Candidate - // it MUST apply all pending configuration changes if there is any. - // - // Here is a simple solution that waiting for ALL pending entries to get applied. - // ``` - // ... - // rd := <-n.Ready() - // go apply(rd.CommittedEntries) // optimization to apply asynchronously in FIFO order. - // if rd.SoftState.RaftState == StateCandidate { - // waitAllApplied() - // } - // n.Advance() - // ... - //``` + // progress, it can call Advance before finishing applying the last ready. Advance() // ApplyConfChange applies config change to the local node. // Returns an opaque ConfState protobuf which must be recorded diff --git a/raft/node_test.go b/raft/node_test.go index 90bb5ed32..3615e1dc7 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -357,15 +357,12 @@ func TestNodeStart(t *testing.T) { } wants := []Ready{ { - SoftState: &SoftState{Lead: 1, RaftState: StateLeader}, - HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1}, + HardState: raftpb.HardState{Term: 1, Commit: 1, Vote: 0}, Entries: []raftpb.Entry{ {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata}, - {Term: 2, Index: 2}, }, CommittedEntries: []raftpb.Entry{ {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata}, - {Term: 2, Index: 2}, }, }, { @@ -385,7 +382,6 @@ func TestNodeStart(t *testing.T) { } n := StartNode(c, []Peer{{ID: 1}}) defer n.Stop() - n.Campaign(ctx) g := <-n.Ready() if !reflect.DeepEqual(g, wants[0]) { t.Fatalf("#%d: g = %+v,\n w %+v", 1, g, wants[0]) @@ -394,6 +390,11 @@ func TestNodeStart(t *testing.T) { n.Advance() } + n.Campaign(ctx) + rd := <-n.Ready() + storage.Append(rd.Entries) + n.Advance() + n.Propose(ctx, []byte("foo")) if g2 := <-n.Ready(); !reflect.DeepEqual(g2, wants[1]) { t.Errorf("#%d: g = %+v,\n w %+v", 2, g2, wants[1]) @@ -508,10 +509,14 @@ func TestNodeAdvance(t *testing.T) { } n := StartNode(c, []Peer{{ID: 1}}) defer n.Stop() + rd := <-n.Ready() + storage.Append(rd.Entries) + n.Advance() + n.Campaign(ctx) <-n.Ready() + n.Propose(ctx, []byte("foo")) - var rd Ready select { case rd = <-n.Ready(): t.Fatalf("unexpected Ready before Advance: %+v", rd) diff --git a/raft/raft.go b/raft/raft.go index c4ba9e77e..cb4feac4f 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -521,15 +521,14 @@ func (r *raft) becomeLeader() { r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err) } - for _, e := range ents { - if e.Type != pb.EntryConfChange { - continue - } - if r.pendingConf { - panic("unexpected double uncommitted config entry") - } + nconf := numOfPendingConf(ents) + if nconf > 1 { + panic("unexpected multiple uncommitted config entry") + } + if nconf == 1 { r.pendingConf = true } + r.appendEntry(pb.Entry{Data: nil}) r.logger.Infof("%x became leader at term %d", r.id, r.Term) } @@ -575,6 +574,15 @@ func (r *raft) poll(id uint64, v bool) (granted int) { func (r *raft) Step(m pb.Message) error { if m.Type == pb.MsgHup { if r.state != StateLeader { + ents, err := r.raftLog.entries(r.raftLog.applied+1, r.raftLog.committed-r.raftLog.applied) + if err != nil { + r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err) + } + if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied { + r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n) + return nil + } + r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term) r.campaign(campaignElection) } else { @@ -1047,3 +1055,13 @@ func (r *raft) sendTimeoutNow(to uint64) { func (r *raft) abortLeaderTransfer() { r.leadTransferee = None } + +func numOfPendingConf(ents []pb.Entry) int { + n := 0 + for i := range ents { + if ents[i].Type == pb.EntryConfChange { + n++ + } + } + return n +} diff --git a/raft/rawnode.go b/raft/rawnode.go index 4cea62f64..54c50001a 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -103,9 +103,14 @@ func NewRawNode(config *Config, peers []Peer) (*RawNode, error) { r.addNode(peer.ID) } } + // Set the initial hard and soft states after performing all initialization. rn.prevSoftSt = r.softState() - rn.prevHardSt = r.hardState() + if lastIndex == 0 { + rn.prevHardSt = emptyState + } else { + rn.prevHardSt = r.hardState() + } return rn, nil } diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 112e3f5df..53c08dba5 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -53,12 +53,18 @@ func TestRawNodeProposeAndConfChange(t *testing.T) { if err != nil { t.Fatal(err) } + rd := rawNode.Ready() + s.Append(rd.Entries) + rawNode.Advance(rd) + rawNode.Campaign() proposed := false - var lastIndex uint64 - var ccdata []byte + var ( + lastIndex uint64 + ccdata []byte + ) for { - rd := rawNode.Ready() + rd = rawNode.Ready() s.Append(rd.Entries) // Once we are the leader, propose a command and a ConfChange. if !proposed && rd.SoftState.Lead == rawNode.raft.id { @@ -124,15 +130,12 @@ func TestRawNodeStart(t *testing.T) { } wants := []Ready{ { - SoftState: &SoftState{Lead: 1, RaftState: StateLeader}, - HardState: raftpb.HardState{Term: 2, Commit: 2, Vote: 1}, + HardState: raftpb.HardState{Term: 1, Commit: 1, Vote: 0}, Entries: []raftpb.Entry{ {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata}, - {Term: 2, Index: 2}, }, CommittedEntries: []raftpb.Entry{ {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata}, - {Term: 2, Index: 2}, }, }, { @@ -147,7 +150,6 @@ func TestRawNodeStart(t *testing.T) { if err != nil { t.Fatal(err) } - rawNode.Campaign() rd := rawNode.Ready() t.Logf("rd %v", rd) if !reflect.DeepEqual(rd, wants[0]) { @@ -156,6 +158,13 @@ func TestRawNodeStart(t *testing.T) { storage.Append(rd.Entries) rawNode.Advance(rd) } + storage.Append(rd.Entries) + rawNode.Advance(rd) + + rawNode.Campaign() + rd = rawNode.Ready() + storage.Append(rd.Entries) + rawNode.Advance(rd) rawNode.Propose([]byte("foo")) if rd = rawNode.Ready(); !reflect.DeepEqual(rd, wants[1]) {