From bfd5170f66812b43775b95e11ccb742d61cff466 Mon Sep 17 00:00:00 2001 From: ahrtr Date: Wed, 30 Mar 2022 15:26:31 +0800 Subject: [PATCH 1/6] add a txPostLockHook into the backend Previously the SetConsistentIndex() is called during the apply workflow, but it's outside the db transaction. If a commit happens between SetConsistentIndex and the following apply workflow, and etcd crashes for whatever reason right after the commit, then etcd commits an incomplete transaction to db. Eventually etcd runs into the data inconsistency issue. In this commit, we move the SetConsistentIndex into a txPostLockHook, so it will be executed inside the transaction lock. --- etcdutl/etcdutl/migrate_command.go | 4 +-- server/etcdserver/adapters.go | 8 +++--- server/etcdserver/cindex/cindex.go | 23 ++++++++++++++--- server/etcdserver/server.go | 31 ++++++++++++++++++++--- server/etcdserver/server_test.go | 7 ++--- server/lease/lessor.go | 2 +- server/storage/backend.go | 2 +- server/storage/backend/backend.go | 16 +++++++++++- server/storage/backend/batch_tx.go | 16 +++++++++--- server/storage/mvcc/kvstore.go | 4 +-- server/storage/mvcc/kvstore_compaction.go | 2 +- server/storage/mvcc/kvstore_test.go | 2 ++ server/storage/schema/alarm.go | 2 +- server/storage/schema/auth.go | 2 +- server/storage/schema/cindex.go | 6 ++--- server/storage/schema/membership.go | 6 ++--- server/storage/schema/migration.go | 4 +-- server/storage/schema/schema.go | 8 +++--- server/verify/verify.go | 3 +-- 19 files changed, 104 insertions(+), 44 deletions(-) diff --git a/etcdutl/etcdutl/migrate_command.go b/etcdutl/etcdutl/migrate_command.go index 88f9e473c..195576e31 100644 --- a/etcdutl/etcdutl/migrate_command.go +++ b/etcdutl/etcdutl/migrate_command.go @@ -118,7 +118,7 @@ type migrateConfig struct { func migrateCommandFunc(c *migrateConfig) error { defer c.be.Close() tx := c.be.BatchTx() - current, err := schema.DetectSchemaVersion(c.lg, tx) + current, err := schema.DetectSchemaVersion(c.lg, c.be.ReadTx()) if err != nil { c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older") return err @@ -140,7 +140,7 @@ func migrateCommandFunc(c *migrateConfig) error { } func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() // Storage version is only supported since v3.6 if target.LessThan(schema.V3_6) { diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index 9511bc6a6..5f1bcfef1 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -78,9 +78,9 @@ func (s *serverVersionAdapter) GetStorageVersion() *semver.Version { s.bemu.RLock() defer s.bemu.RUnlock() - tx := s.be.BatchTx() - tx.Lock() - defer tx.Unlock() + tx := s.be.ReadTx() + tx.RLock() + defer tx.RUnlock() v, err := schema.UnsafeDetectSchemaVersion(s.lg, tx) if err != nil { return nil @@ -94,7 +94,7 @@ func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error defer s.bemu.RUnlock() tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return schema.UnsafeMigrate(s.lg, tx, s.r.storage, target) } diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 24dad6603..7ec1b1212 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -23,6 +23,7 @@ import ( ) type Backend interface { + ReadTx() backend.ReadTx BatchTx() backend.BatchTx } @@ -32,6 +33,9 @@ type ConsistentIndexer interface { // ConsistentIndex returns the consistent index of current executing entry. ConsistentIndex() uint64 + // UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction. + UnsafeConsistentIndex() uint64 + // SetConsistentIndex set the consistent index of current executing entry. SetConsistentIndex(v uint64, term uint64) @@ -73,7 +77,19 @@ func (ci *consistentIndex) ConsistentIndex() uint64 { ci.mutex.Lock() defer ci.mutex.Unlock() - v, term := schema.ReadConsistentIndex(ci.be.BatchTx()) + v, term := schema.ReadConsistentIndex(ci.be.ReadTx()) + ci.SetConsistentIndex(v, term) + return v +} + +// UnsafeConsistentIndex is similar to ConsistentIndex, +// but it shouldn't lock the transaction. +func (ci *consistentIndex) UnsafeConsistentIndex() uint64 { + if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 { + return index + } + + v, term := schema.UnsafeReadConsistentIndex(ci.be.BatchTx()) ci.SetConsistentIndex(v, term) return v } @@ -106,7 +122,8 @@ type fakeConsistentIndex struct { term uint64 } -func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index } +func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index } +func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 { return f.index } func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) { atomic.StoreUint64(&f.index, index) @@ -117,7 +134,7 @@ func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {} func (f *fakeConsistentIndex) SetBackend(_ Backend) {} func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow) } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index f5632318e..b22f680bb 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -207,8 +207,10 @@ type EtcdServer struct { term uint64 // must use atomic operations to access; keep 64-bit aligned. lead uint64 // must use atomic operations to access; keep 64-bit aligned. - consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex - r raftNode // uses 64-bit atomics; keep 64-bit aligned. + consistentIdx uint64 // must use atomic operations to access; keep 64-bit aligned. + consistentTerm uint64 // must use atomic operations to access; keep 64-bit aligned. + consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex + r raftNode // uses 64-bit atomics; keep 64-bit aligned. readych chan struct{} Cfg config.ServerConfig @@ -341,6 +343,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) srv.be = b.storage.backend.be + srv.be.SetTxPostLockHook(srv.getTxPostLockHook()) srv.beHooks = b.storage.backend.beHooks minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat @@ -978,6 +981,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } s.consistIndex.SetBackend(newbe) + newbe.SetTxPostLockHook(s.getTxPostLockHook()) + lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex())) // Closing old backend might block until all the txns @@ -1547,6 +1552,15 @@ func (s *EtcdServer) getTerm() uint64 { return atomic.LoadUint64(&s.term) } +func (s *EtcdServer) setConsistentIndexAndTerm(cIdx, cTerm uint64) { + atomic.StoreUint64(&s.consistentIdx, cIdx) + atomic.StoreUint64(&s.consistentTerm, cTerm) +} + +func (s *EtcdServer) getConsistentIndexAndTerm() (uint64, uint64) { + return atomic.LoadUint64(&s.consistentIdx), atomic.LoadUint64(&s.consistentTerm) +} + func (s *EtcdServer) setLead(v uint64) { atomic.StoreUint64(&s.lead, v) } @@ -1771,7 +1785,7 @@ func (s *EtcdServer) apply( // set the consistent index of current executing entry if e.Index > s.consistIndex.ConsistentIndex() { - s.consistIndex.SetConsistentIndex(e.Index, e.Term) + s.setConsistentIndexAndTerm(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } @@ -1801,7 +1815,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { index := s.consistIndex.ConsistentIndex() if e.Index > index { // set the consistent index of current executing entry - s.consistIndex.SetConsistentIndex(e.Index, e.Term) + s.setConsistentIndexAndTerm(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } s.lg.Debug("apply entry normal", @@ -2296,3 +2310,12 @@ func (s *EtcdServer) raftStatus() raft.Status { func (s *EtcdServer) Version() *serverversion.Manager { return serverversion.NewManager(s.Logger(), NewServerVersionAdapter(s)) } + +func (s *EtcdServer) getTxPostLockHook() func() { + return func() { + cIdx, term := s.getConsistentIndexAndTerm() + if cIdx > s.consistIndex.UnsafeConsistentIndex() { + s.consistIndex.SetConsistentIndex(cIdx, term) + } + } +} diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index cb4386976..706c78549 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -687,9 +687,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}) consistIndex := srv.consistIndex.ConsistentIndex() - if consistIndex != appliedi { - t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi) - } + assert.Equal(t, uint64(2), appliedi) t.Run("verify-backend", func(t *testing.T) { tx := be.BatchTx() @@ -698,9 +696,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { srv.beHooks.OnPreCommitUnsafe(tx) assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *schema.UnsafeConfStateFromBackend(lg, tx)) }) - rindex, rterm := schema.ReadConsistentIndex(be.BatchTx()) + rindex, _ := schema.ReadConsistentIndex(be.ReadTx()) assert.Equal(t, consistIndex, rindex) - assert.Equal(t, uint64(4), rterm) } func realisticRaftNode(lg *zap.Logger) *raftNode { diff --git a/server/lease/lessor.go b/server/lease/lessor.go index cb6f0d6ec..4af816c76 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -797,7 +797,7 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh func (le *lessor) initAndRecover() { tx := le.b.BatchTx() - tx.Lock() + tx.LockWithoutHook() schema.UnsafeCreateLeaseBucket(tx) lpbs := schema.MustUnsafeGetAllLeases(tx) tx.Unlock() diff --git a/server/storage/backend.go b/server/storage/backend.go index b7b0eb2fc..b1101cfa6 100644 --- a/server/storage/backend.go +++ b/server/storage/backend.go @@ -99,7 +99,7 @@ func OpenBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { func RecoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks *BackendHooks) (backend.Backend, error) { consistentIndex := uint64(0) if beExist { - consistentIndex, _ = schema.ReadConsistentIndex(oldbe.BatchTx()) + consistentIndex, _ = schema.ReadConsistentIndex(oldbe.ReadTx()) } if snapshot.Metadata.Index <= consistentIndex { return oldbe, nil diff --git a/server/storage/backend/backend.go b/server/storage/backend/backend.go index a61f4ec36..f949f282b 100644 --- a/server/storage/backend/backend.go +++ b/server/storage/backend/backend.go @@ -67,6 +67,9 @@ type Backend interface { Defrag() error ForceCommit() Close() error + + // SetTxPostLockHook sets a txPostLockHook. + SetTxPostLockHook(func()) } type Snapshot interface { @@ -119,6 +122,9 @@ type backend struct { hooks Hooks + // txPostLockHook is called each time right after locking the tx. + txPostLockHook func() + lg *zap.Logger } @@ -227,6 +233,14 @@ func (b *backend) BatchTx() BatchTx { return b.batchTx } +func (b *backend) SetTxPostLockHook(hook func()) { + // It needs to lock the batchTx, because the periodic commit + // may be accessing the txPostLockHook at the moment. + b.batchTx.LockWithoutHook() + defer b.batchTx.Unlock() + b.txPostLockHook = hook +} + func (b *backend) ReadTx() ReadTx { return b.readTx } // ConcurrentReadTx creates and returns a new ReadTx, which: @@ -438,7 +452,7 @@ func (b *backend) defrag() error { // TODO: make this non-blocking? // lock batchTx to ensure nobody is using previous tx, and then // close previous ongoing tx. - b.batchTx.Lock() + b.batchTx.LockWithoutHook() defer b.batchTx.Unlock() // lock database after lock tx to avoid deadlock. diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index a8e649199..a27266bb2 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -55,6 +55,7 @@ type BatchTx interface { CommitAndStop() LockInsideApply() LockOutsideApply() + } type batchTx struct { @@ -66,6 +67,13 @@ type batchTx struct { } func (t *batchTx) Lock() { + t.LockWithoutHook() + if t.backend.txPostLockHook != nil { + t.backend.txPostLockHook() + } +} + +func (t *batchTx) LockWithoutHook() { t.Mutex.Lock() } @@ -226,14 +234,14 @@ func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) // Commit commits a previous tx and begins a new writable one. func (t *batchTx) Commit() { - t.Lock() + t.LockWithoutHook() t.commit(false) t.Unlock() } // CommitAndStop commits the previous tx and does not create a new one. func (t *batchTx) CommitAndStop() { - t.Lock() + t.LockWithoutHook() t.commit(true) t.Unlock() } @@ -303,13 +311,13 @@ func (t *batchTxBuffered) Unlock() { } func (t *batchTxBuffered) Commit() { - t.Lock() + t.LockWithoutHook() t.commit(false) t.Unlock() } func (t *batchTxBuffered) CommitAndStop() { - t.Lock() + t.LockWithoutHook() t.commit(true) t.Unlock() } diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index 846f83cde..9b79c090a 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -121,7 +121,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi } tx := s.b.BatchTx() - tx.Lock() + tx.LockWithoutHook() tx.UnsafeCreateBucket(schema.Key) schema.UnsafeCreateMetaBucket(tx) tx.Unlock() @@ -331,7 +331,7 @@ func (s *store) restore() error { // restore index tx := s.b.BatchTx() - tx.Lock() + tx.LockWithoutHook() finishedCompact, found := UnsafeReadFinishedCompact(tx) if found { diff --git a/server/storage/mvcc/kvstore_compaction.go b/server/storage/mvcc/kvstore_compaction.go index ba9440082..941f056a9 100644 --- a/server/storage/mvcc/kvstore_compaction.go +++ b/server/storage/mvcc/kvstore_compaction.go @@ -42,7 +42,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc start := time.Now() tx := s.b.BatchTx() - tx.Lock() + tx.LockWithoutHook() keys, _ := tx.UnsafeRange(schema.Key, last, end, int64(batchNum)) for _, key := range keys { rev = bytesToRev(key) diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index 31cb37655..4a51e9c40 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -881,6 +881,7 @@ type fakeBatchTx struct { rangeRespc chan rangeResp } +func (b *fakeBatchTx) LockWithoutHook() {} func (b *fakeBatchTx) Lock() {} func (b *fakeBatchTx) Unlock() {} func (b *fakeBatchTx) RLock() {} @@ -924,6 +925,7 @@ func (b *fakeBackend) Snapshot() backend.Snapshot func (b *fakeBackend) ForceCommit() {} func (b *fakeBackend) Defrag() error { return nil } func (b *fakeBackend) Close() error { return nil } +func (b *fakeBackend) SetTxPostLockHook(func()) {} type indexGetResp struct { rev revision diff --git a/server/storage/schema/alarm.go b/server/storage/schema/alarm.go index 605bb3a0b..09a49994d 100644 --- a/server/storage/schema/alarm.go +++ b/server/storage/schema/alarm.go @@ -34,7 +34,7 @@ func NewAlarmBackend(lg *zap.Logger, be backend.Backend) *alarmBackend { func (s *alarmBackend) CreateAlarmBucket() { tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Alarm) } diff --git a/server/storage/schema/auth.go b/server/storage/schema/auth.go index 93ef34c37..fc334a8bc 100644 --- a/server/storage/schema/auth.go +++ b/server/storage/schema/auth.go @@ -49,7 +49,7 @@ func NewAuthBackend(lg *zap.Logger, be backend.Backend) *authBackend { func (abe *authBackend) CreateAuthBuckets() { tx := abe.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Auth) tx.UnsafeCreateBucket(AuthUsers) diff --git a/server/storage/schema/cindex.go b/server/storage/schema/cindex.go index d7b06b9ce..38eea6f91 100644 --- a/server/storage/schema/cindex.go +++ b/server/storage/schema/cindex.go @@ -26,7 +26,7 @@ func UnsafeCreateMetaBucket(tx backend.BatchTx) { // CreateMetaBucket creates the `meta` bucket (if it does not exists yet). func CreateMetaBucket(tx backend.BatchTx) { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Meta) } @@ -51,8 +51,8 @@ func UnsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { // ReadConsistentIndex loads consistent index and term from given transaction. // returns 0 if the data are not found. func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { - tx.Lock() - defer tx.Unlock() + tx.RLock() + defer tx.RUnlock() return UnsafeReadConsistentIndex(tx) } diff --git a/server/storage/schema/membership.go b/server/storage/schema/membership.go index 844b50a85..153699e69 100644 --- a/server/storage/schema/membership.go +++ b/server/storage/schema/membership.go @@ -61,7 +61,7 @@ func (s *membershipBackend) MustSaveMemberToBackend(m *membership.Member) { // from the v3 backend. func (s *membershipBackend) TrimClusterFromBackend() error { tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeDeleteBucket(Cluster) return nil @@ -121,7 +121,7 @@ func (s *membershipBackend) readMembersFromBackend() (map[types.ID]*membership.M func (s *membershipBackend) TrimMembershipFromBackend() error { s.lg.Info("Trimming membership information from the backend...") tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() err := tx.UnsafeForEach(Members, func(k, v []byte) error { tx.UnsafeDelete(Members, k) @@ -167,7 +167,7 @@ func (s *membershipBackend) MustSaveDowngradeToBackend(downgrade *version.Downgr func (s *membershipBackend) MustCreateBackendBuckets() { tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Members) tx.UnsafeCreateBucket(MembersRemoved) diff --git a/server/storage/schema/migration.go b/server/storage/schema/migration.go index 47734b4b8..e1e44dab5 100644 --- a/server/storage/schema/migration.go +++ b/server/storage/schema/migration.go @@ -49,7 +49,7 @@ func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (pla } func (p migrationPlan) Execute(lg *zap.Logger, tx backend.BatchTx) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return p.unsafeExecute(lg, tx) } @@ -90,7 +90,7 @@ func newMigrationStep(v semver.Version, isUpgrade bool, changes []schemaChange) // execute runs actions required to migrate etcd storage between two minor versions. func (s migrationStep) execute(lg *zap.Logger, tx backend.BatchTx) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return s.unsafeExecute(lg, tx) } diff --git a/server/storage/schema/schema.go b/server/storage/schema/schema.go index 850b55d5b..2b4c15f29 100644 --- a/server/storage/schema/schema.go +++ b/server/storage/schema/schema.go @@ -31,7 +31,7 @@ var ( // Validate checks provided backend to confirm that schema used is supported. func Validate(lg *zap.Logger, tx backend.BatchTx) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return unsafeValidate(lg, tx) } @@ -60,7 +60,7 @@ type WALVersion interface { // Migrate updates storage schema to provided target version. // Downgrading requires that provided WAL doesn't contain unsupported entries. func Migrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return UnsafeMigrate(lg, tx, w, target) } @@ -89,8 +89,8 @@ func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semv // * v3.5 will return it's version if it includes all storage fields added in v3.5 (might require a snapshot). // * v3.4 and older is not supported and will return error. func DetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, err error) { - tx.Lock() - defer tx.Unlock() + tx.RLock() + defer tx.RUnlock() return UnsafeDetectSchemaVersion(lg, tx) } diff --git a/server/verify/verify.go b/server/verify/verify.go index 9402e5eb5..2b73fbc07 100644 --- a/server/verify/verify.go +++ b/server/verify/verify.go @@ -108,8 +108,7 @@ func MustVerifyIfEnabled(cfg Config) { } func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error { - tx := be.BatchTx() - index, term := schema.ReadConsistentIndex(tx) + index, term := schema.ReadConsistentIndex(be.ReadTx()) if cfg.ExactIndex && index != hardstate.Commit { return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit) } From a4c5da844d732e9308646c117adc45b654608427 Mon Sep 17 00:00:00 2001 From: ahrtr Date: Thu, 31 Mar 2022 17:06:48 +0800 Subject: [PATCH 2/6] added detailed comment to explain the difference between Lock and LockWithoutHook --- server/storage/backend/batch_tx.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index a27266bb2..8628d9aaa 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -55,7 +55,6 @@ type BatchTx interface { CommitAndStop() LockInsideApply() LockOutsideApply() - } type batchTx struct { From 7ac995cdde6ae56beba93b040fe231dfca03c38d Mon Sep 17 00:00:00 2001 From: ahrtr Date: Sat, 2 Apr 2022 06:02:22 +0800 Subject: [PATCH 3/6] enhanced authBackend to support authReadTx --- server/auth/range_perm_cache.go | 4 +- server/auth/store.go | 12 ++++-- server/auth/store_mock_test.go | 4 ++ server/etcdserver/cindex/cindex.go | 2 +- server/etcdserver/server.go | 5 ++- server/storage/schema/auth.go | 54 +++++++++++++++++++------ server/storage/schema/auth_roles.go | 62 +++++++++++++++++------------ server/storage/schema/auth_users.go | 50 +++++++++++++---------- 8 files changed, 127 insertions(+), 66 deletions(-) diff --git a/server/auth/range_perm_cache.go b/server/auth/range_perm_cache.go index bae07ef52..2ebe5439b 100644 --- a/server/auth/range_perm_cache.go +++ b/server/auth/range_perm_cache.go @@ -20,7 +20,7 @@ import ( "go.uber.org/zap" ) -func getMergedPerms(tx AuthBatchTx, userName string) *unifiedRangePermissions { +func getMergedPerms(tx AuthReadTx, userName string) *unifiedRangePermissions { user := tx.UnsafeGetUser(userName) if user == nil { return nil @@ -103,7 +103,7 @@ func checkKeyPoint(lg *zap.Logger, cachedPerms *unifiedRangePermissions, key []b return false } -func (as *authStore) isRangeOpPermitted(tx AuthBatchTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool { +func (as *authStore) isRangeOpPermitted(tx AuthReadTx, userName string, key, rangeEnd []byte, permtyp authpb.Permission_Type) bool { // assumption: tx is Lock()ed _, ok := as.rangePermCache[userName] if !ok { diff --git a/server/auth/store.go b/server/auth/store.go index 408b235ba..762caecd7 100644 --- a/server/auth/store.go +++ b/server/auth/store.go @@ -196,6 +196,7 @@ type TokenProvider interface { type AuthBackend interface { CreateAuthBuckets() ForceCommit() + ReadTx() AuthReadTx BatchTx() AuthBatchTx GetUser(string) *authpb.User @@ -345,7 +346,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) { // CompareHashAndPassword is very expensive, so we use closures // to avoid putting it in the critical section of the tx lock. revision, err := func() (uint64, error) { - tx := as.be.BatchTx() + tx := as.be.ReadTx() tx.Lock() defer tx.Unlock() @@ -855,7 +856,7 @@ func (as *authStore) isOpPermitted(userName string, revision uint64, key, rangeE return ErrAuthOldRevision } - tx := as.be.BatchTx() + tx := as.be.ReadTx() tx.Lock() defer tx.Unlock() @@ -897,7 +898,10 @@ func (as *authStore) IsAdminPermitted(authInfo *AuthInfo) error { return ErrUserEmpty } - u := as.be.GetUser(authInfo.Username) + tx := as.be.ReadTx() + tx.Lock() + defer tx.Unlock() + u := tx.UnsafeGetUser(authInfo.Username) if u == nil { return ErrUserNotFound @@ -935,6 +939,8 @@ func NewAuthStore(lg *zap.Logger, be AuthBackend, tp TokenProvider, bcryptCost i be.CreateAuthBuckets() tx := be.BatchTx() + // We should call LockWithoutHook here, but the txPostLockHoos isn't set + // to EtcdServer yet, so it's OK. tx.Lock() enabled := tx.UnsafeReadAuthEnabled() as := &authStore{ diff --git a/server/auth/store_mock_test.go b/server/auth/store_mock_test.go index d49f8dd33..39c3f6d13 100644 --- a/server/auth/store_mock_test.go +++ b/server/auth/store_mock_test.go @@ -36,6 +36,10 @@ func (b *backendMock) CreateAuthBuckets() { func (b *backendMock) ForceCommit() { } +func (b *backendMock) ReadTx() AuthReadTx { + return &txMock{be: b} +} + func (b *backendMock) BatchTx() AuthBatchTx { return &txMock{be: b} } diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 7ec1b1212..6367967f8 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -89,7 +89,7 @@ func (ci *consistentIndex) UnsafeConsistentIndex() uint64 { return index } - v, term := schema.UnsafeReadConsistentIndex(ci.be.BatchTx()) + v, term := schema.UnsafeReadConsistentIndex(ci.be.ReadTx()) ci.SetConsistentIndex(v, term) return v } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index b22f680bb..6a89f4592 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -343,7 +343,6 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) srv.be = b.storage.backend.be - srv.be.SetTxPostLockHook(srv.getTxPostLockHook()) srv.beHooks = b.storage.backend.beHooks minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat @@ -404,6 +403,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { }) } + // Set the hook after EtcdServer finishes the initialization to avoid + // the hook being called during the initialization process. + srv.be.SetTxPostLockHook(srv.getTxPostLockHook()) + // TODO: move transport initialization near the definition of remote tr := &rafthttp.Transport{ Logger: cfg.Logger, diff --git a/server/storage/schema/auth.go b/server/storage/schema/auth.go index fc334a8bc..3956ca782 100644 --- a/server/storage/schema/auth.go +++ b/server/storage/schema/auth.go @@ -60,15 +60,25 @@ func (abe *authBackend) ForceCommit() { abe.be.ForceCommit() } +func (abe *authBackend) ReadTx() auth.AuthReadTx { + return &authReadTx{tx: abe.be.ReadTx(), lg: abe.lg} +} + func (abe *authBackend) BatchTx() auth.AuthBatchTx { return &authBatchTx{tx: abe.be.BatchTx(), lg: abe.lg} } +type authReadTx struct { + tx backend.ReadTx + lg *zap.Logger +} + type authBatchTx struct { tx backend.BatchTx lg *zap.Logger } +var _ auth.AuthReadTx = (*authReadTx)(nil) var _ auth.AuthBatchTx = (*authBatchTx)(nil) func (atx *authBatchTx) UnsafeSaveAuthEnabled(enabled bool) { @@ -86,22 +96,13 @@ func (atx *authBatchTx) UnsafeSaveAuthRevision(rev uint64) { } func (atx *authBatchTx) UnsafeReadAuthEnabled() bool { - _, vs := atx.tx.UnsafeRange(Auth, AuthEnabledKeyName, nil, 0) - if len(vs) == 1 { - if bytes.Equal(vs[0], authEnabled) { - return true - } - } - return false + arx := &authReadTx{tx: atx.tx, lg: atx.lg} + return arx.UnsafeReadAuthEnabled() } func (atx *authBatchTx) UnsafeReadAuthRevision() uint64 { - _, vs := atx.tx.UnsafeRange(Auth, AuthRevisionKeyName, nil, 0) - if len(vs) != 1 { - // this can happen in the initialization phase - return 0 - } - return binary.BigEndian.Uint64(vs[0]) + arx := &authReadTx{tx: atx.tx, lg: atx.lg} + return arx.UnsafeReadAuthRevision() } func (atx *authBatchTx) Lock() { @@ -111,3 +112,30 @@ func (atx *authBatchTx) Lock() { func (atx *authBatchTx) Unlock() { atx.tx.Unlock() } + +func (atx *authReadTx) UnsafeReadAuthEnabled() bool { + _, vs := atx.tx.UnsafeRange(Auth, AuthEnabledKeyName, nil, 0) + if len(vs) == 1 { + if bytes.Equal(vs[0], authEnabled) { + return true + } + } + return false +} + +func (atx *authReadTx) UnsafeReadAuthRevision() uint64 { + _, vs := atx.tx.UnsafeRange(Auth, AuthRevisionKeyName, nil, 0) + if len(vs) != 1 { + // this can happen in the initialization phase + return 0 + } + return binary.BigEndian.Uint64(vs[0]) +} + +func (atx *authReadTx) Lock() { + atx.tx.RLock() +} + +func (atx *authReadTx) Unlock() { + atx.tx.RUnlock() +} diff --git a/server/storage/schema/auth_roles.go b/server/storage/schema/auth_roles.go index 541e37b71..dfda7ce5b 100644 --- a/server/storage/schema/auth_roles.go +++ b/server/storage/schema/auth_roles.go @@ -32,17 +32,8 @@ func (abe *authBackend) GetRole(roleName string) *authpb.Role { } func (atx *authBatchTx) UnsafeGetRole(roleName string) *authpb.Role { - _, vs := atx.tx.UnsafeRange(AuthRoles, []byte(roleName), nil, 0) - if len(vs) == 0 { - return nil - } - - role := &authpb.Role{} - err := role.Unmarshal(vs[0]) - if err != nil { - atx.lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err)) - } - return role + arx := &authReadTx{tx: atx.tx, lg: atx.lg} + return arx.UnsafeGetRole(roleName) } func (abe *authBackend) GetAllRoles() []*authpb.Role { @@ -53,21 +44,8 @@ func (abe *authBackend) GetAllRoles() []*authpb.Role { } func (atx *authBatchTx) UnsafeGetAllRoles() []*authpb.Role { - _, vs := atx.tx.UnsafeRange(AuthRoles, []byte{0}, []byte{0xff}, -1) - if len(vs) == 0 { - return nil - } - - roles := make([]*authpb.Role, len(vs)) - for i := range vs { - role := &authpb.Role{} - err := role.Unmarshal(vs[i]) - if err != nil { - atx.lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err)) - } - roles[i] = role - } - return roles + arx := &authReadTx{tx: atx.tx, lg: atx.lg} + return arx.UnsafeGetAllRoles() } func (atx *authBatchTx) UnsafePutRole(role *authpb.Role) { @@ -86,3 +64,35 @@ func (atx *authBatchTx) UnsafePutRole(role *authpb.Role) { func (atx *authBatchTx) UnsafeDeleteRole(rolename string) { atx.tx.UnsafeDelete(AuthRoles, []byte(rolename)) } + +func (atx *authReadTx) UnsafeGetRole(roleName string) *authpb.Role { + _, vs := atx.tx.UnsafeRange(AuthRoles, []byte(roleName), nil, 0) + if len(vs) == 0 { + return nil + } + + role := &authpb.Role{} + err := role.Unmarshal(vs[0]) + if err != nil { + atx.lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err)) + } + return role +} + +func (atx *authReadTx) UnsafeGetAllRoles() []*authpb.Role { + _, vs := atx.tx.UnsafeRange(AuthRoles, []byte{0}, []byte{0xff}, -1) + if len(vs) == 0 { + return nil + } + + roles := make([]*authpb.Role, len(vs)) + for i := range vs { + role := &authpb.Role{} + err := role.Unmarshal(vs[i]) + if err != nil { + atx.lg.Panic("failed to unmarshal 'authpb.Role'", zap.Error(err)) + } + roles[i] = role + } + return roles +} diff --git a/server/storage/schema/auth_users.go b/server/storage/schema/auth_users.go index f385afa51..c3e7a92ff 100644 --- a/server/storage/schema/auth_users.go +++ b/server/storage/schema/auth_users.go @@ -27,6 +27,35 @@ func (abe *authBackend) GetUser(username string) *authpb.User { } func (atx *authBatchTx) UnsafeGetUser(username string) *authpb.User { + arx := &authReadTx{tx: atx.tx, lg: atx.lg} + return arx.UnsafeGetUser(username) +} + +func (abe *authBackend) GetAllUsers() []*authpb.User { + tx := abe.BatchTx() + tx.Lock() + defer tx.Unlock() + return tx.UnsafeGetAllUsers() +} + +func (atx *authBatchTx) UnsafeGetAllUsers() []*authpb.User { + arx := &authReadTx{tx: atx.tx, lg: atx.lg} + return arx.UnsafeGetAllUsers() +} + +func (atx *authBatchTx) UnsafePutUser(user *authpb.User) { + b, err := user.Marshal() + if err != nil { + atx.lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err)) + } + atx.tx.UnsafePut(AuthUsers, user.Name, b) +} + +func (atx *authBatchTx) UnsafeDeleteUser(username string) { + atx.tx.UnsafeDelete(AuthUsers, []byte(username)) +} + +func (atx *authReadTx) UnsafeGetUser(username string) *authpb.User { _, vs := atx.tx.UnsafeRange(AuthUsers, []byte(username), nil, 0) if len(vs) == 0 { return nil @@ -44,14 +73,7 @@ func (atx *authBatchTx) UnsafeGetUser(username string) *authpb.User { return user } -func (abe *authBackend) GetAllUsers() []*authpb.User { - tx := abe.BatchTx() - tx.Lock() - defer tx.Unlock() - return tx.UnsafeGetAllUsers() -} - -func (atx *authBatchTx) UnsafeGetAllUsers() []*authpb.User { +func (atx *authReadTx) UnsafeGetAllUsers() []*authpb.User { _, vs := atx.tx.UnsafeRange(AuthUsers, []byte{0}, []byte{0xff}, -1) if len(vs) == 0 { return nil @@ -68,15 +90,3 @@ func (atx *authBatchTx) UnsafeGetAllUsers() []*authpb.User { } return users } - -func (atx *authBatchTx) UnsafePutUser(user *authpb.User) { - b, err := user.Marshal() - if err != nil { - atx.lg.Panic("failed to unmarshal 'authpb.User'", zap.Error(err)) - } - atx.tx.UnsafePut(AuthUsers, user.Name, b) -} - -func (atx *authBatchTx) UnsafeDeleteUser(username string) { - atx.tx.UnsafeDelete(AuthUsers, []byte(username)) -} From 47038593e9eba0993156a6ddef2e1500758685a1 Mon Sep 17 00:00:00 2001 From: ahrtr Date: Mon, 4 Apr 2022 19:56:38 +0800 Subject: [PATCH 4/6] set the consistent_index directly when applyV3 isn't performed --- server/etcdserver/server.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 6a89f4592..612454227 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1815,6 +1815,14 @@ func (s *EtcdServer) apply( // applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { shouldApplyV3 := membership.ApplyV2storeOnly + applyV3Performed := false + defer func() { + // The txPostLock callback will not get called in this case, + // so we should set the consistent index directly. + if s.consistIndex != nil && !applyV3Performed && membership.ApplyBoth == shouldApplyV3 { + s.consistIndex.SetConsistentIndex(e.Index, e.Term) + } + }() index := s.consistIndex.ConsistentIndex() if e.Index > index { // set the consistent index of current executing entry @@ -1870,6 +1878,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { if !needResult && raftReq.Txn != nil { removeNeedlessRangeReqs(raftReq.Txn) } + applyV3Performed = true ar = s.applyV3.Apply(&raftReq, shouldApplyV3) } @@ -1912,6 +1921,12 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con if err := s.cluster.ValidateConfigurationChange(cc); err != nil { cc.NodeID = raft.None s.r.ApplyConfChange(cc) + + // The txPostLock callback will not get called in this case, + // so we should set the consistent index directly. + if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 { + s.consistIndex.SetConsistentIndex(s.consistentIdx, s.consistentTerm) + } return false, err } From e155e50886316b8ff6c0d3e6362406ca2fb11a7d Mon Sep 17 00:00:00 2001 From: ahrtr Date: Wed, 6 Apr 2022 05:07:07 +0800 Subject: [PATCH 5/6] rename LockWithoutHook to LockOutsideApply and add LockInsideApply --- etcdutl/etcdutl/backup_command.go | 2 +- etcdutl/etcdutl/migrate_command.go | 2 +- server/auth/store.go | 4 +-- server/etcdserver/adapters.go | 2 +- server/etcdserver/bootstrap.go | 2 +- server/etcdserver/cindex/cindex.go | 4 +-- server/etcdserver/server.go | 4 +-- server/lease/lessor.go | 4 +-- server/storage/backend/backend.go | 10 +++---- server/storage/backend/batch_tx.go | 30 ++++++++++++-------- server/storage/backend/hooks_test.go | 2 -- server/storage/backend/verify.go | 14 ++++++++++ server/storage/backend/verify_test.go | 34 ++++++++++++++++++----- server/storage/mvcc/kvstore.go | 4 +-- server/storage/mvcc/kvstore_compaction.go | 2 +- server/storage/mvcc/kvstore_test.go | 11 ++++---- server/storage/mvcc/kvstore_txn.go | 2 +- server/storage/mvcc/store.go | 4 +-- server/storage/schema/alarm.go | 6 ++-- server/storage/schema/auth.go | 4 +-- server/storage/schema/cindex.go | 2 +- server/storage/schema/membership.go | 14 +++++----- server/storage/schema/migration.go | 4 +-- server/storage/schema/schema.go | 8 +++--- server/storage/schema/schema_test.go | 2 +- 25 files changed, 106 insertions(+), 71 deletions(-) diff --git a/etcdutl/etcdutl/backup_command.go b/etcdutl/etcdutl/backup_command.go index 49b6a9310..e0d53fc1b 100644 --- a/etcdutl/etcdutl/backup_command.go +++ b/etcdutl/etcdutl/backup_command.go @@ -322,7 +322,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir if !v3 { tx := be.BatchTx() - tx.Lock() + tx.LockOutsideApply() defer tx.Unlock() schema.UnsafeCreateMetaBucket(tx) schema.UnsafeUpdateConsistentIndex(tx, idx, term, false) diff --git a/etcdutl/etcdutl/migrate_command.go b/etcdutl/etcdutl/migrate_command.go index 195576e31..87b10664f 100644 --- a/etcdutl/etcdutl/migrate_command.go +++ b/etcdutl/etcdutl/migrate_command.go @@ -140,7 +140,7 @@ func migrateCommandFunc(c *migrateConfig) error { } func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) { - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() // Storage version is only supported since v3.6 if target.LessThan(schema.V3_6) { diff --git a/server/auth/store.go b/server/auth/store.go index 762caecd7..2d978a011 100644 --- a/server/auth/store.go +++ b/server/auth/store.go @@ -374,7 +374,7 @@ func (as *authStore) CheckPassword(username, password string) (uint64, error) { func (as *authStore) Recover(be AuthBackend) { as.be = be - tx := be.BatchTx() + tx := be.ReadTx() tx.Lock() enabled := tx.UnsafeReadAuthEnabled() @@ -939,7 +939,7 @@ func NewAuthStore(lg *zap.Logger, be AuthBackend, tp TokenProvider, bcryptCost i be.CreateAuthBuckets() tx := be.BatchTx() - // We should call LockWithoutHook here, but the txPostLockHoos isn't set + // We should call LockOutsideApply here, but the txPostLockHoos isn't set // to EtcdServer yet, so it's OK. tx.Lock() enabled := tx.UnsafeReadAuthEnabled() diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index 5f1bcfef1..d875cf14e 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -94,7 +94,7 @@ func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error defer s.bemu.RUnlock() tx := s.be.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() return schema.UnsafeMigrate(s.lg, tx, s.r.storage, target) } diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 857e7afa6..43605be5e 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -231,7 +231,7 @@ func bootstrapBackend(cfg config.ServerConfig, haveWAL bool, st v2store.Store, s } } if beExist { - err = schema.Validate(cfg.Logger, be.BatchTx()) + err = schema.Validate(cfg.Logger, be.ReadTx()) if err != nil { cfg.Logger.Error("Failed to validate schema", zap.Error(err)) return nil, err diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 6367967f8..91046cd03 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -82,8 +82,6 @@ func (ci *consistentIndex) ConsistentIndex() uint64 { return v } -// UnsafeConsistentIndex is similar to ConsistentIndex, -// but it shouldn't lock the transaction. func (ci *consistentIndex) UnsafeConsistentIndex() uint64 { if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 { return index @@ -134,7 +132,7 @@ func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {} func (f *fakeConsistentIndex) SetBackend(_ Backend) {} func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) { - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow) } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 612454227..015bcaf6f 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -405,7 +405,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { // Set the hook after EtcdServer finishes the initialization to avoid // the hook being called during the initialization process. - srv.be.SetTxPostLockHook(srv.getTxPostLockHook()) + srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockHook()) // TODO: move transport initialization near the definition of remote tr := &rafthttp.Transport{ @@ -984,7 +984,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } s.consistIndex.SetBackend(newbe) - newbe.SetTxPostLockHook(s.getTxPostLockHook()) + newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockHook()) lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex())) diff --git a/server/lease/lessor.go b/server/lease/lessor.go index 4af816c76..931cb3d09 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -797,7 +797,7 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh func (le *lessor) initAndRecover() { tx := le.b.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() schema.UnsafeCreateLeaseBucket(tx) lpbs := schema.MustUnsafeGetAllLeases(tx) tx.Unlock() @@ -845,7 +845,7 @@ func (l *Lease) expired() bool { func (l *Lease) persistTo(b backend.Backend) { lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL} tx := b.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() schema.MustUnsafePutLease(tx, &lpb) } diff --git a/server/storage/backend/backend.go b/server/storage/backend/backend.go index f949f282b..ebb99ee2c 100644 --- a/server/storage/backend/backend.go +++ b/server/storage/backend/backend.go @@ -68,8 +68,8 @@ type Backend interface { ForceCommit() Close() error - // SetTxPostLockHook sets a txPostLockHook. - SetTxPostLockHook(func()) + // SetTxPostLockInsideApplyHook sets a txPostLockHook. + SetTxPostLockInsideApplyHook(func()) } type Snapshot interface { @@ -233,10 +233,10 @@ func (b *backend) BatchTx() BatchTx { return b.batchTx } -func (b *backend) SetTxPostLockHook(hook func()) { +func (b *backend) SetTxPostLockInsideApplyHook(hook func()) { // It needs to lock the batchTx, because the periodic commit // may be accessing the txPostLockHook at the moment. - b.batchTx.LockWithoutHook() + b.batchTx.lock() defer b.batchTx.Unlock() b.txPostLockHook = hook } @@ -452,7 +452,7 @@ func (b *backend) defrag() error { // TODO: make this non-blocking? // lock batchTx to ensure nobody is using previous tx, and then // close previous ongoing tx. - b.batchTx.LockWithoutHook() + b.batchTx.LockOutsideApply() defer b.batchTx.Unlock() // lock database after lock tx to avoid deadlock. diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index 8628d9aaa..7eca835fd 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -65,25 +65,31 @@ type batchTx struct { pending int } +// Lock is supposed to be called only by the unit test. func (t *batchTx) Lock() { - t.LockWithoutHook() - if t.backend.txPostLockHook != nil { - t.backend.txPostLockHook() - } + ValidateCalledInsideUnittest(t.backend.lg) + t.lock() } -func (t *batchTx) LockWithoutHook() { +func (t *batchTx) lock() { t.Mutex.Lock() } func (t *batchTx) LockInsideApply() { - ValidateCalledInsideApply(t.backend.lg) - t.Lock() + t.lock() + if t.backend.txPostLockHook != nil { + // The callers of some methods (i.e., (*RaftCluster).AddMember) + // can be coming from both InsideApply and OutsideApply, but the + // callers from OutsideApply will have a nil txPostLockHook. So we + // should check the txPostLockHook before validating the callstack. + ValidateCalledInsideApply(t.backend.lg) + t.backend.txPostLockHook() + } } func (t *batchTx) LockOutsideApply() { ValidateCalledOutSideApply(t.backend.lg) - t.Lock() + t.lock() } func (t *batchTx) Unlock() { @@ -233,14 +239,14 @@ func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) // Commit commits a previous tx and begins a new writable one. func (t *batchTx) Commit() { - t.LockWithoutHook() + t.lock() t.commit(false) t.Unlock() } // CommitAndStop commits the previous tx and does not create a new one. func (t *batchTx) CommitAndStop() { - t.LockWithoutHook() + t.lock() t.commit(true) t.Unlock() } @@ -310,13 +316,13 @@ func (t *batchTxBuffered) Unlock() { } func (t *batchTxBuffered) Commit() { - t.LockWithoutHook() + t.lock() t.commit(false) t.Unlock() } func (t *batchTxBuffered) CommitAndStop() { - t.LockWithoutHook() + t.lock() t.commit(true) t.Unlock() } diff --git a/server/storage/backend/hooks_test.go b/server/storage/backend/hooks_test.go index 766464484..b77efbba4 100644 --- a/server/storage/backend/hooks_test.go +++ b/server/storage/backend/hooks_test.go @@ -41,8 +41,6 @@ func TestBackendPreCommitHook(t *testing.T) { // Empty commit. tx.Commit() - write(tx, []byte("foo"), []byte("bar")) - assert.Equal(t, ">cc", getCommitsKey(t, be), "expected 2 explict commits") tx.Commit() assert.Equal(t, ">ccc", getCommitsKey(t, be), "expected 3 explict commits") diff --git a/server/storage/backend/verify.go b/server/storage/backend/verify.go index 2f3dc0221..a6a0b8675 100644 --- a/server/storage/backend/verify.go +++ b/server/storage/backend/verify.go @@ -46,6 +46,15 @@ func ValidateCalledOutSideApply(lg *zap.Logger) { } } +func ValidateCalledInsideUnittest(lg *zap.Logger) { + if !verifyLockEnabled() { + return + } + if !insideUnittest() { + lg.Fatal("Lock called outside of unit test!", zap.Stack("stacktrace")) + } +} + func verifyLockEnabled() bool { return os.Getenv(ENV_VERIFY) == ENV_VERIFY_ALL_VALUE || os.Getenv(ENV_VERIFY) == ENV_VERIFY_LOCK } @@ -54,3 +63,8 @@ func insideApply() bool { stackTraceStr := string(debug.Stack()) return strings.Contains(stackTraceStr, ".applyEntries") } + +func insideUnittest() bool { + stackTraceStr := string(debug.Stack()) + return strings.Contains(stackTraceStr, "_test.go") && !strings.Contains(stackTraceStr, "tests/") +} diff --git a/server/storage/backend/verify_test.go b/server/storage/backend/verify_test.go index 08efb3921..2345f46b5 100644 --- a/server/storage/backend/verify_test.go +++ b/server/storage/backend/verify_test.go @@ -15,7 +15,6 @@ package backend_test import ( - "fmt" "os" "testing" "time" @@ -26,40 +25,60 @@ import ( func TestLockVerify(t *testing.T) { tcs := []struct { - insideApply bool - lock func(tx backend.BatchTx) - expectPanic bool + name string + insideApply bool + lock func(tx backend.BatchTx) + txPostLockHook func() + expectPanic bool }{ { + name: "call lockInsideApply from inside apply", insideApply: true, lock: lockInsideApply, expectPanic: false, }, { + name: "call lockInsideApply from outside apply (without txPostLockHook)", insideApply: false, lock: lockInsideApply, - expectPanic: true, + expectPanic: false, }, { + name: "call lockInsideApply from outside apply (with txPostLockHook)", + insideApply: false, + lock: lockInsideApply, + txPostLockHook: func() {}, + expectPanic: true, + }, + { + name: "call lockOutsideApply from outside apply", insideApply: false, lock: lockOutsideApply, expectPanic: false, }, { + name: "call lockOutsideApply from inside apply", insideApply: true, lock: lockOutsideApply, expectPanic: true, }, + { + name: "call Lock from unit test", + insideApply: false, + lock: lockFromUT, + expectPanic: false, + }, } env := os.Getenv("ETCD_VERIFY") os.Setenv("ETCD_VERIFY", "lock") defer func() { os.Setenv("ETCD_VERIFY", env) }() - for i, tc := range tcs { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) + be.SetTxPostLockInsideApplyHook(tc.txPostLockHook) hasPaniced := handlePanic(func() { if tc.insideApply { @@ -89,3 +108,4 @@ func applyEntries(be backend.Backend, f func(tx backend.BatchTx)) { func lockInsideApply(tx backend.BatchTx) { tx.LockInsideApply() } func lockOutsideApply(tx backend.BatchTx) { tx.LockOutsideApply() } +func lockFromUT(tx backend.BatchTx) { tx.Lock() } diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index 9b79c090a..074f1bea6 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -121,7 +121,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi } tx := s.b.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() tx.UnsafeCreateBucket(schema.Key) schema.UnsafeCreateMetaBucket(tx) tx.Unlock() @@ -331,7 +331,7 @@ func (s *store) restore() error { // restore index tx := s.b.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() finishedCompact, found := UnsafeReadFinishedCompact(tx) if found { diff --git a/server/storage/mvcc/kvstore_compaction.go b/server/storage/mvcc/kvstore_compaction.go index 941f056a9..849f73b95 100644 --- a/server/storage/mvcc/kvstore_compaction.go +++ b/server/storage/mvcc/kvstore_compaction.go @@ -42,7 +42,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc start := time.Now() tx := s.b.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() keys, _ := tx.UnsafeRange(schema.Key, last, end, int64(batchNum)) for _, key := range keys { rev = bytesToRev(key) diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index 4a51e9c40..2779f10b7 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -881,7 +881,8 @@ type fakeBatchTx struct { rangeRespc chan rangeResp } -func (b *fakeBatchTx) LockWithoutHook() {} +func (b *fakeBatchTx) LockInsideApply() {} +func (b *fakeBatchTx) LockOutsideApply() {} func (b *fakeBatchTx) Lock() {} func (b *fakeBatchTx) Unlock() {} func (b *fakeBatchTx) RLock() {} @@ -905,10 +906,8 @@ func (b *fakeBatchTx) UnsafeDelete(bucket backend.Bucket, key []byte) { func (b *fakeBatchTx) UnsafeForEach(bucket backend.Bucket, visitor func(k, v []byte) error) error { return nil } -func (b *fakeBatchTx) Commit() {} -func (b *fakeBatchTx) CommitAndStop() {} -func (b *fakeBatchTx) LockInsideApply() {} -func (b *fakeBatchTx) LockOutsideApply() {} +func (b *fakeBatchTx) Commit() {} +func (b *fakeBatchTx) CommitAndStop() {} type fakeBackend struct { tx *fakeBatchTx @@ -925,7 +924,7 @@ func (b *fakeBackend) Snapshot() backend.Snapshot func (b *fakeBackend) ForceCommit() {} func (b *fakeBackend) Defrag() error { return nil } func (b *fakeBackend) Close() error { return nil } -func (b *fakeBackend) SetTxPostLockHook(func()) {} +func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {} type indexGetResp struct { rev revision diff --git a/server/storage/mvcc/kvstore_txn.go b/server/storage/mvcc/kvstore_txn.go index fb7a9ca1f..604fac78c 100644 --- a/server/storage/mvcc/kvstore_txn.go +++ b/server/storage/mvcc/kvstore_txn.go @@ -133,7 +133,7 @@ type storeTxnWrite struct { func (s *store) Write(trace *traceutil.Trace) TxnWrite { s.mu.RLock() tx := s.b.BatchTx() - tx.Lock() + tx.LockInsideApply() tw := &storeTxnWrite{ storeTxnRead: storeTxnRead{s, tx, 0, 0, trace}, tx: tx, diff --git a/server/storage/mvcc/store.go b/server/storage/mvcc/store.go index e530c82f4..a002ada71 100644 --- a/server/storage/mvcc/store.go +++ b/server/storage/mvcc/store.go @@ -36,7 +36,7 @@ func UnsafeReadScheduledCompact(tx backend.ReadTx) (scheduledComact int64, found } func SetScheduledCompact(tx backend.BatchTx, value int64) { - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() UnsafeSetScheduledCompact(tx, value) } @@ -48,7 +48,7 @@ func UnsafeSetScheduledCompact(tx backend.BatchTx, value int64) { } func SetFinishedCompact(tx backend.BatchTx, value int64) { - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() UnsafeSetFinishedCompact(tx, value) } diff --git a/server/storage/schema/alarm.go b/server/storage/schema/alarm.go index 09a49994d..825a8dbe0 100644 --- a/server/storage/schema/alarm.go +++ b/server/storage/schema/alarm.go @@ -34,14 +34,14 @@ func NewAlarmBackend(lg *zap.Logger, be backend.Backend) *alarmBackend { func (s *alarmBackend) CreateAlarmBucket() { tx := s.be.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeCreateBucket(Alarm) } func (s *alarmBackend) MustPutAlarm(alarm *etcdserverpb.AlarmMember) { tx := s.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() s.mustUnsafePutAlarm(tx, alarm) } @@ -57,7 +57,7 @@ func (s *alarmBackend) mustUnsafePutAlarm(tx backend.BatchTx, alarm *etcdserverp func (s *alarmBackend) MustDeleteAlarm(alarm *etcdserverpb.AlarmMember) { tx := s.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() s.mustUnsafeDeleteAlarm(tx, alarm) } diff --git a/server/storage/schema/auth.go b/server/storage/schema/auth.go index 3956ca782..aa695bb1d 100644 --- a/server/storage/schema/auth.go +++ b/server/storage/schema/auth.go @@ -49,7 +49,7 @@ func NewAuthBackend(lg *zap.Logger, be backend.Backend) *authBackend { func (abe *authBackend) CreateAuthBuckets() { tx := abe.be.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeCreateBucket(Auth) tx.UnsafeCreateBucket(AuthUsers) @@ -106,7 +106,7 @@ func (atx *authBatchTx) UnsafeReadAuthRevision() uint64 { } func (atx *authBatchTx) Lock() { - atx.tx.Lock() + atx.tx.LockInsideApply() } func (atx *authBatchTx) Unlock() { diff --git a/server/storage/schema/cindex.go b/server/storage/schema/cindex.go index 38eea6f91..7d215bac6 100644 --- a/server/storage/schema/cindex.go +++ b/server/storage/schema/cindex.go @@ -26,7 +26,7 @@ func UnsafeCreateMetaBucket(tx backend.BatchTx) { // CreateMetaBucket creates the `meta` bucket (if it does not exists yet). func CreateMetaBucket(tx backend.BatchTx) { - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeCreateBucket(Meta) } diff --git a/server/storage/schema/membership.go b/server/storage/schema/membership.go index 153699e69..a42353c41 100644 --- a/server/storage/schema/membership.go +++ b/server/storage/schema/membership.go @@ -52,7 +52,7 @@ func (s *membershipBackend) MustSaveMemberToBackend(m *membership.Member) { } tx := s.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() tx.UnsafePut(Members, mkey, mvalue) } @@ -61,7 +61,7 @@ func (s *membershipBackend) MustSaveMemberToBackend(m *membership.Member) { // from the v3 backend. func (s *membershipBackend) TrimClusterFromBackend() error { tx := s.be.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeDeleteBucket(Cluster) return nil @@ -71,7 +71,7 @@ func (s *membershipBackend) MustDeleteMemberFromBackend(id types.ID) { mkey := BackendMemberKey(id) tx := s.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() tx.UnsafeDelete(Members, mkey) tx.UnsafePut(MembersRemoved, mkey, []byte("removed")) @@ -121,7 +121,7 @@ func (s *membershipBackend) readMembersFromBackend() (map[types.ID]*membership.M func (s *membershipBackend) TrimMembershipFromBackend() error { s.lg.Info("Trimming membership information from the backend...") tx := s.be.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() err := tx.UnsafeForEach(Members, func(k, v []byte) error { tx.UnsafeDelete(Members, k) @@ -146,7 +146,7 @@ func (s *membershipBackend) MustSaveClusterVersionToBackend(ver *semver.Version) ckey := ClusterClusterVersionKeyName tx := s.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() tx.UnsafePut(Cluster, ckey, []byte(ver.String())) } @@ -160,14 +160,14 @@ func (s *membershipBackend) MustSaveDowngradeToBackend(downgrade *version.Downgr s.lg.Panic("failed to marshal downgrade information", zap.Error(err)) } tx := s.be.BatchTx() - tx.Lock() + tx.LockInsideApply() defer tx.Unlock() tx.UnsafePut(Cluster, dkey, dvalue) } func (s *membershipBackend) MustCreateBackendBuckets() { tx := s.be.BatchTx() - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() tx.UnsafeCreateBucket(Members) tx.UnsafeCreateBucket(MembersRemoved) diff --git a/server/storage/schema/migration.go b/server/storage/schema/migration.go index e1e44dab5..61ea51bf2 100644 --- a/server/storage/schema/migration.go +++ b/server/storage/schema/migration.go @@ -49,7 +49,7 @@ func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (pla } func (p migrationPlan) Execute(lg *zap.Logger, tx backend.BatchTx) error { - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() return p.unsafeExecute(lg, tx) } @@ -90,7 +90,7 @@ func newMigrationStep(v semver.Version, isUpgrade bool, changes []schemaChange) // execute runs actions required to migrate etcd storage between two minor versions. func (s migrationStep) execute(lg *zap.Logger, tx backend.BatchTx) error { - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() return s.unsafeExecute(lg, tx) } diff --git a/server/storage/schema/schema.go b/server/storage/schema/schema.go index 2b4c15f29..68bb212d7 100644 --- a/server/storage/schema/schema.go +++ b/server/storage/schema/schema.go @@ -30,13 +30,13 @@ var ( ) // Validate checks provided backend to confirm that schema used is supported. -func Validate(lg *zap.Logger, tx backend.BatchTx) error { - tx.LockWithoutHook() +func Validate(lg *zap.Logger, tx backend.ReadTx) error { + tx.Lock() defer tx.Unlock() return unsafeValidate(lg, tx) } -func unsafeValidate(lg *zap.Logger, tx backend.BatchTx) error { +func unsafeValidate(lg *zap.Logger, tx backend.ReadTx) error { current, err := UnsafeDetectSchemaVersion(lg, tx) if err != nil { // v3.5 requires a wal snapshot to persist its fields, so we can assign it a schema version. @@ -60,7 +60,7 @@ type WALVersion interface { // Migrate updates storage schema to provided target version. // Downgrading requires that provided WAL doesn't contain unsupported entries. func Migrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error { - tx.LockWithoutHook() + tx.LockOutsideApply() defer tx.Unlock() return UnsafeMigrate(lg, tx, w, target) } diff --git a/server/storage/schema/schema_test.go b/server/storage/schema/schema_test.go index f3c0c4a7f..8dbd337b2 100644 --- a/server/storage/schema/schema_test.go +++ b/server/storage/schema/schema_test.go @@ -88,7 +88,7 @@ func TestValidate(t *testing.T) { b := backend.NewDefaultBackend(lg, dataPath) defer b.Close() - err := Validate(lg, b.BatchTx()) + err := Validate(lg, b.ReadTx()) if (err != nil) != tc.expectError { t.Errorf("Validate(lg, tx) = %+v, expected error: %v", err, tc.expectError) } From 4033f5c2b9b57cd341d1b3522abb03bedbeb9a07 Mon Sep 17 00:00:00 2001 From: ahrtr Date: Thu, 7 Apr 2022 06:11:34 +0800 Subject: [PATCH 6/6] move the consistentIdx and consistentTerm from Etcdserver to cindex package Removed the fields consistentIdx and consistentTerm from struct EtcdServer, and added applyingIndex and applyingTerm into struct consistentIndex in package cindex. We may remove the two fields completely if we decide to remove the OnPreCommitUnsafe, and it will depend on the performance test result. --- server/etcdserver/cindex/cindex.go | 43 +++++++++++++++++++++++++-- server/etcdserver/server.go | 34 ++++++++------------- server/storage/backend/backend.go | 10 +++---- server/storage/backend/batch_tx.go | 9 +++--- server/storage/backend/verify_test.go | 24 +++++++-------- 5 files changed, 75 insertions(+), 45 deletions(-) diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 91046cd03..de64c1c11 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -33,12 +33,18 @@ type ConsistentIndexer interface { // ConsistentIndex returns the consistent index of current executing entry. ConsistentIndex() uint64 + // ConsistentApplyingIndex returns the consistent applying index of current executing entry. + ConsistentApplyingIndex() (uint64, uint64) + // UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction. UnsafeConsistentIndex() uint64 // SetConsistentIndex set the consistent index of current executing entry. SetConsistentIndex(v uint64, term uint64) + // SetConsistentApplyingIndex set the consistent applying index of current executing entry. + SetConsistentApplyingIndex(v uint64, term uint64) + // UnsafeSave must be called holding the lock on the tx. // It saves consistentIndex to the underlying stable storage. UnsafeSave(tx backend.BatchTx) @@ -58,6 +64,19 @@ type consistentIndex struct { // The value is being persisted in the backend since v3.5. term uint64 + // applyingIndex and applyingTerm are just temporary cache of the raftpb.Entry.Index + // and raftpb.Entry.Term, and they are not ready to be persisted yet. They will be + // saved to consistentIndex and term above in the txPostLockInsideApplyHook. + // + // TODO(ahrtr): try to remove the OnPreCommitUnsafe, and compare the + // performance difference. Afterwards we can make a decision on whether + // or not we should remove OnPreCommitUnsafe. If it is true, then we + // can remove applyingIndex and applyingTerm, and save the e.Index and + // e.Term to consistentIndex and term directly in applyEntries, and + // persist them into db in the txPostLockInsideApplyHook. + applyingIndex uint64 + applyingTerm uint64 + // be is used for initial read consistentIndex be Backend // mutex is protecting be. @@ -111,6 +130,15 @@ func (ci *consistentIndex) SetBackend(be Backend) { ci.SetConsistentIndex(0, 0) } +func (ci *consistentIndex) ConsistentApplyingIndex() (uint64, uint64) { + return atomic.LoadUint64(&ci.applyingIndex), atomic.LoadUint64(&ci.applyingTerm) +} + +func (ci *consistentIndex) SetConsistentApplyingIndex(v uint64, term uint64) { + atomic.StoreUint64(&ci.applyingIndex, v) + atomic.StoreUint64(&ci.applyingTerm, term) +} + func NewFakeConsistentIndex(index uint64) ConsistentIndexer { return &fakeConsistentIndex{index: index} } @@ -120,13 +148,24 @@ type fakeConsistentIndex struct { term uint64 } -func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index } -func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 { return f.index } +func (f *fakeConsistentIndex) ConsistentIndex() uint64 { + return atomic.LoadUint64(&f.index) +} +func (f *fakeConsistentIndex) ConsistentApplyingIndex() (uint64, uint64) { + return atomic.LoadUint64(&f.index), atomic.LoadUint64(&f.term) +} +func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 { + return atomic.LoadUint64(&f.index) +} func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) { atomic.StoreUint64(&f.index, index) atomic.StoreUint64(&f.term, term) } +func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint64) { + atomic.StoreUint64(&f.index, index) + atomic.StoreUint64(&f.term, term) +} func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {} func (f *fakeConsistentIndex) SetBackend(_ Backend) {} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 015bcaf6f..a3d0c9376 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -207,10 +207,8 @@ type EtcdServer struct { term uint64 // must use atomic operations to access; keep 64-bit aligned. lead uint64 // must use atomic operations to access; keep 64-bit aligned. - consistentIdx uint64 // must use atomic operations to access; keep 64-bit aligned. - consistentTerm uint64 // must use atomic operations to access; keep 64-bit aligned. - consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex - r raftNode // uses 64-bit atomics; keep 64-bit aligned. + consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex + r raftNode // uses 64-bit atomics; keep 64-bit aligned. readych chan struct{} Cfg config.ServerConfig @@ -405,7 +403,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { // Set the hook after EtcdServer finishes the initialization to avoid // the hook being called during the initialization process. - srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockHook()) + srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook()) // TODO: move transport initialization near the definition of remote tr := &rafthttp.Transport{ @@ -984,7 +982,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } s.consistIndex.SetBackend(newbe) - newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockHook()) + newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook()) lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex())) @@ -1555,15 +1553,6 @@ func (s *EtcdServer) getTerm() uint64 { return atomic.LoadUint64(&s.term) } -func (s *EtcdServer) setConsistentIndexAndTerm(cIdx, cTerm uint64) { - atomic.StoreUint64(&s.consistentIdx, cIdx) - atomic.StoreUint64(&s.consistentTerm, cTerm) -} - -func (s *EtcdServer) getConsistentIndexAndTerm() (uint64, uint64) { - return atomic.LoadUint64(&s.consistentIdx), atomic.LoadUint64(&s.consistentTerm) -} - func (s *EtcdServer) setLead(v uint64) { atomic.StoreUint64(&s.lead, v) } @@ -1788,7 +1777,7 @@ func (s *EtcdServer) apply( // set the consistent index of current executing entry if e.Index > s.consistIndex.ConsistentIndex() { - s.setConsistentIndexAndTerm(e.Index, e.Term) + s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } @@ -1826,7 +1815,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { index := s.consistIndex.ConsistentIndex() if e.Index > index { // set the consistent index of current executing entry - s.setConsistentIndexAndTerm(e.Index, e.Term) + s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } s.lg.Debug("apply entry normal", @@ -1925,7 +1914,8 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con // The txPostLock callback will not get called in this case, // so we should set the consistent index directly. if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 { - s.consistIndex.SetConsistentIndex(s.consistentIdx, s.consistentTerm) + applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex() + s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm) } return false, err } @@ -2329,11 +2319,11 @@ func (s *EtcdServer) Version() *serverversion.Manager { return serverversion.NewManager(s.Logger(), NewServerVersionAdapter(s)) } -func (s *EtcdServer) getTxPostLockHook() func() { +func (s *EtcdServer) getTxPostLockInsideApplyHook() func() { return func() { - cIdx, term := s.getConsistentIndexAndTerm() - if cIdx > s.consistIndex.UnsafeConsistentIndex() { - s.consistIndex.SetConsistentIndex(cIdx, term) + applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex() + if applyingIdx > s.consistIndex.UnsafeConsistentIndex() { + s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm) } } } diff --git a/server/storage/backend/backend.go b/server/storage/backend/backend.go index ebb99ee2c..f30d79062 100644 --- a/server/storage/backend/backend.go +++ b/server/storage/backend/backend.go @@ -68,7 +68,7 @@ type Backend interface { ForceCommit() Close() error - // SetTxPostLockInsideApplyHook sets a txPostLockHook. + // SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook. SetTxPostLockInsideApplyHook(func()) } @@ -122,8 +122,8 @@ type backend struct { hooks Hooks - // txPostLockHook is called each time right after locking the tx. - txPostLockHook func() + // txPostLockInsideApplyHook is called each time right after locking the tx. + txPostLockInsideApplyHook func() lg *zap.Logger } @@ -235,10 +235,10 @@ func (b *backend) BatchTx() BatchTx { func (b *backend) SetTxPostLockInsideApplyHook(hook func()) { // It needs to lock the batchTx, because the periodic commit - // may be accessing the txPostLockHook at the moment. + // may be accessing the txPostLockInsideApplyHook at the moment. b.batchTx.lock() defer b.batchTx.Unlock() - b.txPostLockHook = hook + b.txPostLockInsideApplyHook = hook } func (b *backend) ReadTx() ReadTx { return b.readTx } diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index 7eca835fd..c8fa55954 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -77,13 +77,14 @@ func (t *batchTx) lock() { func (t *batchTx) LockInsideApply() { t.lock() - if t.backend.txPostLockHook != nil { + if t.backend.txPostLockInsideApplyHook != nil { // The callers of some methods (i.e., (*RaftCluster).AddMember) // can be coming from both InsideApply and OutsideApply, but the - // callers from OutsideApply will have a nil txPostLockHook. So we - // should check the txPostLockHook before validating the callstack. + // callers from OutsideApply will have a nil txPostLockInsideApplyHook. + // So we should check the txPostLockInsideApplyHook before validating + // the callstack. ValidateCalledInsideApply(t.backend.lg) - t.backend.txPostLockHook() + t.backend.txPostLockInsideApplyHook() } } diff --git a/server/storage/backend/verify_test.go b/server/storage/backend/verify_test.go index 2345f46b5..5cb38ee9d 100644 --- a/server/storage/backend/verify_test.go +++ b/server/storage/backend/verify_test.go @@ -25,11 +25,11 @@ import ( func TestLockVerify(t *testing.T) { tcs := []struct { - name string - insideApply bool - lock func(tx backend.BatchTx) - txPostLockHook func() - expectPanic bool + name string + insideApply bool + lock func(tx backend.BatchTx) + txPostLockInsideApplyHook func() + expectPanic bool }{ { name: "call lockInsideApply from inside apply", @@ -38,17 +38,17 @@ func TestLockVerify(t *testing.T) { expectPanic: false, }, { - name: "call lockInsideApply from outside apply (without txPostLockHook)", + name: "call lockInsideApply from outside apply (without txPostLockInsideApplyHook)", insideApply: false, lock: lockInsideApply, expectPanic: false, }, { - name: "call lockInsideApply from outside apply (with txPostLockHook)", - insideApply: false, - lock: lockInsideApply, - txPostLockHook: func() {}, - expectPanic: true, + name: "call lockInsideApply from outside apply (with txPostLockInsideApplyHook)", + insideApply: false, + lock: lockInsideApply, + txPostLockInsideApplyHook: func() {}, + expectPanic: true, }, { name: "call lockOutsideApply from outside apply", @@ -78,7 +78,7 @@ func TestLockVerify(t *testing.T) { t.Run(tc.name, func(t *testing.T) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) - be.SetTxPostLockInsideApplyHook(tc.txPostLockHook) + be.SetTxPostLockInsideApplyHook(tc.txPostLockInsideApplyHook) hasPaniced := handlePanic(func() { if tc.insideApply {