Merge pull request #15708 from chaochn47/confchange_raft_node_notifies_apply

raft node notifies configure when confChanged
This commit is contained in:
Benjamin Wang 2023-06-28 10:03:50 +01:00 committed by GitHub
commit 22f9dac7b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 58 additions and 26 deletions

View File

@ -65,13 +65,17 @@ func init() {
// toApply contains entries, snapshot to be applied. Once
// an toApply is consumed, the entries will be persisted to
// to raft storage concurrently; the application must read
// raft storage concurrently; the application must read
// notifyc before assuming the raft messages are stable.
type toApply struct {
entries []raftpb.Entry
snapshot raftpb.Snapshot
// notifyc synchronizes etcd server applies with the raft node
notifyc chan struct{}
// raftAdvancedC notifies EtcdServer.apply that
// 'raftLog.applied' has advanced by r.Advance
// it should be used only when entries contain raftpb.EntryConfChange
raftAdvancedC <-chan struct{}
}
type raftNode struct {
@ -203,10 +207,12 @@ func (r *raftNode) start(rh *raftReadyHandler) {
}
notifyc := make(chan struct{}, 1)
raftAdvancedC := make(chan struct{}, 1)
ap := toApply{
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
notifyc: notifyc,
entries: rd.CommittedEntries,
snapshot: rd.Snapshot,
notifyc: notifyc,
raftAdvancedC: raftAdvancedC,
}
updateCommittedIndex(&ap, rh)
@ -269,6 +275,14 @@ func (r *raftNode) start(rh *raftReadyHandler) {
r.raftStorage.Append(rd.Entries)
confChanged := false
for _, ent := range rd.CommittedEntries {
if ent.Type == raftpb.EntryConfChange {
confChanged = true
break
}
}
if !islead {
// finish processing incoming messages before we signal notifyc chan
msgs := r.processMessages(rd.Messages)
@ -283,14 +297,8 @@ func (r *raftNode) start(rh *raftReadyHandler) {
// on its own single-node cluster, before toApply-layer applies the config change.
// We simply wait for ALL pending entries to be applied for now.
// We might improve this later on if it causes unnecessary long blocking issues.
waitApply := false
for _, ent := range rd.CommittedEntries {
if ent.Type == raftpb.EntryConfChange {
waitApply = true
break
}
}
if waitApply {
if confChanged {
// blocks until 'applyAll' calls 'applyWait.Trigger'
// to be in sync with scheduled config-change job
// (assume notifyc has cap of 1)
@ -310,6 +318,11 @@ func (r *raftNode) start(rh *raftReadyHandler) {
// gofail: var raftBeforeAdvance struct{}
r.Advance()
if confChanged {
// notify etcdserver that raft has already been notified or advanced.
raftAdvancedC <- struct{}{}
}
case <-r.stopped:
return
}

View File

@ -24,13 +24,14 @@ import (
"go.uber.org/zap/zaptest"
"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/mock/mockstorage"
serverstorage "go.etcd.io/etcd/server/v3/storage"
"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"
)
func TestGetIDs(t *testing.T) {
@ -231,6 +232,11 @@ func TestConfigChangeBlocksApply(t *testing.T) {
// finish toApply, unblock raft routine
<-ap.notifyc
select {
case <-ap.raftAdvancedC:
t.Log("recevied raft advance notification")
}
select {
case <-continueC:
case <-time.After(time.Second):

View File

@ -1133,7 +1133,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) {
return
}
var shouldstop bool
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState, apply.raftAdvancedC); shouldstop {
go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
}
}
@ -1649,8 +1649,9 @@ func (s *EtcdServer) AppliedIndex() uint64 { return s.getAppliedIndex() }
func (s *EtcdServer) Term() uint64 { return s.getTerm() }
type confChangeResponse struct {
membs []*membership.Member
err error
membs []*membership.Member
raftAdvanceC <-chan struct{}
err error
}
// configure sends a configuration change through consensus and
@ -1673,6 +1674,11 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*me
lg.Panic("failed to configure")
}
resp := x.(*confChangeResponse)
// etcdserver need to ensure the raft has already been notified
// or advanced before it responds to the client. Otherwise, the
// following config change request may be rejected.
// See https://github.com/etcd-io/etcd/issues/15528.
<-resp.raftAdvanceC
lg.Info(
"applied a configuration change through raft",
zap.String("local-member-id", s.MemberId().String()),
@ -1810,6 +1816,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
func (s *EtcdServer) apply(
es []raftpb.Entry,
confState *raftpb.ConfState,
raftAdvancedC <-chan struct{},
) (appliedt uint64, appliedi uint64, shouldStop bool) {
s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
for i := range es {
@ -1841,7 +1848,7 @@ func (s *EtcdServer) apply(
s.setAppliedIndex(e.Index)
s.setTerm(e.Term)
shouldStop = shouldStop || removedSelf
s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), raftAdvancedC, err})
default:
lg := s.Logger()

View File

@ -32,6 +32,9 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/membershippb"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
@ -58,8 +61,6 @@ import (
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
"go.etcd.io/etcd/server/v3/storage/mvcc"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"
)
// TestDoLocalAction tests requests which do not need to go through raft to be applied,
@ -689,7 +690,9 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
Data: pbutil.MustMarshal(cc),
}}
_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{})
raftAdvancedC := make(chan struct{}, 1)
raftAdvancedC <- struct{}{}
_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}, raftAdvancedC)
consistIndex := srv.consistIndex.ConsistentIndex()
assert.Equal(t, uint64(2), appliedi)
@ -763,7 +766,9 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
ents = append(ents, ent)
}
_, _, shouldStop := srv.apply(ents, &raftpb.ConfState{})
raftAdvancedC := make(chan struct{}, 1)
raftAdvancedC <- struct{}{}
_, _, shouldStop := srv.apply(ents, &raftpb.ConfState{}, raftAdvancedC)
if !shouldStop {
t.Errorf("shouldStop = %t, want %t", shouldStop, true)
}

View File

@ -294,8 +294,7 @@ func TestMemberPromote(t *testing.T) {
// TestMemberPromoteMemberNotLearner ensures that promoting a voting member fails.
func TestMemberPromoteMemberNotLearner(t *testing.T) {
// TODO enable this test with integration2.WithFailpoint("raftBeforeAdvance", `sleep(100)`) after PR 15708 is merged
integration2.BeforeTest(t)
integration2.BeforeTest(t, integration2.WithFailpoint("raftBeforeAdvance", `sleep(100)`))
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
defer clus.Terminate(t)
@ -380,7 +379,7 @@ func TestMemberPromoteMemberNotExist(t *testing.T) {
// TestMaxLearnerInCluster verifies that the maximum number of learners allowed in a cluster
func TestMaxLearnerInCluster(t *testing.T) {
integration2.BeforeTest(t)
integration2.BeforeTest(t, integration2.WithFailpoint("raftBeforeAdvance", `sleep(100)`))
// 1. start with a cluster with 3 voting member and max learner 2
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3, ExperimentalMaxLearners: 2, DisableStrictReconfigCheck: true})
@ -388,7 +387,9 @@ func TestMaxLearnerInCluster(t *testing.T) {
// 2. adding 2 learner members should succeed
for i := 0; i < 2; i++ {
_, err := clus.Client(0).MemberAddAsLearner(context.Background(), []string{fmt.Sprintf("http://127.0.0.1:123%d", i)})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, err := clus.Client(0).MemberAddAsLearner(ctx, []string{fmt.Sprintf("http://127.0.0.1:123%d", i)})
cancel()
if err != nil {
t.Fatalf("failed to add learner member %v", err)
}