mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #12854 from ptabor/20210410-shouldApplyV3
(no)StoreV2 (Part 3): Applying consistency fix: ClusterVersionSet (and co) might get not applied on v2store
This commit is contained in:
@@ -572,8 +572,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||
}
|
||||
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
|
||||
kvindex := srv.consistIndex.ConsistentIndex()
|
||||
srv.lg.Debug("restore consistentIndex",
|
||||
zap.Uint64("index", kvindex))
|
||||
srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))
|
||||
if beExist {
|
||||
// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
|
||||
// etcd from pre-3.0 release.
|
||||
@@ -2039,8 +2038,13 @@ func (s *EtcdServer) apply(
|
||||
es []raftpb.Entry,
|
||||
confState *raftpb.ConfState,
|
||||
) (appliedt uint64, appliedi uint64, shouldStop bool) {
|
||||
s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
|
||||
for i := range es {
|
||||
e := es[i]
|
||||
s.lg.Debug("Applying entry",
|
||||
zap.Uint64("index", e.Index),
|
||||
zap.Uint64("term", e.Term),
|
||||
zap.Stringer("type", e.Type))
|
||||
switch e.Type {
|
||||
case raftpb.EntryNormal:
|
||||
s.applyEntryNormal(&e)
|
||||
@@ -2048,13 +2052,19 @@ func (s *EtcdServer) apply(
|
||||
s.setTerm(e.Term)
|
||||
|
||||
case raftpb.EntryConfChange:
|
||||
// We need to apply all WAL entries on top of v2store
|
||||
// and only 'unapplied' (e.Index>backend.ConsistentIndex) on the backend.
|
||||
shouldApplyV3 := membership.ApplyV2storeOnly
|
||||
|
||||
// set the consistent index of current executing entry
|
||||
if e.Index > s.consistIndex.ConsistentIndex() {
|
||||
s.consistIndex.SetConsistentIndex(e.Index)
|
||||
shouldApplyV3 = membership.ApplyBoth
|
||||
}
|
||||
|
||||
var cc raftpb.ConfChange
|
||||
pbutil.MustUnmarshal(&cc, e.Data)
|
||||
removedSelf, err := s.applyConfChange(cc, confState)
|
||||
removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3)
|
||||
s.setAppliedIndex(e.Index)
|
||||
s.setTerm(e.Term)
|
||||
shouldStop = shouldStop || removedSelf
|
||||
@@ -2074,17 +2084,17 @@ func (s *EtcdServer) apply(
|
||||
|
||||
// applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer
|
||||
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
|
||||
shouldApplyV3 := false
|
||||
shouldApplyV3 := membership.ApplyV2storeOnly
|
||||
index := s.consistIndex.ConsistentIndex()
|
||||
if e.Index > index {
|
||||
// set the consistent index of current executing entry
|
||||
s.consistIndex.SetConsistentIndex(e.Index)
|
||||
shouldApplyV3 = true
|
||||
shouldApplyV3 = membership.ApplyBoth
|
||||
}
|
||||
s.lg.Debug("apply entry normal",
|
||||
zap.Uint64("consistent-index", index),
|
||||
zap.Uint64("entry-index", e.Index),
|
||||
zap.Bool("should-applyV3", shouldApplyV3))
|
||||
zap.Bool("should-applyV3", bool(shouldApplyV3)))
|
||||
|
||||
// raft state machine may generate noop entry when leader confirmation.
|
||||
// skip it in advance to avoid some potential bug in the future
|
||||
@@ -2104,18 +2114,16 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
|
||||
var r pb.Request
|
||||
rp := &r
|
||||
pbutil.MustUnmarshal(rp, e.Data)
|
||||
s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp))
|
||||
s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp)))
|
||||
return
|
||||
}
|
||||
s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))
|
||||
if raftReq.V2 != nil {
|
||||
req := (*RequestV2)(raftReq.V2)
|
||||
s.w.Trigger(req.ID, s.applyV2Request(req))
|
||||
return
|
||||
}
|
||||
// do not re-apply applied entries.
|
||||
if !shouldApplyV3 {
|
||||
return
|
||||
}
|
||||
|
||||
id := raftReq.ID
|
||||
if id == 0 {
|
||||
@@ -2128,7 +2136,12 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
|
||||
if !needResult && raftReq.Txn != nil {
|
||||
removeNeedlessRangeReqs(raftReq.Txn)
|
||||
}
|
||||
ar = s.applyV3.Apply(&raftReq)
|
||||
ar = s.applyV3.Apply(&raftReq, shouldApplyV3)
|
||||
}
|
||||
|
||||
// do not re-apply applied entries.
|
||||
if !shouldApplyV3 {
|
||||
return
|
||||
}
|
||||
|
||||
if ar == nil {
|
||||
@@ -2170,7 +2183,7 @@ func (s *EtcdServer) notifyAboutFirstCommitInTerm() {
|
||||
|
||||
// applyConfChange applies a ConfChange to the server. It is only
|
||||
// invoked with a ConfChange that has already passed through Raft
|
||||
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
|
||||
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 membership.ShouldApplyV3) (bool, error) {
|
||||
if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
|
||||
cc.NodeID = raft.None
|
||||
s.r.ApplyConfChange(cc)
|
||||
@@ -2193,9 +2206,9 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
|
||||
)
|
||||
}
|
||||
if confChangeContext.IsPromote {
|
||||
s.cluster.PromoteMember(confChangeContext.Member.ID)
|
||||
s.cluster.PromoteMember(confChangeContext.Member.ID, shouldApplyV3)
|
||||
} else {
|
||||
s.cluster.AddMember(&confChangeContext.Member)
|
||||
s.cluster.AddMember(&confChangeContext.Member, shouldApplyV3)
|
||||
|
||||
if confChangeContext.Member.ID != s.id {
|
||||
s.r.transport.AddPeer(confChangeContext.Member.ID, confChangeContext.PeerURLs)
|
||||
@@ -2213,7 +2226,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
|
||||
|
||||
case raftpb.ConfChangeRemoveNode:
|
||||
id := types.ID(cc.NodeID)
|
||||
s.cluster.RemoveMember(id)
|
||||
s.cluster.RemoveMember(id, shouldApplyV3)
|
||||
if id == s.id {
|
||||
return true, nil
|
||||
}
|
||||
@@ -2231,7 +2244,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
|
||||
zap.String("member-id-from-message", m.ID.String()),
|
||||
)
|
||||
}
|
||||
s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
|
||||
s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes, shouldApplyV3)
|
||||
if m.ID != s.id {
|
||||
s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user