From 66c7aab4d3ac065468a10bb368aa363ca5fce57a Mon Sep 17 00:00:00 2001 From: ahrtr Date: Fri, 8 Apr 2022 14:39:23 +0800 Subject: [PATCH] fix the data inconsistency issue by adding a txPostLockHook into the backend Previously the SetConsistentIndex() is called during the apply workflow, but it's outside the db transaction. If a commit happens between SetConsistentIndex and the following apply workflow, and etcd crashes for whatever reason right after the commit, then etcd commits an incomplete transaction to db. Eventually etcd runs into the data inconsistency issue. In this commit, we move the SetConsistentIndex into a txPostLockHook, so it will be executed inside the transaction lock. --- etcdutl/etcdutl/backup_command.go | 2 +- server/auth/range_perm_cache.go | 4 +- server/auth/store.go | 54 +++++++++++------------ server/etcdserver/api/membership/store.go | 14 +++--- server/etcdserver/api/v3alarm/alarms.go | 6 +-- server/etcdserver/backend.go | 2 +- server/etcdserver/cindex/cindex.go | 54 +++++++++++++++++++++-- server/etcdserver/server.go | 34 +++++++++++++- server/etcdserver/server_test.go | 7 +-- server/lease/lessor.go | 4 +- server/mvcc/backend/backend.go | 16 ++++++- server/mvcc/backend/batch_tx.go | 28 +++++++++--- server/mvcc/backend/hooks_test.go | 2 - server/mvcc/backend/verify.go | 14 ++++++ server/mvcc/backend/verify_test.go | 34 +++++++++++--- server/mvcc/kvstore.go | 6 +-- server/mvcc/kvstore_compaction.go | 2 +- server/mvcc/kvstore_test.go | 9 ++-- server/mvcc/kvstore_txn.go | 2 +- server/mvcc/util.go | 2 +- server/verify/verify.go | 3 +- 21 files changed, 216 insertions(+), 83 deletions(-) diff --git a/etcdutl/etcdutl/backup_command.go b/etcdutl/etcdutl/backup_command.go index 6bebaf920..6cd243f38 100644 --- a/etcdutl/etcdutl/backup_command.go +++ b/etcdutl/etcdutl/backup_command.go @@ -319,7 +319,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir if !v3 { tx := be.BatchTx() - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() cindex.UnsafeCreateMetaBucket(tx) cindex.UnsafeUpdateConsistentIndex(tx, idx, term) diff --git a/server/auth/range_perm_cache.go b/server/auth/range_perm_cache.go index 7d77b16ea..54be7d25f 100644 --- a/server/auth/range_perm_cache.go +++ b/server/auth/range_perm_cache.go @@ -22,7 +22,7 @@ import ( "go.uber.org/zap" ) -func getMergedPerms(lg *zap.Logger, tx backend.BatchTx, userName string) *unifiedRangePermissions { +func getMergedPerms(lg *zap.Logger, tx backend.ReadTx, userName string) *unifiedRangePermissions { user := getUser(lg, tx, userName) if user == nil { return nil @@ -105,7 +105,7 @@ func checkKeyPoint(lg *zap.Logger, cachedPerms *unifiedRangePermissions, key []b return false } -func (as *authStore) isRangeOpPermitted(tx backend.BatchTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool { +func (as *authStore) isRangeOpPermitted(tx backend.ReadTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool { // assumption: tx is Lock()ed _, ok := as.rangePermCache[userName] if !ok { diff --git a/server/auth/store.go b/server/auth/store.go index 44c1d35fd..09d9cdc67 100644 --- a/server/auth/store.go +++ b/server/auth/store.go @@ -223,7 +223,7 @@ func (as *authStore) AuthEnable() error { } b := as.be tx := b.BatchTx() - tx.Lock() + tx.LockInsideApply() defer func() { tx.Unlock() b.ForceCommit() @@ -259,7 +259,7 @@ func (as *authStore) AuthDisable() { } b := as.be tx := b.BatchTx() - tx.Lock() + tx.LockInsideApply() tx.UnsafePut(buckets.Auth, enableFlagKey, authDisabled) as.commitRevision(tx) tx.Unlock() @@ -287,7 +287,7 @@ func (as *authStore) Authenticate(ctx context.Context, username, password string } tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() user := getUser(as.lg, tx, username) @@ -324,7 +324,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) { // CompareHashAndPassword is very expensive, so we use closures // to avoid putting it in the critical section of the tx lock. revision, err := func() (uint64, error) { - tx := as.be.BatchTx() + tx := as.be.ReadTx() tx.Lock() defer tx.Unlock() @@ -353,7 +353,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) { func (as *authStore) Recover(be backend.Backend) { enabled := false as.be = be - tx := be.BatchTx() + tx := be.ReadTx() tx.Lock() _, vs := tx.UnsafeRange(buckets.Auth, enableFlagKey, nil, 0) if len(vs) == 1 { @@ -385,7 +385,7 @@ func (as *authStore) UserAdd(r *pb.AuthUserAddRequest) (*pb.AuthUserAddResponse, } tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() user := getUser(as.lg, tx, r.Name) @@ -431,7 +431,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete } tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() user := getUser(as.lg, tx, r.Name) @@ -456,7 +456,7 @@ func (as *authStore) UserDelete(r *pb.AuthUserDeleteRequest) (*pb.AuthUserDelete func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*pb.AuthUserChangePasswordResponse, error) { tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() user := getUser(as.lg, tx, r.Name) @@ -498,7 +498,7 @@ func (as *authStore) UserChangePassword(r *pb.AuthUserChangePasswordRequest) (*p func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUserGrantRoleResponse, error) { tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() user := getUser(as.lg, tx, r.User) @@ -544,7 +544,7 @@ func (as *authStore) UserGrantRole(r *pb.AuthUserGrantRoleRequest) (*pb.AuthUser func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, error) { tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() user := getUser(as.lg, tx, r.Name) tx.Unlock() @@ -559,7 +559,7 @@ func (as *authStore) UserGet(r *pb.AuthUserGetRequest) (*pb.AuthUserGetResponse, func (as *authStore) UserList(r *pb.AuthUserListRequest) (*pb.AuthUserListResponse, error) { tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() users := getAllUsers(as.lg, tx) tx.Unlock() @@ -581,7 +581,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs } tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() user := getUser(as.lg, tx, r.Name) @@ -623,7 +623,7 @@ func (as *authStore) UserRevokeRole(r *pb.AuthUserRevokeRoleRequest) (*pb.AuthUs func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, error) { tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() var resp pb.AuthRoleGetResponse @@ -638,7 +638,7 @@ func (as *authStore) RoleGet(r *pb.AuthRoleGetRequest) (*pb.AuthRoleGetResponse, func (as *authStore) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListResponse, error) { tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() roles := getAllRoles(as.lg, tx) tx.Unlock() @@ -651,7 +651,7 @@ func (as *authStore) RoleList(r *pb.AuthRoleListRequest) (*pb.AuthRoleListRespon func (as *authStore) RoleRevokePermission(r *pb.AuthRoleRevokePermissionRequest) (*pb.AuthRoleRevokePermissionResponse, error) { tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() role := getRole(as.lg, tx, r.Role) @@ -697,7 +697,7 @@ func (as *authStore) RoleDelete(r *pb.AuthRoleDeleteRequest) (*pb.AuthRoleDelete } tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() role := getRole(as.lg, tx, r.Role) @@ -742,7 +742,7 @@ func (as *authStore) RoleAdd(r *pb.AuthRoleAddRequest) (*pb.AuthRoleAddResponse, } tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() role := getRole(as.lg, tx, r.Name) @@ -786,7 +786,7 @@ func (as *authStore) RoleGrantPermission(r *pb.AuthRoleGrantPermissionRequest) ( } tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() role := getRole(as.lg, tx, r.Name) @@ -849,7 +849,7 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE return ErrAuthOldRevision } - tx := as.be.BatchTx() + tx := as.be.ReadTx() tx.Lock() defer tx.Unlock() @@ -891,7 +891,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error { return ErrUserEmpty } - tx := as.be.BatchTx() + tx := as.be.ReadTx() tx.Lock() u := getUser(as.lg, tx, authInfo.Username) tx.Unlock() @@ -907,7 +907,7 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error { return nil } -func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User { +func getUser(lg *zap.Logger, tx backend.ReadTx, username string) *authpb.User { _, vs := tx.UnsafeRange(buckets.AuthUsers, []byte(username), nil, 0) if len(vs) == 0 { return nil @@ -925,7 +925,7 @@ func getUser(lg *zap.Logger, tx backend.BatchTx, username string) *authpb.User { return user } -func getAllUsers(lg *zap.Logger, tx backend.BatchTx) []*authpb.User { +func getAllUsers(lg *zap.Logger, tx backend.ReadTx) []*authpb.User { _, vs := tx.UnsafeRange(buckets.AuthUsers, []byte{0}, []byte{0xff}, -1) if len(vs) == 0 { return nil @@ -955,7 +955,7 @@ func delUser(tx backend.BatchTx, username string) { tx.UnsafeDelete(buckets.AuthUsers, []byte(username)) } -func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role { +func getRole(lg *zap.Logger, tx backend.ReadTx, rolename string) *authpb.Role { _, vs := tx.UnsafeRange(buckets.AuthRoles, []byte(rolename), nil, 0) if len(vs) == 0 { return nil @@ -969,7 +969,7 @@ func getRole(lg *zap.Logger, tx backend.BatchTx, rolename string) *authpb.Role { return role } -func getAllRoles(lg *zap.Logger, tx backend.BatchTx) []*authpb.Role { +func getAllRoles(lg *zap.Logger, tx backend.ReadTx) []*authpb.Role { _, vs := tx.UnsafeRange(buckets.AuthRoles, []byte{0}, []byte{0xff}, -1) if len(vs) == 0 { return nil @@ -1028,7 +1028,7 @@ func NewAuthStore(lg *zap.Logger, be backend.Backend, tp TokenProvider, bcryptCo } tx := be.BatchTx() - tx.Lock() + tx.LockOutsideApply() tx.UnsafeCreateBucket(buckets.Auth) tx.UnsafeCreateBucket(buckets.AuthUsers) @@ -1081,7 +1081,7 @@ func (as *authStore) commitRevision(tx backend.BatchTx) { tx.UnsafePut(buckets.Auth, revisionKey, revBytes) } -func getRevision(tx backend.BatchTx) uint64 { +func getRevision(tx backend.ReadTx) uint64 { _, vs := tx.UnsafeRange(buckets.Auth, revisionKey, nil, 0) if len(vs) != 1 { // this can happen in the initialization phase @@ -1281,7 +1281,7 @@ func (as *authStore) WithRoot(ctx context.Context) context.Context { func (as *authStore) HasRole(user, role string) bool { tx := as.be.BatchTx() - tx.Lock() + tx.LockInsideApply() u := getUser(as.lg, tx, user) tx.Unlock() diff --git a/server/etcdserver/api/membership/store.go b/server/etcdserver/api/membership/store.go index a0cdf370a..fadc81822 100644 --- a/server/etcdserver/api/membership/store.go +++ b/server/etcdserver/api/membership/store.go @@ -52,7 +52,7 @@ func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) er } tx := be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() if unsafeMemberExists(tx, mkey) { return errMemberAlreadyExist @@ -65,7 +65,7 @@ func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) er // from the v3 backend. func TrimClusterFromBackend(be backend.Backend) error { tx := be.BatchTx() - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeDeleteBucket(buckets.Cluster) return nil @@ -75,7 +75,7 @@ func unsafeDeleteMemberFromBackend(be backend.Backend, id types.ID) error { mkey := backendMemberKey(id) tx := be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed")) if !unsafeMemberExists(tx, mkey) { @@ -140,7 +140,7 @@ func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.I func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error { lg.Info("Trimming membership information from the backend...") tx := be.BatchTx() - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error { tx.UnsafeDelete(buckets.Members, k) @@ -185,7 +185,7 @@ func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) { ckey := backendClusterVersionKey() tx := be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() tx.UnsafePut(buckets.Cluster, ckey, []byte(ver.String())) } @@ -198,7 +198,7 @@ func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *D lg.Panic("failed to marshal downgrade information", zap.Error(err)) } tx := be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() tx.UnsafePut(buckets.Cluster, dkey, dvalue) } @@ -316,7 +316,7 @@ func backendDowngradeKey() []byte { func mustCreateBackendBuckets(be backend.Backend) { tx := be.BatchTx() - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeCreateBucket(buckets.Members) tx.UnsafeCreateBucket(buckets.MembersRemoved) diff --git a/server/etcdserver/api/v3alarm/alarms.go b/server/etcdserver/api/v3alarm/alarms.go index 3038813cc..5d4a641ce 100644 --- a/server/etcdserver/api/v3alarm/alarms.go +++ b/server/etcdserver/api/v3alarm/alarms.go @@ -65,7 +65,7 @@ func (a *AlarmStore) Activate(id types.ID, at pb.AlarmType) *pb.AlarmMember { } b := a.bg.Backend() - b.BatchTx().Lock() + b.BatchTx().LockInsideApply() b.BatchTx().UnsafePut(buckets.Alarm, v, nil) b.BatchTx().Unlock() @@ -94,7 +94,7 @@ func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember { } b := a.bg.Backend() - b.BatchTx().Lock() + b.BatchTx().LockInsideApply() b.BatchTx().UnsafeDelete(buckets.Alarm, v) b.BatchTx().Unlock() @@ -122,7 +122,7 @@ func (a *AlarmStore) restore() error { b := a.bg.Backend() tx := b.BatchTx() - tx.Lock() + tx.LockOutsideApply() tx.UnsafeCreateBucket(buckets.Alarm) err := tx.UnsafeForEach(buckets.Alarm, func(k, v []byte) error { var m pb.AlarmMember diff --git a/server/etcdserver/backend.go b/server/etcdserver/backend.go index 081be2b52..2beef5763 100644 --- a/server/etcdserver/backend.go +++ b/server/etcdserver/backend.go @@ -99,7 +99,7 @@ func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) { consistentIndex := uint64(0) if beExist { - consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.BatchTx()) + consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.ReadTx()) } if snapshot.Metadata.Index <= consistentIndex { return oldbe, nil diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 4978124ba..ac6ae6374 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -34,9 +34,18 @@ type ConsistentIndexer interface { // ConsistentIndex returns the consistent index of current executing entry. ConsistentIndex() uint64 + // ConsistentApplyingIndex returns the consistent applying index of current executing entry. + ConsistentApplyingIndex() (uint64, uint64) + + // UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction. + UnsafeConsistentIndex() uint64 + // SetConsistentIndex set the consistent index of current executing entry. SetConsistentIndex(v uint64, term uint64) + // SetConsistentApplyingIndex set the consistent applying index of current executing entry. + SetConsistentApplyingIndex(v uint64, term uint64) + // UnsafeSave must be called holding the lock on the tx. // It saves consistentIndex to the underlying stable storage. UnsafeSave(tx backend.BatchTx) @@ -56,6 +65,12 @@ type consistentIndex struct { // The value is being persisted in the backend since v3.5. term uint64 + // applyingIndex and applyingTerm are just temporary cache of the raftpb.Entry.Index + // and raftpb.Entry.Term, and they are not ready to be persisted yet. They will be + // saved to consistentIndex and term above in the txPostLockInsideApplyHook. + applyingIndex uint64 + applyingTerm uint64 + // be is used for initial read consistentIndex be Backend // mutex is protecting be. @@ -75,7 +90,17 @@ func (ci *consistentIndex) ConsistentIndex() uint64 { ci.mutex.Lock() defer ci.mutex.Unlock() - v, term := ReadConsistentIndex(ci.be.BatchTx()) + v, term := ReadConsistentIndex(ci.be.ReadTx()) + ci.SetConsistentIndex(v, term) + return v +} + +func (ci *consistentIndex) UnsafeConsistentIndex() uint64 { + if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 { + return index + } + + v, term := unsafeReadConsistentIndex(ci.be.ReadTx()) ci.SetConsistentIndex(v, term) return v } @@ -99,6 +124,15 @@ func (ci *consistentIndex) SetBackend(be Backend) { ci.SetConsistentIndex(0, 0) } +func (ci *consistentIndex) ConsistentApplyingIndex() (uint64, uint64) { + return atomic.LoadUint64(&ci.applyingIndex), atomic.LoadUint64(&ci.applyingTerm) +} + +func (ci *consistentIndex) SetConsistentApplyingIndex(v uint64, term uint64) { + atomic.StoreUint64(&ci.applyingIndex, v) + atomic.StoreUint64(&ci.applyingTerm, term) +} + func NewFakeConsistentIndex(index uint64) ConsistentIndexer { return &fakeConsistentIndex{index: index} } @@ -108,12 +142,24 @@ type fakeConsistentIndex struct { term uint64 } -func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index } +func (f *fakeConsistentIndex) ConsistentIndex() uint64 { + return atomic.LoadUint64(&f.index) +} +func (f *fakeConsistentIndex) ConsistentApplyingIndex() (uint64, uint64) { + return atomic.LoadUint64(&f.index), atomic.LoadUint64(&f.term) +} +func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 { + return atomic.LoadUint64(&f.index) +} func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) { atomic.StoreUint64(&f.index, index) atomic.StoreUint64(&f.term, term) } +func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint64) { + atomic.StoreUint64(&f.index, index) + atomic.StoreUint64(&f.term, term) +} func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {} func (f *fakeConsistentIndex) SetBackend(_ Backend) {} @@ -125,7 +171,7 @@ func UnsafeCreateMetaBucket(tx backend.BatchTx) { // CreateMetaBucket creates the `meta` bucket (if it does not exists yet). func CreateMetaBucket(tx backend.BatchTx) { - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeCreateBucket(buckets.Meta) } @@ -174,7 +220,7 @@ func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) } func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) { - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() UnsafeUpdateConsistentIndex(tx, index, term) } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 0b7e48c98..bba431971 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -661,6 +661,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { }) } + // Set the hook after EtcdServer finishes the initialization to avoid + // the hook being called during the initialization process. + srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook()) + // TODO: move transport initialization near the definition of remote tr := &rafthttp.Transport{ Logger: cfg.Logger, @@ -1260,6 +1264,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } s.consistIndex.SetBackend(newbe) + newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook()) lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex())) // Closing old backend might block until all the txns @@ -2128,7 +2133,7 @@ func (s *EtcdServer) apply( // set the consistent index of current executing entry if e.Index > s.consistIndex.ConsistentIndex() { - s.consistIndex.SetConsistentIndex(e.Index, e.Term) + s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } @@ -2155,10 +2160,18 @@ func (s *EtcdServer) apply( // applyEntryNormal apples an EntryNormal type raftpb request to the EtcdServer func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { shouldApplyV3 := membership.ApplyV2storeOnly + applyV3Performed := false + defer func() { + // The txPostLock callback will not get called in this case, + // so we should set the consistent index directly. + if s.consistIndex != nil && !applyV3Performed && membership.ApplyBoth == shouldApplyV3 { + s.consistIndex.SetConsistentIndex(e.Index, e.Term) + } + }() index := s.consistIndex.ConsistentIndex() if e.Index > index { // set the consistent index of current executing entry - s.consistIndex.SetConsistentIndex(e.Index, e.Term) + s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } s.lg.Debug("apply entry normal", @@ -2207,6 +2220,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { if !needResult && raftReq.Txn != nil { removeNeedlessRangeReqs(raftReq.Txn) } + applyV3Performed = true ar = s.applyV3.Apply(&raftReq, shouldApplyV3) } @@ -2258,6 +2272,13 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con if err := s.cluster.ValidateConfigurationChange(cc); err != nil { cc.NodeID = raft.None s.r.ApplyConfChange(cc) + + // The txPostLock callback will not get called in this case, + // so we should set the consistent index directly. + if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 { + applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex() + s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm) + } return false, err } @@ -2683,6 +2704,15 @@ func (s *EtcdServer) raftStatus() raft.Status { return s.r.Node.Status() } +func (s *EtcdServer) getTxPostLockInsideApplyHook() func() { + return func() { + applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex() + if applyingIdx > s.consistIndex.UnsafeConsistentIndex() { + s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm) + } + } +} + func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error { size := be.Size() sizeInUse := be.SizeInUse() diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index c5a61da47..5127e5c71 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -686,9 +686,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}) consistIndex := srv.consistIndex.ConsistentIndex() - if consistIndex != appliedi { - t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi) - } + assert.Equal(t, uint64(2), appliedi) t.Run("verify-backend", func(t *testing.T) { tx := be.BatchTx() @@ -697,9 +695,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { srv.beHooks.OnPreCommitUnsafe(tx) assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *membership.UnsafeConfStateFromBackend(lg, tx)) }) - rindex, rterm := cindex.ReadConsistentIndex(be.BatchTx()) + rindex, _ := cindex.ReadConsistentIndex(be.ReadTx()) assert.Equal(t, consistIndex, rindex) - assert.Equal(t, uint64(4), rterm) } func realisticRaftNode(lg *zap.Logger) *raftNode { diff --git a/server/lease/lessor.go b/server/lease/lessor.go index 409fa5b2a..02ee77f50 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -797,7 +797,7 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh func (le *lessor) initAndRecover() { tx := le.b.BatchTx() - tx.Lock() + tx.LockOutsideApply() tx.UnsafeCreateBucket(buckets.Lease) lpbs := unsafeGetAllLeases(tx) @@ -852,7 +852,7 @@ func (l *Lease) persistTo(b backend.Backend) { panic("failed to marshal lease proto item") } - b.BatchTx().Lock() + b.BatchTx().LockInsideApply() b.BatchTx().UnsafePut(buckets.Lease, key, val) b.BatchTx().Unlock() } diff --git a/server/mvcc/backend/backend.go b/server/mvcc/backend/backend.go index 1e5f191c8..f156ae994 100644 --- a/server/mvcc/backend/backend.go +++ b/server/mvcc/backend/backend.go @@ -68,6 +68,9 @@ type Backend interface { Defrag() error ForceCommit() Close() error + + // SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook. + SetTxPostLockInsideApplyHook(func()) } type Snapshot interface { @@ -120,6 +123,9 @@ type backend struct { hooks Hooks + // txPostLockInsideApplyHook is called each time right after locking the tx. + txPostLockInsideApplyHook func() + lg *zap.Logger } @@ -231,6 +237,14 @@ func (b *backend) BatchTx() BatchTx { return b.batchTx } +func (b *backend) SetTxPostLockInsideApplyHook(hook func()) { + // It needs to lock the batchTx, because the periodic commit + // may be accessing the txPostLockInsideApplyHook at the moment. + b.batchTx.lock() + defer b.batchTx.Unlock() + b.txPostLockInsideApplyHook = hook +} + func (b *backend) ReadTx() ReadTx { return b.readTx } // ConcurrentReadTx creates and returns a new ReadTx, which: @@ -440,7 +454,7 @@ func (b *backend) defrag() error { // TODO: make this non-blocking? // lock batchTx to ensure nobody is using previous tx, and then // close previous ongoing tx. - b.batchTx.Lock() + b.batchTx.LockOutsideApply() defer b.batchTx.Unlock() // lock database after lock tx to avoid deadlock. diff --git a/server/mvcc/backend/batch_tx.go b/server/mvcc/backend/batch_tx.go index a8e649199..c8fa55954 100644 --- a/server/mvcc/backend/batch_tx.go +++ b/server/mvcc/backend/batch_tx.go @@ -65,18 +65,32 @@ type batchTx struct { pending int } +// Lock is supposed to be called only by the unit test. func (t *batchTx) Lock() { + ValidateCalledInsideUnittest(t.backend.lg) + t.lock() +} + +func (t *batchTx) lock() { t.Mutex.Lock() } func (t *batchTx) LockInsideApply() { - ValidateCalledInsideApply(t.backend.lg) - t.Lock() + t.lock() + if t.backend.txPostLockInsideApplyHook != nil { + // The callers of some methods (i.e., (*RaftCluster).AddMember) + // can be coming from both InsideApply and OutsideApply, but the + // callers from OutsideApply will have a nil txPostLockInsideApplyHook. + // So we should check the txPostLockInsideApplyHook before validating + // the callstack. + ValidateCalledInsideApply(t.backend.lg) + t.backend.txPostLockInsideApplyHook() + } } func (t *batchTx) LockOutsideApply() { ValidateCalledOutSideApply(t.backend.lg) - t.Lock() + t.lock() } func (t *batchTx) Unlock() { @@ -226,14 +240,14 @@ func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) // Commit commits a previous tx and begins a new writable one. func (t *batchTx) Commit() { - t.Lock() + t.lock() t.commit(false) t.Unlock() } // CommitAndStop commits the previous tx and does not create a new one. func (t *batchTx) CommitAndStop() { - t.Lock() + t.lock() t.commit(true) t.Unlock() } @@ -303,13 +317,13 @@ func (t *batchTxBuffered) Unlock() { } func (t *batchTxBuffered) Commit() { - t.Lock() + t.lock() t.commit(false) t.Unlock() } func (t *batchTxBuffered) CommitAndStop() { - t.Lock() + t.lock() t.commit(true) t.Unlock() } diff --git a/server/mvcc/backend/hooks_test.go b/server/mvcc/backend/hooks_test.go index 03a90dd3d..69a1e7216 100644 --- a/server/mvcc/backend/hooks_test.go +++ b/server/mvcc/backend/hooks_test.go @@ -40,8 +40,6 @@ func TestBackendPreCommitHook(t *testing.T) { // Empty commit. tx.Commit() - write(tx, []byte("foo"), []byte("bar")) - assert.Equal(t, ">cc", getCommitsKey(t, be), "expected 2 explict commits") tx.Commit() assert.Equal(t, ">ccc", getCommitsKey(t, be), "expected 3 explict commits") diff --git a/server/mvcc/backend/verify.go b/server/mvcc/backend/verify.go index 2f3dc0221..a6a0b8675 100644 --- a/server/mvcc/backend/verify.go +++ b/server/mvcc/backend/verify.go @@ -46,6 +46,15 @@ func ValidateCalledOutSideApply(lg *zap.Logger) { } } +func ValidateCalledInsideUnittest(lg *zap.Logger) { + if !verifyLockEnabled() { + return + } + if !insideUnittest() { + lg.Fatal("Lock called outside of unit test!", zap.Stack("stacktrace")) + } +} + func verifyLockEnabled() bool { return os.Getenv(ENV_VERIFY) == ENV_VERIFY_ALL_VALUE || os.Getenv(ENV_VERIFY) == ENV_VERIFY_LOCK } @@ -54,3 +63,8 @@ func insideApply() bool { stackTraceStr := string(debug.Stack()) return strings.Contains(stackTraceStr, ".applyEntries") } + +func insideUnittest() bool { + stackTraceStr := string(debug.Stack()) + return strings.Contains(stackTraceStr, "_test.go") && !strings.Contains(stackTraceStr, "tests/") +} diff --git a/server/mvcc/backend/verify_test.go b/server/mvcc/backend/verify_test.go index f4f4db475..b8f0a84c5 100644 --- a/server/mvcc/backend/verify_test.go +++ b/server/mvcc/backend/verify_test.go @@ -15,7 +15,6 @@ package backend_test import ( - "fmt" "os" "testing" "time" @@ -26,40 +25,60 @@ import ( func TestLockVerify(t *testing.T) { tcs := []struct { - insideApply bool - lock func(tx backend.BatchTx) - expectPanic bool + name string + insideApply bool + lock func(tx backend.BatchTx) + txPostLockInsideApplyHook func() + expectPanic bool }{ { + name: "call lockInsideApply from inside apply", insideApply: true, lock: lockInsideApply, expectPanic: false, }, { + name: "call lockInsideApply from outside apply (without txPostLockInsideApplyHook)", insideApply: false, lock: lockInsideApply, - expectPanic: true, + expectPanic: false, }, { + name: "call lockInsideApply from outside apply (with txPostLockInsideApplyHook)", + insideApply: false, + lock: lockInsideApply, + txPostLockInsideApplyHook: func() {}, + expectPanic: true, + }, + { + name: "call lockOutsideApply from outside apply", insideApply: false, lock: lockOutsideApply, expectPanic: false, }, { + name: "call lockOutsideApply from inside apply", insideApply: true, lock: lockOutsideApply, expectPanic: true, }, + { + name: "call Lock from unit test", + insideApply: false, + lock: lockFromUT, + expectPanic: false, + }, } env := os.Getenv("ETCD_VERIFY") os.Setenv("ETCD_VERIFY", "lock") defer func() { os.Setenv("ETCD_VERIFY", env) }() - for i, tc := range tcs { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) + be.SetTxPostLockInsideApplyHook(tc.txPostLockInsideApplyHook) hasPaniced := handlePanic(func() { if tc.insideApply { @@ -89,3 +108,4 @@ func applyEntries(be backend.Backend, f func(tx backend.BatchTx)) { func lockInsideApply(tx backend.BatchTx) { tx.LockInsideApply() } func lockOutsideApply(tx backend.BatchTx) { tx.LockOutsideApply() } +func lockFromUT(tx backend.BatchTx) { tx.Lock() } diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index 54055ed05..ae31d968c 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -119,7 +119,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi } tx := s.b.BatchTx() - tx.Lock() + tx.LockOutsideApply() tx.UnsafeCreateBucket(buckets.Key) tx.UnsafeCreateBucket(buckets.Meta) tx.Unlock() @@ -238,7 +238,7 @@ func (s *store) updateCompactRev(rev int64) (<-chan struct{}, error) { revToBytes(revision{main: rev}, rbytes) tx := s.b.BatchTx() - tx.Lock() + tx.LockInsideApply() tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes) tx.Unlock() // ensure that desired compaction is persisted @@ -334,7 +334,7 @@ func (s *store) restore() error { keyToLease := make(map[string]lease.LeaseID) // restore index - tx := s.b.BatchTx() + tx := s.b.ReadTx() tx.Lock() _, finishedCompactBytes := tx.UnsafeRange(buckets.Meta, finishedCompactKeyName, nil, 0) diff --git a/server/mvcc/kvstore_compaction.go b/server/mvcc/kvstore_compaction.go index 71bd4b736..c63d3f4f4 100644 --- a/server/mvcc/kvstore_compaction.go +++ b/server/mvcc/kvstore_compaction.go @@ -39,7 +39,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc start := time.Now() tx := s.b.BatchTx() - tx.Lock() + tx.LockOutsideApply() keys, _ := tx.UnsafeRange(buckets.Key, last, end, int64(s.cfg.CompactionBatchLimit)) for _, key := range keys { rev = bytesToRev(key) diff --git a/server/mvcc/kvstore_test.go b/server/mvcc/kvstore_test.go index d2a9e55e8..e6d437fb1 100644 --- a/server/mvcc/kvstore_test.go +++ b/server/mvcc/kvstore_test.go @@ -871,6 +871,8 @@ type fakeBatchTx struct { rangeRespc chan rangeResp } +func (b *fakeBatchTx) LockInsideApply() {} +func (b *fakeBatchTx) LockOutsideApply() {} func (b *fakeBatchTx) Lock() {} func (b *fakeBatchTx) Unlock() {} func (b *fakeBatchTx) RLock() {} @@ -894,10 +896,8 @@ func (b *fakeBatchTx) UnsafeDelete(bucket backend.Bucket, key []byte) { func (b *fakeBatchTx) UnsafeForEach(bucket backend.Bucket, visitor func(k, v []byte) error) error { return nil } -func (b *fakeBatchTx) Commit() {} -func (b *fakeBatchTx) CommitAndStop() {} -func (b *fakeBatchTx) LockInsideApply() {} -func (b *fakeBatchTx) LockOutsideApply() {} +func (b *fakeBatchTx) Commit() {} +func (b *fakeBatchTx) CommitAndStop() {} type fakeBackend struct { tx *fakeBatchTx @@ -914,6 +914,7 @@ func (b *fakeBackend) Snapshot() backend.Snapshot func (b *fakeBackend) ForceCommit() {} func (b *fakeBackend) Defrag() error { return nil } func (b *fakeBackend) Close() error { return nil } +func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {} type indexGetResp struct { rev revision diff --git a/server/mvcc/kvstore_txn.go b/server/mvcc/kvstore_txn.go index 93d7db20e..9df7b7941 100644 --- a/server/mvcc/kvstore_txn.go +++ b/server/mvcc/kvstore_txn.go @@ -78,7 +78,7 @@ type storeTxnWrite struct { func (s *store) Write(trace *traceutil.Trace) TxnWrite { s.mu.RLock() tx := s.b.BatchTx() - tx.Lock() + tx.LockInsideApply() tw := &storeTxnWrite{ storeTxnRead: storeTxnRead{s, tx, 0, 0, trace}, tx: tx, diff --git a/server/mvcc/util.go b/server/mvcc/util.go index 83cbf44bf..c4c0ff2f0 100644 --- a/server/mvcc/util.go +++ b/server/mvcc/util.go @@ -31,7 +31,7 @@ func WriteKV(be backend.Backend, kv mvccpb.KeyValue) { panic(fmt.Errorf("cannot marshal event: %v", err)) } - be.BatchTx().Lock() + be.BatchTx().LockOutsideApply() be.BatchTx().UnsafePut(buckets.Key, ibytes, d) be.BatchTx().Unlock() } diff --git a/server/verify/verify.go b/server/verify/verify.go index f727201ce..ef613d7eb 100644 --- a/server/verify/verify.go +++ b/server/verify/verify.go @@ -108,8 +108,7 @@ func MustVerifyIfEnabled(cfg Config) { } func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error { - tx := be.BatchTx() - index, term := cindex.ReadConsistentIndex(tx) + index, term := cindex.ReadConsistentIndex(be.ReadTx()) if cfg.ExactIndex && index != hardstate.Commit { return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit) }