Merge pull request #9073 from bdarnell/pending-conf-index

raft: Avoid scanning raft log in becomeLeader
This commit is contained in:
Xiang Li 2018-01-08 16:37:36 -08:00 committed by GitHub
commit ed1ff9e952
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 46 additions and 73 deletions

View File

@ -324,7 +324,6 @@ func (n *node) run(r *raft) {
} }
case cc := <-n.confc: case cc := <-n.confc:
if cc.NodeID == None { if cc.NodeID == None {
r.resetPendingConf()
select { select {
case n.confstatec <- pb.ConfState{Nodes: r.nodes()}: case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
case <-n.done: case <-n.done:
@ -344,7 +343,6 @@ func (n *node) run(r *raft) {
} }
r.removeNode(cc.NodeID) r.removeNode(cc.NodeID)
case pb.ConfChangeUpdateNode: case pb.ConfChangeUpdateNode:
r.resetPendingConf()
default: default:
panic("unexpected conf type") panic("unexpected conf type")
} }

View File

@ -348,6 +348,7 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) {
n.Tick() n.Tick()
case rd := <-n.Ready(): case rd := <-n.Ready():
s.Append(rd.Entries) s.Append(rd.Entries)
applied := false
for _, e := range rd.Entries { for _, e := range rd.Entries {
rdyEntries = append(rdyEntries, e) rdyEntries = append(rdyEntries, e)
switch e.Type { switch e.Type {
@ -356,10 +357,13 @@ func TestNodeProposeAddDuplicateNode(t *testing.T) {
var cc raftpb.ConfChange var cc raftpb.ConfChange
cc.Unmarshal(e.Data) cc.Unmarshal(e.Data)
n.ApplyConfChange(cc) n.ApplyConfChange(cc)
applyConfChan <- struct{}{} applied = true
} }
} }
n.Advance() n.Advance()
if applied {
applyConfChan <- struct{}{}
}
} }
} }
}() }()

View File

@ -256,8 +256,13 @@ type raft struct {
// leadTransferee is id of the leader transfer target when its value is not zero. // leadTransferee is id of the leader transfer target when its value is not zero.
// Follow the procedure defined in raft thesis 3.10. // Follow the procedure defined in raft thesis 3.10.
leadTransferee uint64 leadTransferee uint64
// New configuration is ignored if there exists unapplied configuration. // Only one conf change may be pending (in the log, but not yet
pendingConf bool // applied) at a time. This is enforced via pendingConfIndex, which
// is set to a value >= the log index of the latest pending
// configuration change (if any). Config changes are only allowed to
// be proposed if the leader's applied index is greater than this
// value.
pendingConfIndex uint64
readOnly *readOnly readOnly *readOnly
@ -579,7 +584,7 @@ func (r *raft) reset(term uint64) {
} }
}) })
r.pendingConf = false r.pendingConfIndex = 0
r.readOnly = newReadOnly(r.readOnly.option) r.readOnly = newReadOnly(r.readOnly.option)
} }
@ -683,12 +688,13 @@ func (r *raft) becomeLeader() {
r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err) r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err)
} }
nconf := numOfPendingConf(ents) // Conservatively set the pendingConfIndex to the last index in the
if nconf > 1 { // log. There may or may not be a pending config change, but it's
panic("unexpected multiple uncommitted config entry") // safe to delay any future proposals until we commit all our
} // pending log entries, and scanning the entire tail of the log
if nconf == 1 { // could be expensive.
r.pendingConf = true if len(ents) > 0 {
r.pendingConfIndex = ents[len(ents)-1].Index
} }
r.appendEntry(pb.Entry{Data: nil}) r.appendEntry(pb.Entry{Data: nil})
@ -902,11 +908,13 @@ func stepLeader(r *raft, m pb.Message) {
for i, e := range m.Entries { for i, e := range m.Entries {
if e.Type == pb.EntryConfChange { if e.Type == pb.EntryConfChange {
if r.pendingConf { if r.pendingConfIndex > r.raftLog.applied {
r.logger.Infof("propose conf %s ignored since pending unapplied configuration", e.String()) r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
e.String(), r.pendingConfIndex, r.raftLog.applied)
m.Entries[i] = pb.Entry{Type: pb.EntryNormal} m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
} else {
r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
} }
r.pendingConf = true
} }
} }
r.appendEntry(m.Entries...) r.appendEntry(m.Entries...)
@ -1271,7 +1279,6 @@ func (r *raft) addLearner(id uint64) {
} }
func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) { func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
r.pendingConf = false
pr := r.getProgress(id) pr := r.getProgress(id)
if pr == nil { if pr == nil {
r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner) r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
@ -1307,7 +1314,6 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
func (r *raft) removeNode(id uint64) { func (r *raft) removeNode(id uint64) {
r.delProgress(id) r.delProgress(id)
r.pendingConf = false
// do not try to commit or abort transferring if there are no nodes in the cluster. // do not try to commit or abort transferring if there are no nodes in the cluster.
if len(r.prs) == 0 && len(r.learnerPrs) == 0 { if len(r.prs) == 0 && len(r.learnerPrs) == 0 {
@ -1325,8 +1331,6 @@ func (r *raft) removeNode(id uint64) {
} }
} }
func (r *raft) resetPendingConf() { r.pendingConf = false }
func (r *raft) setProgress(id, match, next uint64, isLearner bool) { func (r *raft) setProgress(id, match, next uint64, isLearner bool) {
if !isLearner { if !isLearner {
delete(r.learnerPrs, id) delete(r.learnerPrs, id)

View File

@ -2736,8 +2736,8 @@ func TestStepConfig(t *testing.T) {
if g := r.raftLog.lastIndex(); g != index+1 { if g := r.raftLog.lastIndex(); g != index+1 {
t.Errorf("index = %d, want %d", g, index+1) t.Errorf("index = %d, want %d", g, index+1)
} }
if !r.pendingConf { if r.pendingConfIndex != index+1 {
t.Errorf("pendingConf = %v, want true", r.pendingConf) t.Errorf("pendingConfIndex = %d, want %d", r.pendingConfIndex, index+1)
} }
} }
@ -2751,7 +2751,7 @@ func TestStepIgnoreConfig(t *testing.T) {
r.becomeLeader() r.becomeLeader()
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
index := r.raftLog.lastIndex() index := r.raftLog.lastIndex()
pendingConf := r.pendingConf pendingConfIndex := r.pendingConfIndex
r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}} wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}}
ents, err := r.raftLog.entries(index+1, noLimit) ents, err := r.raftLog.entries(index+1, noLimit)
@ -2761,57 +2761,39 @@ func TestStepIgnoreConfig(t *testing.T) {
if !reflect.DeepEqual(ents, wents) { if !reflect.DeepEqual(ents, wents) {
t.Errorf("ents = %+v, want %+v", ents, wents) t.Errorf("ents = %+v, want %+v", ents, wents)
} }
if r.pendingConf != pendingConf { if r.pendingConfIndex != pendingConfIndex {
t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf) t.Errorf("pendingConfIndex = %d, want %d", r.pendingConfIndex, pendingConfIndex)
} }
} }
// TestRecoverPendingConfig tests that new leader recovers its pendingConf flag // TestNewLeaderPendingConfig tests that new leader sets its pendingConfIndex
// based on uncommitted entries. // based on uncommitted entries.
func TestRecoverPendingConfig(t *testing.T) { func TestNewLeaderPendingConfig(t *testing.T) {
tests := []struct { tests := []struct {
entType pb.EntryType addEntry bool
wpending bool wpendingIndex uint64
}{ }{
{pb.EntryNormal, false}, {false, 0},
{pb.EntryConfChange, true}, {true, 1},
} }
for i, tt := range tests { for i, tt := range tests {
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
r.appendEntry(pb.Entry{Type: tt.entType}) if tt.addEntry {
r.appendEntry(pb.Entry{Type: pb.EntryNormal})
}
r.becomeCandidate() r.becomeCandidate()
r.becomeLeader() r.becomeLeader()
if r.pendingConf != tt.wpending { if r.pendingConfIndex != tt.wpendingIndex {
t.Errorf("#%d: pendingConf = %v, want %v", i, r.pendingConf, tt.wpending) t.Errorf("#%d: pendingConfIndex = %d, want %d",
i, r.pendingConfIndex, tt.wpendingIndex)
} }
} }
} }
// TestRecoverDoublePendingConfig tests that new leader will panic if // TestAddNode tests that addNode could update nodes correctly.
// there exist two uncommitted config entries.
func TestRecoverDoublePendingConfig(t *testing.T) {
func() {
defer func() {
if err := recover(); err == nil {
t.Errorf("expect panic, but nothing happens")
}
}()
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
r.appendEntry(pb.Entry{Type: pb.EntryConfChange})
r.becomeCandidate()
r.becomeLeader()
}()
}
// TestAddNode tests that addNode could update pendingConf and nodes correctly.
func TestAddNode(t *testing.T) { func TestAddNode(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.addNode(2) r.addNode(2)
if r.pendingConf {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
nodes := r.nodes() nodes := r.nodes()
wnodes := []uint64{1, 2} wnodes := []uint64{1, 2}
if !reflect.DeepEqual(nodes, wnodes) { if !reflect.DeepEqual(nodes, wnodes) {
@ -2819,14 +2801,10 @@ func TestAddNode(t *testing.T) {
} }
} }
// TestAddLearner tests that addLearner could update pendingConf and nodes correctly. // TestAddLearner tests that addLearner could update nodes correctly.
func TestAddLearner(t *testing.T) { func TestAddLearner(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.addLearner(2) r.addLearner(2)
if r.pendingConf {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
nodes := r.nodes() nodes := r.nodes()
wnodes := []uint64{1, 2} wnodes := []uint64{1, 2}
if !reflect.DeepEqual(nodes, wnodes) { if !reflect.DeepEqual(nodes, wnodes) {
@ -2841,7 +2819,6 @@ func TestAddLearner(t *testing.T) {
// immediately when checkQuorum is set. // immediately when checkQuorum is set.
func TestAddNodeCheckQuorum(t *testing.T) { func TestAddNodeCheckQuorum(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage()) r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.checkQuorum = true r.checkQuorum = true
r.becomeCandidate() r.becomeCandidate()
@ -2872,15 +2849,11 @@ func TestAddNodeCheckQuorum(t *testing.T) {
} }
} }
// TestRemoveNode tests that removeNode could update pendingConf, nodes and // TestRemoveNode tests that removeNode could update nodes and
// removed list correctly. // removed list correctly.
func TestRemoveNode(t *testing.T) { func TestRemoveNode(t *testing.T) {
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.removeNode(2) r.removeNode(2)
if r.pendingConf {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
w := []uint64{1} w := []uint64{1}
if g := r.nodes(); !reflect.DeepEqual(g, w) { if g := r.nodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w) t.Errorf("nodes = %v, want %v", g, w)
@ -2894,15 +2867,11 @@ func TestRemoveNode(t *testing.T) {
} }
} }
// TestRemoveLearner tests that removeNode could update pendingConf, nodes and // TestRemoveLearner tests that removeNode could update nodes and
// removed list correctly. // removed list correctly.
func TestRemoveLearner(t *testing.T) { func TestRemoveLearner(t *testing.T) {
r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage()) r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.removeNode(2) r.removeNode(2)
if r.pendingConf {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
w := []uint64{1} w := []uint64{1}
if g := r.nodes(); !reflect.DeepEqual(g, w) { if g := r.nodes(); !reflect.DeepEqual(g, w) {
t.Errorf("nodes = %v, want %v", g, w) t.Errorf("nodes = %v, want %v", g, w)

View File

@ -169,7 +169,6 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error {
// ApplyConfChange applies a config change to the local node. // ApplyConfChange applies a config change to the local node.
func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState { func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
if cc.NodeID == None { if cc.NodeID == None {
rn.raft.resetPendingConf()
return &pb.ConfState{Nodes: rn.raft.nodes()} return &pb.ConfState{Nodes: rn.raft.nodes()}
} }
switch cc.Type { switch cc.Type {
@ -180,7 +179,6 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
case pb.ConfChangeRemoveNode: case pb.ConfChangeRemoveNode:
rn.raft.removeNode(cc.NodeID) rn.raft.removeNode(cc.NodeID)
case pb.ConfChangeUpdateNode: case pb.ConfChangeUpdateNode:
rn.raft.resetPendingConf()
default: default:
panic("unexpected conf type") panic("unexpected conf type")
} }