mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: candidate should wait for applying all configuration changes
This commit is contained in:
parent
eed4a3f035
commit
7f0733cf46
@ -140,6 +140,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
||||
go func() {
|
||||
defer r.onStop()
|
||||
islead := false
|
||||
isCandidate := false
|
||||
|
||||
for {
|
||||
select {
|
||||
@ -162,6 +163,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
||||
|
||||
atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
|
||||
islead = rd.RaftState == raft.StateLeader
|
||||
isCandidate = rd.RaftState == raft.StateCandidate
|
||||
rh.updateLeadership()
|
||||
}
|
||||
|
||||
@ -225,7 +227,17 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
||||
r.sendMessages(rd.Messages)
|
||||
}
|
||||
raftDone <- struct{}{}
|
||||
|
||||
r.Advance()
|
||||
|
||||
if isCandidate {
|
||||
// candidate needs to wait for all pending configuration changes to be applied
|
||||
// before continue. Or we might incorrectly count the number of votes (e.g. receive vote from
|
||||
// a removed member).
|
||||
// We simply wait for ALL pending entries to be applied for now.
|
||||
// We might improve this later on if it causes unnecessary long blocking issues.
|
||||
rh.waitForApply()
|
||||
}
|
||||
case <-r.stopped:
|
||||
return
|
||||
}
|
||||
|
@ -175,3 +175,53 @@ func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
|
||||
t.Fatalf("failed to stop raft loop")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCandidateBlocksByApply(t *testing.T) {
|
||||
n := newNopReadyNode()
|
||||
|
||||
waitApplyc := make(chan struct{})
|
||||
|
||||
srv := &EtcdServer{r: raftNode{
|
||||
Node: n,
|
||||
storage: mockstorage.NewStorageRecorder(""),
|
||||
raftStorage: raft.NewMemoryStorage(),
|
||||
transport: rafthttp.NewNopTransporter(),
|
||||
ticker: &time.Ticker{},
|
||||
}}
|
||||
|
||||
rh := &raftReadyHandler{
|
||||
updateLeadership: func() {},
|
||||
waitForApply: func() {
|
||||
<-waitApplyc
|
||||
},
|
||||
}
|
||||
|
||||
srv.r.start(rh)
|
||||
defer srv.r.Stop()
|
||||
|
||||
// become candidate
|
||||
n.readyc <- raft.Ready{SoftState: &raft.SoftState{RaftState: raft.StateCandidate}}
|
||||
<-srv.r.applyc
|
||||
|
||||
continueC := make(chan struct{})
|
||||
go func() {
|
||||
n.readyc <- raft.Ready{}
|
||||
<-srv.r.applyc
|
||||
close(continueC)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-continueC:
|
||||
t.Fatalf("unexpected execution: raft routine should block waiting for apply")
|
||||
case <-time.After(time.Second):
|
||||
}
|
||||
|
||||
// finish apply, unblock raft routine
|
||||
close(waitApplyc)
|
||||
|
||||
select {
|
||||
case <-continueC:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatalf("unexpected blocking on execution")
|
||||
}
|
||||
}
|
||||
|
@ -608,6 +608,7 @@ type etcdProgress struct {
|
||||
type raftReadyHandler struct {
|
||||
updateLeadership func()
|
||||
updateCommittedIndex func(uint64)
|
||||
waitForApply func()
|
||||
}
|
||||
|
||||
func (s *EtcdServer) run() {
|
||||
@ -616,6 +617,9 @@ func (s *EtcdServer) run() {
|
||||
plog.Panicf("get snapshot from raft storage error: %v", err)
|
||||
}
|
||||
|
||||
// asynchronously accept apply packets, dispatch progress in-order
|
||||
sched := schedule.NewFIFOScheduler()
|
||||
|
||||
var (
|
||||
smu sync.RWMutex
|
||||
syncC <-chan time.Time
|
||||
@ -663,11 +667,12 @@ func (s *EtcdServer) run() {
|
||||
s.setCommittedIndex(ci)
|
||||
}
|
||||
},
|
||||
waitForApply: func() {
|
||||
sched.WaitFinish(0)
|
||||
},
|
||||
}
|
||||
s.r.start(rh)
|
||||
|
||||
// asynchronously accept apply packets, dispatch progress in-order
|
||||
sched := schedule.NewFIFOScheduler()
|
||||
ep := etcdProgress{
|
||||
confState: sn.Metadata.ConfState,
|
||||
snapi: sn.Metadata.Index,
|
||||
|
Loading…
x
Reference in New Issue
Block a user