From 7f0733cf46232319ae68e5cc79d650e95a612afb Mon Sep 17 00:00:00 2001 From: Xiang Date: Tue, 14 Mar 2017 11:25:02 -0700 Subject: [PATCH] etcdserver: candidate should wait for applying all configuration changes --- etcdserver/raft.go | 12 ++++++++++ etcdserver/raft_test.go | 50 +++++++++++++++++++++++++++++++++++++++++ etcdserver/server.go | 9 ++++++-- 3 files changed, 69 insertions(+), 2 deletions(-) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 643caa455..eec154e23 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -140,6 +140,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { go func() { defer r.onStop() islead := false + isCandidate := false for { select { @@ -162,6 +163,7 @@ func (r *raftNode) start(rh *raftReadyHandler) { atomic.StoreUint64(&r.lead, rd.SoftState.Lead) islead = rd.RaftState == raft.StateLeader + isCandidate = rd.RaftState == raft.StateCandidate rh.updateLeadership() } @@ -225,7 +227,17 @@ func (r *raftNode) start(rh *raftReadyHandler) { r.sendMessages(rd.Messages) } raftDone <- struct{}{} + r.Advance() + + if isCandidate { + // candidate needs to wait for all pending configuration changes to be applied + // before continue. Or we might incorrectly count the number of votes (e.g. receive vote from + // a removed member). + // We simply wait for ALL pending entries to be applied for now. + // We might improve this later on if it causes unnecessary long blocking issues. + rh.waitForApply() + } case <-r.stopped: return } diff --git a/etcdserver/raft_test.go b/etcdserver/raft_test.go index eb51c1434..c95823705 100644 --- a/etcdserver/raft_test.go +++ b/etcdserver/raft_test.go @@ -175,3 +175,53 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) { t.Fatalf("failed to stop raft loop") } } + +func TestCandidateBlocksByApply(t *testing.T) { + n := newNopReadyNode() + + waitApplyc := make(chan struct{}) + + srv := &EtcdServer{r: raftNode{ + Node: n, + storage: mockstorage.NewStorageRecorder(""), + raftStorage: raft.NewMemoryStorage(), + transport: rafthttp.NewNopTransporter(), + ticker: &time.Ticker{}, + }} + + rh := &raftReadyHandler{ + updateLeadership: func() {}, + waitForApply: func() { + <-waitApplyc + }, + } + + srv.r.start(rh) + defer srv.r.Stop() + + // become candidate + n.readyc <- raft.Ready{SoftState: &raft.SoftState{RaftState: raft.StateCandidate}} + <-srv.r.applyc + + continueC := make(chan struct{}) + go func() { + n.readyc <- raft.Ready{} + <-srv.r.applyc + close(continueC) + }() + + select { + case <-continueC: + t.Fatalf("unexpected execution: raft routine should block waiting for apply") + case <-time.After(time.Second): + } + + // finish apply, unblock raft routine + close(waitApplyc) + + select { + case <-continueC: + case <-time.After(time.Second): + t.Fatalf("unexpected blocking on execution") + } +} diff --git a/etcdserver/server.go b/etcdserver/server.go index dc6629852..187d82e13 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -608,6 +608,7 @@ type etcdProgress struct { type raftReadyHandler struct { updateLeadership func() updateCommittedIndex func(uint64) + waitForApply func() } func (s *EtcdServer) run() { @@ -616,6 +617,9 @@ func (s *EtcdServer) run() { plog.Panicf("get snapshot from raft storage error: %v", err) } + // asynchronously accept apply packets, dispatch progress in-order + sched := schedule.NewFIFOScheduler() + var ( smu sync.RWMutex syncC <-chan time.Time @@ -663,11 +667,12 @@ func (s *EtcdServer) run() { s.setCommittedIndex(ci) } }, + waitForApply: func() { + sched.WaitFinish(0) + }, } s.r.start(rh) - // asynchronously accept apply packets, dispatch progress in-order - sched := schedule.NewFIFOScheduler() ep := etcdProgress{ confState: sn.Metadata.ConfState, snapi: sn.Metadata.Index,