mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server/etcdserver/raft.go:
1. rename confChangeCh to raftAdvancedC 2. rename waitApply to confChanged 3. add comments and test assertion Signed-off-by: Chao Chen <chaochn@amazon.com>
This commit is contained in:
parent
ad3b6ee4c6
commit
6cdc9ae4fe
@ -65,15 +65,17 @@ func init() {
|
|||||||
|
|
||||||
// toApply contains entries, snapshot to be applied. Once
|
// toApply contains entries, snapshot to be applied. Once
|
||||||
// an toApply is consumed, the entries will be persisted to
|
// an toApply is consumed, the entries will be persisted to
|
||||||
// to raft storage concurrently; the application must read
|
// raft storage concurrently; the application must read
|
||||||
// notifyc before assuming the raft messages are stable.
|
// notifyc before assuming the raft messages are stable.
|
||||||
type toApply struct {
|
type toApply struct {
|
||||||
entries []raftpb.Entry
|
entries []raftpb.Entry
|
||||||
snapshot raftpb.Snapshot
|
snapshot raftpb.Snapshot
|
||||||
// notifyc synchronizes etcd server applies with the raft node
|
// notifyc synchronizes etcd server applies with the raft node
|
||||||
notifyc chan struct{}
|
notifyc chan struct{}
|
||||||
// confChangeCh synchronizes etcd server applies confChange with raft node
|
// raftAdvancedC notifies EtcdServer.apply that
|
||||||
confChangeCh chan struct{}
|
// 'raftLog.applied' has advanced by r.Advance
|
||||||
|
// it should be used only when entries contain raftpb.EntryConfChange
|
||||||
|
raftAdvancedC <-chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type raftNode struct {
|
type raftNode struct {
|
||||||
@ -205,12 +207,12 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
notifyc := make(chan struct{}, 1)
|
notifyc := make(chan struct{}, 1)
|
||||||
confChangeCh := make(chan struct{}, 1)
|
raftAdvancedC := make(chan struct{}, 1)
|
||||||
ap := toApply{
|
ap := toApply{
|
||||||
entries: rd.CommittedEntries,
|
entries: rd.CommittedEntries,
|
||||||
snapshot: rd.Snapshot,
|
snapshot: rd.Snapshot,
|
||||||
notifyc: notifyc,
|
notifyc: notifyc,
|
||||||
confChangeCh: confChangeCh,
|
raftAdvancedC: raftAdvancedC,
|
||||||
}
|
}
|
||||||
|
|
||||||
updateCommittedIndex(&ap, rh)
|
updateCommittedIndex(&ap, rh)
|
||||||
@ -273,10 +275,10 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
|||||||
|
|
||||||
r.raftStorage.Append(rd.Entries)
|
r.raftStorage.Append(rd.Entries)
|
||||||
|
|
||||||
waitApply := false
|
confChanged := false
|
||||||
for _, ent := range rd.CommittedEntries {
|
for _, ent := range rd.CommittedEntries {
|
||||||
if ent.Type == raftpb.EntryConfChange {
|
if ent.Type == raftpb.EntryConfChange {
|
||||||
waitApply = true
|
confChanged = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -296,7 +298,7 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
|||||||
// We simply wait for ALL pending entries to be applied for now.
|
// We simply wait for ALL pending entries to be applied for now.
|
||||||
// We might improve this later on if it causes unnecessary long blocking issues.
|
// We might improve this later on if it causes unnecessary long blocking issues.
|
||||||
|
|
||||||
if waitApply {
|
if confChanged {
|
||||||
// blocks until 'applyAll' calls 'applyWait.Trigger'
|
// blocks until 'applyAll' calls 'applyWait.Trigger'
|
||||||
// to be in sync with scheduled config-change job
|
// to be in sync with scheduled config-change job
|
||||||
// (assume notifyc has cap of 1)
|
// (assume notifyc has cap of 1)
|
||||||
@ -317,9 +319,9 @@ func (r *raftNode) start(rh *raftReadyHandler) {
|
|||||||
// gofail: var raftBeforeAdvance struct{}
|
// gofail: var raftBeforeAdvance struct{}
|
||||||
r.Advance()
|
r.Advance()
|
||||||
|
|
||||||
if waitApply {
|
if confChanged {
|
||||||
// notify etcdserver that raft has already been notified or advanced.
|
// notify etcdserver that raft has already been notified or advanced.
|
||||||
confChangeCh <- struct{}{}
|
raftAdvancedC <- struct{}{}
|
||||||
}
|
}
|
||||||
case <-r.stopped:
|
case <-r.stopped:
|
||||||
return
|
return
|
||||||
|
@ -24,13 +24,14 @@ import (
|
|||||||
|
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
|
|
||||||
|
"go.etcd.io/raft/v3"
|
||||||
|
"go.etcd.io/raft/v3/raftpb"
|
||||||
|
|
||||||
"go.etcd.io/etcd/client/pkg/v3/types"
|
"go.etcd.io/etcd/client/pkg/v3/types"
|
||||||
"go.etcd.io/etcd/pkg/v3/pbutil"
|
"go.etcd.io/etcd/pkg/v3/pbutil"
|
||||||
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
|
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
|
||||||
"go.etcd.io/etcd/server/v3/mock/mockstorage"
|
"go.etcd.io/etcd/server/v3/mock/mockstorage"
|
||||||
serverstorage "go.etcd.io/etcd/server/v3/storage"
|
serverstorage "go.etcd.io/etcd/server/v3/storage"
|
||||||
"go.etcd.io/raft/v3"
|
|
||||||
"go.etcd.io/raft/v3/raftpb"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestGetIDs(t *testing.T) {
|
func TestGetIDs(t *testing.T) {
|
||||||
@ -231,6 +232,11 @@ func TestConfigChangeBlocksApply(t *testing.T) {
|
|||||||
// finish toApply, unblock raft routine
|
// finish toApply, unblock raft routine
|
||||||
<-ap.notifyc
|
<-ap.notifyc
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ap.raftAdvancedC:
|
||||||
|
t.Log("recevied raft advance notification")
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-continueC:
|
case <-continueC:
|
||||||
case <-time.After(time.Second):
|
case <-time.After(time.Second):
|
||||||
|
@ -1133,7 +1133,7 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
var shouldstop bool
|
var shouldstop bool
|
||||||
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState, apply.confChangeCh); shouldstop {
|
if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState, apply.raftAdvancedC); shouldstop {
|
||||||
go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
|
go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1649,8 +1649,9 @@ func (s *EtcdServer) AppliedIndex() uint64 { return s.getAppliedIndex() }
|
|||||||
func (s *EtcdServer) Term() uint64 { return s.getTerm() }
|
func (s *EtcdServer) Term() uint64 { return s.getTerm() }
|
||||||
|
|
||||||
type confChangeResponse struct {
|
type confChangeResponse struct {
|
||||||
membs []*membership.Member
|
membs []*membership.Member
|
||||||
err error
|
raftAdvanceC <-chan struct{}
|
||||||
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
// configure sends a configuration change through consensus and
|
// configure sends a configuration change through consensus and
|
||||||
@ -1673,6 +1674,11 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*me
|
|||||||
lg.Panic("failed to configure")
|
lg.Panic("failed to configure")
|
||||||
}
|
}
|
||||||
resp := x.(*confChangeResponse)
|
resp := x.(*confChangeResponse)
|
||||||
|
// etcdserver need to ensure the raft has already been notified
|
||||||
|
// or advanced before it responds to the client. Otherwise, the
|
||||||
|
// following config change request may be rejected.
|
||||||
|
// See https://github.com/etcd-io/etcd/issues/15528.
|
||||||
|
<-resp.raftAdvanceC
|
||||||
lg.Info(
|
lg.Info(
|
||||||
"applied a configuration change through raft",
|
"applied a configuration change through raft",
|
||||||
zap.String("local-member-id", s.MemberId().String()),
|
zap.String("local-member-id", s.MemberId().String()),
|
||||||
@ -1810,7 +1816,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
|
|||||||
func (s *EtcdServer) apply(
|
func (s *EtcdServer) apply(
|
||||||
es []raftpb.Entry,
|
es []raftpb.Entry,
|
||||||
confState *raftpb.ConfState,
|
confState *raftpb.ConfState,
|
||||||
confChangeCh chan struct{},
|
raftAdvancedC <-chan struct{},
|
||||||
) (appliedt uint64, appliedi uint64, shouldStop bool) {
|
) (appliedt uint64, appliedi uint64, shouldStop bool) {
|
||||||
s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
|
s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
|
||||||
for i := range es {
|
for i := range es {
|
||||||
@ -1842,19 +1848,7 @@ func (s *EtcdServer) apply(
|
|||||||
s.setAppliedIndex(e.Index)
|
s.setAppliedIndex(e.Index)
|
||||||
s.setTerm(e.Term)
|
s.setTerm(e.Term)
|
||||||
shouldStop = shouldStop || removedSelf
|
shouldStop = shouldStop || removedSelf
|
||||||
|
s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), raftAdvancedC, err})
|
||||||
// etcdserver need to ensure the raft has already been notified
|
|
||||||
// or advanced before it responds to the client. Otherwise, the
|
|
||||||
// following config change request may be rejected.
|
|
||||||
// See https://github.com/etcd-io/etcd/issues/15528.
|
|
||||||
select {
|
|
||||||
case <-time.After(500 * time.Millisecond):
|
|
||||||
lg := s.Logger()
|
|
||||||
lg.Warn("timed out waiting for configChange notification")
|
|
||||||
case <-confChangeCh:
|
|
||||||
}
|
|
||||||
|
|
||||||
s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
|
|
||||||
|
|
||||||
default:
|
default:
|
||||||
lg := s.Logger()
|
lg := s.Logger()
|
||||||
|
@ -32,6 +32,9 @@ import (
|
|||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
|
|
||||||
|
"go.etcd.io/raft/v3"
|
||||||
|
"go.etcd.io/raft/v3/raftpb"
|
||||||
|
|
||||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||||
"go.etcd.io/etcd/api/v3/membershippb"
|
"go.etcd.io/etcd/api/v3/membershippb"
|
||||||
"go.etcd.io/etcd/client/pkg/v3/fileutil"
|
"go.etcd.io/etcd/client/pkg/v3/fileutil"
|
||||||
@ -58,8 +61,6 @@ import (
|
|||||||
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
|
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
|
||||||
"go.etcd.io/etcd/server/v3/storage/mvcc"
|
"go.etcd.io/etcd/server/v3/storage/mvcc"
|
||||||
"go.etcd.io/etcd/server/v3/storage/schema"
|
"go.etcd.io/etcd/server/v3/storage/schema"
|
||||||
"go.etcd.io/raft/v3"
|
|
||||||
"go.etcd.io/raft/v3/raftpb"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestDoLocalAction tests requests which do not need to go through raft to be applied,
|
// TestDoLocalAction tests requests which do not need to go through raft to be applied,
|
||||||
@ -689,8 +690,9 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
|
|||||||
Data: pbutil.MustMarshal(cc),
|
Data: pbutil.MustMarshal(cc),
|
||||||
}}
|
}}
|
||||||
|
|
||||||
confChangeCh := make(chan struct{}, 1)
|
raftAdvancedC := make(chan struct{}, 1)
|
||||||
_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}, confChangeCh)
|
raftAdvancedC <- struct{}{}
|
||||||
|
_, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}, raftAdvancedC)
|
||||||
consistIndex := srv.consistIndex.ConsistentIndex()
|
consistIndex := srv.consistIndex.ConsistentIndex()
|
||||||
assert.Equal(t, uint64(2), appliedi)
|
assert.Equal(t, uint64(2), appliedi)
|
||||||
|
|
||||||
@ -764,8 +766,9 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
|
|||||||
ents = append(ents, ent)
|
ents = append(ents, ent)
|
||||||
}
|
}
|
||||||
|
|
||||||
confChangeCh := make(chan struct{}, 1)
|
raftAdvancedC := make(chan struct{}, 1)
|
||||||
_, _, shouldStop := srv.apply(ents, &raftpb.ConfState{}, confChangeCh)
|
raftAdvancedC <- struct{}{}
|
||||||
|
_, _, shouldStop := srv.apply(ents, &raftpb.ConfState{}, raftAdvancedC)
|
||||||
if !shouldStop {
|
if !shouldStop {
|
||||||
t.Errorf("shouldStop = %t, want %t", shouldStop, true)
|
t.Errorf("shouldStop = %t, want %t", shouldStop, true)
|
||||||
}
|
}
|
||||||
|
@ -294,8 +294,7 @@ func TestMemberPromote(t *testing.T) {
|
|||||||
|
|
||||||
// TestMemberPromoteMemberNotLearner ensures that promoting a voting member fails.
|
// TestMemberPromoteMemberNotLearner ensures that promoting a voting member fails.
|
||||||
func TestMemberPromoteMemberNotLearner(t *testing.T) {
|
func TestMemberPromoteMemberNotLearner(t *testing.T) {
|
||||||
// TODO enable this test with integration2.WithFailpoint("raftBeforeAdvance", `sleep(100)`) after PR 15708 is merged
|
integration2.BeforeTest(t, integration2.WithFailpoint("raftBeforeAdvance", `sleep(100)`))
|
||||||
integration2.BeforeTest(t)
|
|
||||||
|
|
||||||
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
|
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3})
|
||||||
defer clus.Terminate(t)
|
defer clus.Terminate(t)
|
||||||
@ -380,7 +379,7 @@ func TestMemberPromoteMemberNotExist(t *testing.T) {
|
|||||||
|
|
||||||
// TestMaxLearnerInCluster verifies that the maximum number of learners allowed in a cluster
|
// TestMaxLearnerInCluster verifies that the maximum number of learners allowed in a cluster
|
||||||
func TestMaxLearnerInCluster(t *testing.T) {
|
func TestMaxLearnerInCluster(t *testing.T) {
|
||||||
integration2.BeforeTest(t)
|
integration2.BeforeTest(t, integration2.WithFailpoint("raftBeforeAdvance", `sleep(100)`))
|
||||||
|
|
||||||
// 1. start with a cluster with 3 voting member and max learner 2
|
// 1. start with a cluster with 3 voting member and max learner 2
|
||||||
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3, ExperimentalMaxLearners: 2, DisableStrictReconfigCheck: true})
|
clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 3, ExperimentalMaxLearners: 2, DisableStrictReconfigCheck: true})
|
||||||
@ -388,7 +387,9 @@ func TestMaxLearnerInCluster(t *testing.T) {
|
|||||||
|
|
||||||
// 2. adding 2 learner members should succeed
|
// 2. adding 2 learner members should succeed
|
||||||
for i := 0; i < 2; i++ {
|
for i := 0; i < 2; i++ {
|
||||||
_, err := clus.Client(0).MemberAddAsLearner(context.Background(), []string{fmt.Sprintf("http://127.0.0.1:123%d", i)})
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
_, err := clus.Client(0).MemberAddAsLearner(ctx, []string{fmt.Sprintf("http://127.0.0.1:123%d", i)})
|
||||||
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to add learner member %v", err)
|
t.Fatalf("failed to add learner member %v", err)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user