Applying consistency fix: ClusterVersionSet (and co) might not get applied on v2store

The ClusterVersionSet, ClusterMemberAttrSet, and DowngradeInfoSet functions
write to both the v2store and the backend. Prior to this CL, these writes
were in a branch that was not executed when shouldApplyV3 was false, e.g.
during restore, when the backend is up to date (has a high consistent
index) while the v2store still requires replay from the WAL log.
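
A minimal sketch of the fixed pattern, with placeholder names
(saveToV2Store, saveToBackend, clusterVersionSet are illustrative, not the
actual etcd code): the v2store write always runs, and only the backend
write is gated on shouldApplyV3.

package main

import "fmt"

// Placeholders for the real v2store and bbolt-backend writes performed by
// ClusterVersionSet and friends.
func saveToV2Store(ver string) { fmt.Println("v2store: cluster version =", ver) }
func saveToBackend(ver string) { fmt.Println("backend: cluster version =", ver) }

// clusterVersionSet sketches the gating after this CL: the v2store write is
// unconditional, while the backend write is skipped when the entry was
// already applied there (shouldApplyV3 == false). Before this CL both writes
// sat behind the guard, so the v2store write was silently dropped on restore.
func clusterVersionSet(ver string, shouldApplyV3 bool) {
	saveToV2Store(ver)
	if shouldApplyV3 {
		saveToBackend(ver)
	}
}

func main() {
	// Restore scenario: the backend already contains the entry, while the
	// v2store is being replayed from the WAL.
	clusterVersionSet("3.5.0", false)
}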

The most serious consequence of this bug was that the v2store after restore
could have a different index (revision) than the exact same store before
restore, and thus potentially different content across replicas.

This change also suppresses double-applying of membership (ClusterConfig)
changes on the backend (v3 store); luckily these are not part of the
MVCC/KeyValue store, so they did not cause revisions to be bumped.
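
For illustration only (toy types, not the etcd API): shouldApplyV3 is
derived per entry by comparing the entry index against the backend's
consistent index and is threaded down to the membership methods, so v2store
bookkeeping always runs while backend writes, and thus double-applies, are
skipped for entries the backend has already seen.

package main

import "fmt"

// consistentIndexer is a toy stand-in for the consistent-index tracking kept
// alongside the backend.
type consistentIndexer struct{ index uint64 }

func (c *consistentIndexer) ConsistentIndex() uint64     { return c.index }
func (c *consistentIndexer) SetConsistentIndex(i uint64) { c.index = i }

// addMember mirrors the pattern in the diff below: the v2store write always
// happens, the backend write only when shouldApplyV3 is true.
func addMember(id uint64, shouldApplyV3 bool) {
	fmt.Println("v2store: add member", id)
	if shouldApplyV3 {
		fmt.Println("backend: add member", id)
	}
}

func main() {
	ci := &consistentIndexer{index: 10} // backend already applied entries up to index 10
	for _, entryIndex := range []uint64{9, 10, 11} {
		shouldApplyV3 := false
		if entryIndex > ci.ConsistentIndex() {
			ci.SetConsistentIndex(entryIndex)
			shouldApplyV3 = true
		}
		// The raft index doubles as the member ID only to keep the example short.
		addMember(entryIndex, shouldApplyV3)
	}
}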

Inspired by jingyih@'s comment:
https://github.com/etcd-io/etcd/pull/12820#issuecomment-815299406
Author: Piotr Tabor
Date:   2021-04-10 11:42:15 +02:00
parent bad0b4d513
commit b1c04ce043
12 changed files with 112 additions and 82 deletions

@@ -561,8 +561,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 	}
 	srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
 	kvindex := srv.consistIndex.ConsistentIndex()
-	srv.lg.Debug("restore consistentIndex",
-		zap.Uint64("index", kvindex))
+	srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))
 	if beExist {
 		// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
 		// etcd from pre-3.0 release.
@@ -2018,8 +2017,13 @@ func (s *EtcdServer) apply(
 	es []raftpb.Entry,
 	confState *raftpb.ConfState,
 ) (appliedt uint64, appliedi uint64, shouldStop bool) {
+	s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
 	for i := range es {
 		e := es[i]
+		s.lg.Debug("Applying entry",
+			zap.Uint64("index", e.Index),
+			zap.Uint64("term", e.Term),
+			zap.Stringer("type", e.Type))
 		switch e.Type {
 		case raftpb.EntryNormal:
 			s.applyEntryNormal(&e)
@@ -2028,12 +2032,14 @@ func (s *EtcdServer) apply(
 		case raftpb.EntryConfChange:
 			// set the consistent index of current executing entry
+			shouldApplyV3 := false
 			if e.Index > s.consistIndex.ConsistentIndex() {
 				s.consistIndex.SetConsistentIndex(e.Index)
+				shouldApplyV3 = true
 			}
 			var cc raftpb.ConfChange
 			pbutil.MustUnmarshal(&cc, e.Data)
-			removedSelf, err := s.applyConfChange(cc, confState)
+			removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3)
 			s.setAppliedIndex(e.Index)
 			s.setTerm(e.Term)
 			shouldStop = shouldStop || removedSelf
@@ -2085,18 +2091,16 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		var r pb.Request
 		rp := &r
 		pbutil.MustUnmarshal(rp, e.Data)
+		s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp))
 		s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp)))
 		return
 	}
+	s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))
 	if raftReq.V2 != nil {
 		req := (*RequestV2)(raftReq.V2)
 		s.w.Trigger(req.ID, s.applyV2Request(req))
 		return
 	}
-	// do not re-apply applied entries.
-	if !shouldApplyV3 {
-		return
-	}
 	id := raftReq.ID
 	if id == 0 {
@@ -2109,7 +2113,12 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 		if !needResult && raftReq.Txn != nil {
 			removeNeedlessRangeReqs(raftReq.Txn)
 		}
-		ar = s.applyV3.Apply(&raftReq)
+		ar = s.applyV3.Apply(&raftReq, shouldApplyV3)
 	}
+	// do not re-apply applied entries.
+	if !shouldApplyV3 {
+		return
+	}
 	if ar == nil {
@@ -2142,7 +2151,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 // applyConfChange applies a ConfChange to the server. It is only
 // invoked with a ConfChange that has already passed through Raft
-func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
+func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 bool) (bool, error) {
 	if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
 		cc.NodeID = raft.None
 		s.r.ApplyConfChange(cc)
@@ -2165,9 +2174,9 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 			)
 		}
 		if confChangeContext.IsPromote {
-			s.cluster.PromoteMember(confChangeContext.Member.ID)
+			s.cluster.PromoteMember(confChangeContext.Member.ID, shouldApplyV3)
 		} else {
-			s.cluster.AddMember(&confChangeContext.Member)
+			s.cluster.AddMember(&confChangeContext.Member, shouldApplyV3)
 			if confChangeContext.Member.ID != s.id {
 				s.r.transport.AddPeer(confChangeContext.Member.ID, confChangeContext.PeerURLs)
@@ -2185,7 +2194,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 	case raftpb.ConfChangeRemoveNode:
 		id := types.ID(cc.NodeID)
-		s.cluster.RemoveMember(id)
+		s.cluster.RemoveMember(id, shouldApplyV3)
 		if id == s.id {
 			return true, nil
 		}
@@ -2203,7 +2212,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 				zap.String("member-id-from-message", m.ID.String()),
 			)
 		}
-		s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
+		s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes, shouldApplyV3)
 		if m.ID != s.id {
 			s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
 		}