From bfd5170f66812b43775b95e11ccb742d61cff466 Mon Sep 17 00:00:00 2001 From: ahrtr Date: Wed, 30 Mar 2022 15:26:31 +0800 Subject: [PATCH] add a txPostLockHook into the backend Previously the SetConsistentIndex() is called during the apply workflow, but it's outside the db transaction. If a commit happens between SetConsistentIndex and the following apply workflow, and etcd crashes for whatever reason right after the commit, then etcd commits an incomplete transaction to db. Eventually etcd runs into the data inconsistency issue. In this commit, we move the SetConsistentIndex into a txPostLockHook, so it will be executed inside the transaction lock. --- etcdutl/etcdutl/migrate_command.go | 4 +-- server/etcdserver/adapters.go | 8 +++--- server/etcdserver/cindex/cindex.go | 23 ++++++++++++++--- server/etcdserver/server.go | 31 ++++++++++++++++++++--- server/etcdserver/server_test.go | 7 ++--- server/lease/lessor.go | 2 +- server/storage/backend.go | 2 +- server/storage/backend/backend.go | 16 +++++++++++- server/storage/backend/batch_tx.go | 16 +++++++++--- server/storage/mvcc/kvstore.go | 4 +-- server/storage/mvcc/kvstore_compaction.go | 2 +- server/storage/mvcc/kvstore_test.go | 2 ++ server/storage/schema/alarm.go | 2 +- server/storage/schema/auth.go | 2 +- server/storage/schema/cindex.go | 6 ++--- server/storage/schema/membership.go | 6 ++--- server/storage/schema/migration.go | 4 +-- server/storage/schema/schema.go | 8 +++--- server/verify/verify.go | 3 +-- 19 files changed, 104 insertions(+), 44 deletions(-) diff --git a/etcdutl/etcdutl/migrate_command.go b/etcdutl/etcdutl/migrate_command.go index 88f9e473c..195576e31 100644 --- a/etcdutl/etcdutl/migrate_command.go +++ b/etcdutl/etcdutl/migrate_command.go @@ -118,7 +118,7 @@ type migrateConfig struct { func migrateCommandFunc(c *migrateConfig) error { defer c.be.Close() tx := c.be.BatchTx() - current, err := schema.DetectSchemaVersion(c.lg, tx) + current, err := schema.DetectSchemaVersion(c.lg, c.be.ReadTx()) if err != nil { c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older") return err @@ -140,7 +140,7 @@ func migrateCommandFunc(c *migrateConfig) error { } func migrateForce(lg *zap.Logger, tx backend.BatchTx, target *semver.Version) { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() // Storage version is only supported since v3.6 if target.LessThan(schema.V3_6) { diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index 9511bc6a6..5f1bcfef1 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -78,9 +78,9 @@ func (s *serverVersionAdapter) GetStorageVersion() *semver.Version { s.bemu.RLock() defer s.bemu.RUnlock() - tx := s.be.BatchTx() - tx.Lock() - defer tx.Unlock() + tx := s.be.ReadTx() + tx.RLock() + defer tx.RUnlock() v, err := schema.UnsafeDetectSchemaVersion(s.lg, tx) if err != nil { return nil @@ -94,7 +94,7 @@ func (s *serverVersionAdapter) UpdateStorageVersion(target semver.Version) error defer s.bemu.RUnlock() tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return schema.UnsafeMigrate(s.lg, tx, s.r.storage, target) } diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 24dad6603..7ec1b1212 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -23,6 +23,7 @@ import ( ) type Backend interface { + ReadTx() backend.ReadTx BatchTx() backend.BatchTx } @@ -32,6 +33,9 @@ type ConsistentIndexer interface { // ConsistentIndex returns the consistent index of current executing entry. ConsistentIndex() uint64 + // UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction. + UnsafeConsistentIndex() uint64 + // SetConsistentIndex set the consistent index of current executing entry. SetConsistentIndex(v uint64, term uint64) @@ -73,7 +77,19 @@ func (ci *consistentIndex) ConsistentIndex() uint64 { ci.mutex.Lock() defer ci.mutex.Unlock() - v, term := schema.ReadConsistentIndex(ci.be.BatchTx()) + v, term := schema.ReadConsistentIndex(ci.be.ReadTx()) + ci.SetConsistentIndex(v, term) + return v +} + +// UnsafeConsistentIndex is similar to ConsistentIndex, +// but it shouldn't lock the transaction. +func (ci *consistentIndex) UnsafeConsistentIndex() uint64 { + if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 { + return index + } + + v, term := schema.UnsafeReadConsistentIndex(ci.be.BatchTx()) ci.SetConsistentIndex(v, term) return v } @@ -106,7 +122,8 @@ type fakeConsistentIndex struct { term uint64 } -func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index } +func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index } +func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 { return f.index } func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) { atomic.StoreUint64(&f.index, index) @@ -117,7 +134,7 @@ func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {} func (f *fakeConsistentIndex) SetBackend(_ Backend) {} func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow) } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index f5632318e..b22f680bb 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -207,8 +207,10 @@ type EtcdServer struct { term uint64 // must use atomic operations to access; keep 64-bit aligned. lead uint64 // must use atomic operations to access; keep 64-bit aligned. - consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex - r raftNode // uses 64-bit atomics; keep 64-bit aligned. + consistentIdx uint64 // must use atomic operations to access; keep 64-bit aligned. + consistentTerm uint64 // must use atomic operations to access; keep 64-bit aligned. + consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex + r raftNode // uses 64-bit atomics; keep 64-bit aligned. readych chan struct{} Cfg config.ServerConfig @@ -341,6 +343,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) srv.be = b.storage.backend.be + srv.be.SetTxPostLockHook(srv.getTxPostLockHook()) srv.beHooks = b.storage.backend.beHooks minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat @@ -978,6 +981,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } s.consistIndex.SetBackend(newbe) + newbe.SetTxPostLockHook(s.getTxPostLockHook()) + lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex())) // Closing old backend might block until all the txns @@ -1547,6 +1552,15 @@ func (s *EtcdServer) getTerm() uint64 { return atomic.LoadUint64(&s.term) } +func (s *EtcdServer) setConsistentIndexAndTerm(cIdx, cTerm uint64) { + atomic.StoreUint64(&s.consistentIdx, cIdx) + atomic.StoreUint64(&s.consistentTerm, cTerm) +} + +func (s *EtcdServer) getConsistentIndexAndTerm() (uint64, uint64) { + return atomic.LoadUint64(&s.consistentIdx), atomic.LoadUint64(&s.consistentTerm) +} + func (s *EtcdServer) setLead(v uint64) { atomic.StoreUint64(&s.lead, v) } @@ -1771,7 +1785,7 @@ func (s *EtcdServer) apply( // set the consistent index of current executing entry if e.Index > s.consistIndex.ConsistentIndex() { - s.consistIndex.SetConsistentIndex(e.Index, e.Term) + s.setConsistentIndexAndTerm(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } @@ -1801,7 +1815,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { index := s.consistIndex.ConsistentIndex() if e.Index > index { // set the consistent index of current executing entry - s.consistIndex.SetConsistentIndex(e.Index, e.Term) + s.setConsistentIndexAndTerm(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } s.lg.Debug("apply entry normal", @@ -2296,3 +2310,12 @@ func (s *EtcdServer) raftStatus() raft.Status { func (s *EtcdServer) Version() *serverversion.Manager { return serverversion.NewManager(s.Logger(), NewServerVersionAdapter(s)) } + +func (s *EtcdServer) getTxPostLockHook() func() { + return func() { + cIdx, term := s.getConsistentIndexAndTerm() + if cIdx > s.consistIndex.UnsafeConsistentIndex() { + s.consistIndex.SetConsistentIndex(cIdx, term) + } + } +} diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index cb4386976..706c78549 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -687,9 +687,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { _, appliedi, _ := srv.apply(ents, &raftpb.ConfState{}) consistIndex := srv.consistIndex.ConsistentIndex() - if consistIndex != appliedi { - t.Fatalf("consistIndex = %v, want %v", consistIndex, appliedi) - } + assert.Equal(t, uint64(2), appliedi) t.Run("verify-backend", func(t *testing.T) { tx := be.BatchTx() @@ -698,9 +696,8 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { srv.beHooks.OnPreCommitUnsafe(tx) assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *schema.UnsafeConfStateFromBackend(lg, tx)) }) - rindex, rterm := schema.ReadConsistentIndex(be.BatchTx()) + rindex, _ := schema.ReadConsistentIndex(be.ReadTx()) assert.Equal(t, consistIndex, rindex) - assert.Equal(t, uint64(4), rterm) } func realisticRaftNode(lg *zap.Logger) *raftNode { diff --git a/server/lease/lessor.go b/server/lease/lessor.go index cb6f0d6ec..4af816c76 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -797,7 +797,7 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh func (le *lessor) initAndRecover() { tx := le.b.BatchTx() - tx.Lock() + tx.LockWithoutHook() schema.UnsafeCreateLeaseBucket(tx) lpbs := schema.MustUnsafeGetAllLeases(tx) tx.Unlock() diff --git a/server/storage/backend.go b/server/storage/backend.go index b7b0eb2fc..b1101cfa6 100644 --- a/server/storage/backend.go +++ b/server/storage/backend.go @@ -99,7 +99,7 @@ func OpenBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { func RecoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks *BackendHooks) (backend.Backend, error) { consistentIndex := uint64(0) if beExist { - consistentIndex, _ = schema.ReadConsistentIndex(oldbe.BatchTx()) + consistentIndex, _ = schema.ReadConsistentIndex(oldbe.ReadTx()) } if snapshot.Metadata.Index <= consistentIndex { return oldbe, nil diff --git a/server/storage/backend/backend.go b/server/storage/backend/backend.go index a61f4ec36..f949f282b 100644 --- a/server/storage/backend/backend.go +++ b/server/storage/backend/backend.go @@ -67,6 +67,9 @@ type Backend interface { Defrag() error ForceCommit() Close() error + + // SetTxPostLockHook sets a txPostLockHook. + SetTxPostLockHook(func()) } type Snapshot interface { @@ -119,6 +122,9 @@ type backend struct { hooks Hooks + // txPostLockHook is called each time right after locking the tx. + txPostLockHook func() + lg *zap.Logger } @@ -227,6 +233,14 @@ func (b *backend) BatchTx() BatchTx { return b.batchTx } +func (b *backend) SetTxPostLockHook(hook func()) { + // It needs to lock the batchTx, because the periodic commit + // may be accessing the txPostLockHook at the moment. + b.batchTx.LockWithoutHook() + defer b.batchTx.Unlock() + b.txPostLockHook = hook +} + func (b *backend) ReadTx() ReadTx { return b.readTx } // ConcurrentReadTx creates and returns a new ReadTx, which: @@ -438,7 +452,7 @@ func (b *backend) defrag() error { // TODO: make this non-blocking? // lock batchTx to ensure nobody is using previous tx, and then // close previous ongoing tx. - b.batchTx.Lock() + b.batchTx.LockWithoutHook() defer b.batchTx.Unlock() // lock database after lock tx to avoid deadlock. diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index a8e649199..a27266bb2 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -55,6 +55,7 @@ type BatchTx interface { CommitAndStop() LockInsideApply() LockOutsideApply() + } type batchTx struct { @@ -66,6 +67,13 @@ type batchTx struct { } func (t *batchTx) Lock() { + t.LockWithoutHook() + if t.backend.txPostLockHook != nil { + t.backend.txPostLockHook() + } +} + +func (t *batchTx) LockWithoutHook() { t.Mutex.Lock() } @@ -226,14 +234,14 @@ func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) // Commit commits a previous tx and begins a new writable one. func (t *batchTx) Commit() { - t.Lock() + t.LockWithoutHook() t.commit(false) t.Unlock() } // CommitAndStop commits the previous tx and does not create a new one. func (t *batchTx) CommitAndStop() { - t.Lock() + t.LockWithoutHook() t.commit(true) t.Unlock() } @@ -303,13 +311,13 @@ func (t *batchTxBuffered) Unlock() { } func (t *batchTxBuffered) Commit() { - t.Lock() + t.LockWithoutHook() t.commit(false) t.Unlock() } func (t *batchTxBuffered) CommitAndStop() { - t.Lock() + t.LockWithoutHook() t.commit(true) t.Unlock() } diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index 846f83cde..9b79c090a 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -121,7 +121,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi } tx := s.b.BatchTx() - tx.Lock() + tx.LockWithoutHook() tx.UnsafeCreateBucket(schema.Key) schema.UnsafeCreateMetaBucket(tx) tx.Unlock() @@ -331,7 +331,7 @@ func (s *store) restore() error { // restore index tx := s.b.BatchTx() - tx.Lock() + tx.LockWithoutHook() finishedCompact, found := UnsafeReadFinishedCompact(tx) if found { diff --git a/server/storage/mvcc/kvstore_compaction.go b/server/storage/mvcc/kvstore_compaction.go index ba9440082..941f056a9 100644 --- a/server/storage/mvcc/kvstore_compaction.go +++ b/server/storage/mvcc/kvstore_compaction.go @@ -42,7 +42,7 @@ func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struc start := time.Now() tx := s.b.BatchTx() - tx.Lock() + tx.LockWithoutHook() keys, _ := tx.UnsafeRange(schema.Key, last, end, int64(batchNum)) for _, key := range keys { rev = bytesToRev(key) diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index 31cb37655..4a51e9c40 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -881,6 +881,7 @@ type fakeBatchTx struct { rangeRespc chan rangeResp } +func (b *fakeBatchTx) LockWithoutHook() {} func (b *fakeBatchTx) Lock() {} func (b *fakeBatchTx) Unlock() {} func (b *fakeBatchTx) RLock() {} @@ -924,6 +925,7 @@ func (b *fakeBackend) Snapshot() backend.Snapshot func (b *fakeBackend) ForceCommit() {} func (b *fakeBackend) Defrag() error { return nil } func (b *fakeBackend) Close() error { return nil } +func (b *fakeBackend) SetTxPostLockHook(func()) {} type indexGetResp struct { rev revision diff --git a/server/storage/schema/alarm.go b/server/storage/schema/alarm.go index 605bb3a0b..09a49994d 100644 --- a/server/storage/schema/alarm.go +++ b/server/storage/schema/alarm.go @@ -34,7 +34,7 @@ func NewAlarmBackend(lg *zap.Logger, be backend.Backend) *alarmBackend { func (s *alarmBackend) CreateAlarmBucket() { tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Alarm) } diff --git a/server/storage/schema/auth.go b/server/storage/schema/auth.go index 93ef34c37..fc334a8bc 100644 --- a/server/storage/schema/auth.go +++ b/server/storage/schema/auth.go @@ -49,7 +49,7 @@ func NewAuthBackend(lg *zap.Logger, be backend.Backend) *authBackend { func (abe *authBackend) CreateAuthBuckets() { tx := abe.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Auth) tx.UnsafeCreateBucket(AuthUsers) diff --git a/server/storage/schema/cindex.go b/server/storage/schema/cindex.go index d7b06b9ce..38eea6f91 100644 --- a/server/storage/schema/cindex.go +++ b/server/storage/schema/cindex.go @@ -26,7 +26,7 @@ func UnsafeCreateMetaBucket(tx backend.BatchTx) { // CreateMetaBucket creates the `meta` bucket (if it does not exists yet). func CreateMetaBucket(tx backend.BatchTx) { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Meta) } @@ -51,8 +51,8 @@ func UnsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { // ReadConsistentIndex loads consistent index and term from given transaction. // returns 0 if the data are not found. func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { - tx.Lock() - defer tx.Unlock() + tx.RLock() + defer tx.RUnlock() return UnsafeReadConsistentIndex(tx) } diff --git a/server/storage/schema/membership.go b/server/storage/schema/membership.go index 844b50a85..153699e69 100644 --- a/server/storage/schema/membership.go +++ b/server/storage/schema/membership.go @@ -61,7 +61,7 @@ func (s *membershipBackend) MustSaveMemberToBackend(m *membership.Member) { // from the v3 backend. func (s *membershipBackend) TrimClusterFromBackend() error { tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeDeleteBucket(Cluster) return nil @@ -121,7 +121,7 @@ func (s *membershipBackend) readMembersFromBackend() (map[types.ID]*membership.M func (s *membershipBackend) TrimMembershipFromBackend() error { s.lg.Info("Trimming membership information from the backend...") tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() err := tx.UnsafeForEach(Members, func(k, v []byte) error { tx.UnsafeDelete(Members, k) @@ -167,7 +167,7 @@ func (s *membershipBackend) MustSaveDowngradeToBackend(downgrade *version.Downgr func (s *membershipBackend) MustCreateBackendBuckets() { tx := s.be.BatchTx() - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() tx.UnsafeCreateBucket(Members) tx.UnsafeCreateBucket(MembersRemoved) diff --git a/server/storage/schema/migration.go b/server/storage/schema/migration.go index 47734b4b8..e1e44dab5 100644 --- a/server/storage/schema/migration.go +++ b/server/storage/schema/migration.go @@ -49,7 +49,7 @@ func newPlan(lg *zap.Logger, current semver.Version, target semver.Version) (pla } func (p migrationPlan) Execute(lg *zap.Logger, tx backend.BatchTx) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return p.unsafeExecute(lg, tx) } @@ -90,7 +90,7 @@ func newMigrationStep(v semver.Version, isUpgrade bool, changes []schemaChange) // execute runs actions required to migrate etcd storage between two minor versions. func (s migrationStep) execute(lg *zap.Logger, tx backend.BatchTx) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return s.unsafeExecute(lg, tx) } diff --git a/server/storage/schema/schema.go b/server/storage/schema/schema.go index 850b55d5b..2b4c15f29 100644 --- a/server/storage/schema/schema.go +++ b/server/storage/schema/schema.go @@ -31,7 +31,7 @@ var ( // Validate checks provided backend to confirm that schema used is supported. func Validate(lg *zap.Logger, tx backend.BatchTx) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return unsafeValidate(lg, tx) } @@ -60,7 +60,7 @@ type WALVersion interface { // Migrate updates storage schema to provided target version. // Downgrading requires that provided WAL doesn't contain unsupported entries. func Migrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semver.Version) error { - tx.Lock() + tx.LockWithoutHook() defer tx.Unlock() return UnsafeMigrate(lg, tx, w, target) } @@ -89,8 +89,8 @@ func UnsafeMigrate(lg *zap.Logger, tx backend.BatchTx, w WALVersion, target semv // * v3.5 will return it's version if it includes all storage fields added in v3.5 (might require a snapshot). // * v3.4 and older is not supported and will return error. func DetectSchemaVersion(lg *zap.Logger, tx backend.ReadTx) (v semver.Version, err error) { - tx.Lock() - defer tx.Unlock() + tx.RLock() + defer tx.RUnlock() return UnsafeDetectSchemaVersion(lg, tx) } diff --git a/server/verify/verify.go b/server/verify/verify.go index 9402e5eb5..2b73fbc07 100644 --- a/server/verify/verify.go +++ b/server/verify/verify.go @@ -108,8 +108,7 @@ func MustVerifyIfEnabled(cfg Config) { } func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error { - tx := be.BatchTx() - index, term := schema.ReadConsistentIndex(tx) + index, term := schema.ReadConsistentIndex(be.ReadTx()) if cfg.ExactIndex && index != hardstate.Commit { return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit) }