mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: wait for raft is notified on confChange before responding to client
Signed-off-by: Benjamin Wang <wachao@vmware.com>
This commit is contained in:
parent
d3233fec0a
commit
ad3b6ee4c6
@ -72,6 +72,8 @@ type toApply struct {
|
|||||||
snapshot raftpb.Snapshot
|
snapshot raftpb.Snapshot
|
||||||
// notifyc synchronizes etcd server applies with the raft node
|
// notifyc synchronizes etcd server applies with the raft node
|
||||||
notifyc chan struct{}
|
notifyc chan struct{}
|
||||||
|
// confChangeCh synchronizes etcd server applies confChange with raft node
|
||||||
|
confChangeCh chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type raftNode struct {
|
type raftNode struct {
|
||||||
@ -203,10 +205,12 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
notifyc := make(chan struct{}, 1)
|
notifyc := make(chan struct{}, 1)
|
||||||
|
confChangeCh := make(chan struct{}, 1)
|
||||||
ap := toApply{
|
ap := toApply{
|
||||||
entries: rd.CommittedEntries,
|
entries: rd.CommittedEntries,
|
||||||
snapshot: rd.Snapshot,
|
snapshot: rd.Snapshot,
|
||||||
notifyc: notifyc,
|
notifyc: notifyc,
|
||||||
|
confChangeCh: confChangeCh,
|
||||||
}
|
}
|
||||||
|
|
||||||
updateCommittedIndex(&ap, rh)
|
updateCommittedIndex(&ap, rh)
|
||||||
@ -269,6 +273,14 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
|||||||
|
|
||||||
r.raftStorage.Append(rd.Entries)
|
r.raftStorage.Append(rd.Entries)
|
||||||
|
|
||||||
|
waitApply := false
|
||||||
|
for _, ent := range rd.CommittedEntries {
|
||||||
|
if ent.Type == raftpb.EntryConfChange {
|
||||||
|
waitApply = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if !islead {
|
if !islead {
|
||||||
// finish processing incoming messages before we signal notifyc chan
|
// finish processing incoming messages before we signal notifyc chan
|
||||||
msgs := r.processMessages(rd.Messages)
|
msgs := r.processMessages(rd.Messages)
|
||||||
@ -283,13 +295,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
|||||||
// on its own single-node cluster, before toApply-layer applies the config change.
|
// on its own single-node cluster, before toApply-layer applies the config change.
|
||||||
// We simply wait for ALL pending entries to be applied for now.
|
// We simply wait for ALL pending entries to be applied for now.
|
||||||
// We might improve this later on if it causes unnecessary long blocking issues.
|
// We might improve this later on if it causes unnecessary long blocking issues.
|
||||||
waitApply := false
|
|
||||||
for _, ent := range rd.CommittedEntries {
|
|
||||||
if ent.Type == raftpb.EntryConfChange {
|
|
||||||
waitApply = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if waitApply {
|
if waitApply {
|
||||||
// blocks until 'applyAll' calls 'applyWait.Trigger'
|
// blocks until 'applyAll' calls 'applyWait.Trigger'
|
||||||
// to be in sync with scheduled config-change job
|
// to be in sync with scheduled config-change job
|
||||||
@ -310,6 +316,11 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
|||||||
|
|
||||||
// gofail: var raftBeforeAdvance struct{}
|
// gofail: var raftBeforeAdvance struct{}
|
||||||
r.Advance()
|
r.Advance()
|
||||||
|
|
||||||
|
if waitApply {
|
||||||
|
// notify etcdserver that raft has already been notified or advanced.
|
||||||
|
confChangeCh <- struct{}{}
|
||||||
|
}
|
||||||
case <-r.stopped:
|
case <-r.stopped:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -1133,7 +1133,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
var shouldstop bool
|
var shouldstop bool
|
||||||
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
|
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState, apply.confChangeCh); shouldstop {
|
||||||
go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
|
go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1810,6 +1810,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
|
|||||||
func (s *EtcdServer) apply(
|
func (s *EtcdServer) apply(
|
||||||
es []raftpb.Entry,
|
es []raftpb.Entry,
|
||||||
confState *raftpb.ConfState,
|
confState *raftpb.ConfState,
|
||||||
|
confChangeCh chan struct{},
|
||||||
) (appliedt uint64, appliedi uint64, shouldStop bool) {
|
) (appliedt uint64, appliedi uint64, shouldStop bool) {
|
||||||
s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
|
s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
|
||||||
for i := range es {
|
for i := range es {
|
||||||
@ -1841,6 +1842,18 @@ func (s *EtcdServer) apply(
|
|||||||
s.setAppliedIndex(e.Index)
|
s.setAppliedIndex(e.Index)
|
||||||
s.setTerm(e.Term)
|
s.setTerm(e.Term)
|
||||||
shouldStop = shouldStop || removedSelf
|
shouldStop = shouldStop || removedSelf
|
||||||
|
|
||||||
|
// etcdserver need to ensure the raft has already been notified
|
||||||
|
// or advanced before it responds to the client. Otherwise, the
|
||||||
|
// following config change request may be rejected.
|
||||||
|
// See https://github.com/etcd-io/etcd/issues/15528.
|
||||||
|
select {
|
||||||
|
case <-time.After(500 * time.Millisecond):
|
||||||
|
lg := s.Logger()
|
||||||
|
lg.Warn("timed out waiting for configChange notification")
|
||||||
|
case <-confChangeCh:
|
||||||
|
}
|
||||||
|
|
||||||
s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
|
s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
|
||||||
|
|
||||||
default:
|
default:
|
||||||
|
@ -689,7 +689,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
|
|||||||
Data: pbutil.MustMarshal(cc),
|
Data: pbutil.MustMarshal(cc),
|
||||||
}}
|
}}
|
||||||
|
|
||||||
_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{})
|
confChangeCh := make(chan struct{}, 1)
|
||||||
|
_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}, confChangeCh)
|
||||||
consistIndex := srv.consistIndex.ConsistentIndex()
|
consistIndex := srv.consistIndex.ConsistentIndex()
|
||||||
assert.Equal(t, uint64(2), appliedi)
|
assert.Equal(t, uint64(2), appliedi)
|
||||||
|
|
||||||
@ -763,7 +764,8 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
|
|||||||
ents = append(ents, ent)
|
ents = append(ents, ent)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, _, shouldStop := srv.apply(ents, &raftpb.ConfState{})
|
confChangeCh := make(chan struct{}, 1)
|
||||||
|
_, _, shouldStop := srv.apply(ents, &raftpb.ConfState{}, confChangeCh)
|
||||||
if !shouldStop {
|
if !shouldStop {
|
||||||
t.Errorf("shouldStop = %t, want %t", shouldStop, true)
|
t.Errorf("shouldStop = %t, want %t", shouldStop, true)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user