diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 91046cd03..de64c1c11 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -33,12 +33,18 @@ type ConsistentIndexer interface { // ConsistentIndex returns the consistent index of current executing entry. ConsistentIndex() uint64 + // ConsistentApplyingIndex returns the consistent applying index of current executing entry. + ConsistentApplyingIndex() (uint64, uint64) + // UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction. UnsafeConsistentIndex() uint64 // SetConsistentIndex set the consistent index of current executing entry. SetConsistentIndex(v uint64, term uint64) + // SetConsistentApplyingIndex set the consistent applying index of current executing entry. + SetConsistentApplyingIndex(v uint64, term uint64) + // UnsafeSave must be called holding the lock on the tx. // It saves consistentIndex to the underlying stable storage. UnsafeSave(tx backend.BatchTx) @@ -58,6 +64,19 @@ type consistentIndex struct { // The value is being persisted in the backend since v3.5. term uint64 + // applyingIndex and applyingTerm are just temporary cache of the raftpb.Entry.Index + // and raftpb.Entry.Term, and they are not ready to be persisted yet. They will be + // saved to consistentIndex and term above in the txPostLockInsideApplyHook. + // + // TODO(ahrtr): try to remove the OnPreCommitUnsafe, and compare the + // performance difference. Afterwards we can make a decision on whether + // or not we should remove OnPreCommitUnsafe. If it is true, then we + // can remove applyingIndex and applyingTerm, and save the e.Index and + // e.Term to consistentIndex and term directly in applyEntries, and + // persist them into db in the txPostLockInsideApplyHook. + applyingIndex uint64 + applyingTerm uint64 + // be is used for initial read consistentIndex be Backend // mutex is protecting be. @@ -111,6 +130,15 @@ func (ci *consistentIndex) SetBackend(be Backend) { ci.SetConsistentIndex(0, 0) } +func (ci *consistentIndex) ConsistentApplyingIndex() (uint64, uint64) { + return atomic.LoadUint64(&ci.applyingIndex), atomic.LoadUint64(&ci.applyingTerm) +} + +func (ci *consistentIndex) SetConsistentApplyingIndex(v uint64, term uint64) { + atomic.StoreUint64(&ci.applyingIndex, v) + atomic.StoreUint64(&ci.applyingTerm, term) +} + func NewFakeConsistentIndex(index uint64) ConsistentIndexer { return &fakeConsistentIndex{index: index} } @@ -120,13 +148,24 @@ type fakeConsistentIndex struct { term uint64 } -func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index } -func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 { return f.index } +func (f *fakeConsistentIndex) ConsistentIndex() uint64 { + return atomic.LoadUint64(&f.index) +} +func (f *fakeConsistentIndex) ConsistentApplyingIndex() (uint64, uint64) { + return atomic.LoadUint64(&f.index), atomic.LoadUint64(&f.term) +} +func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 { + return atomic.LoadUint64(&f.index) +} func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) { atomic.StoreUint64(&f.index, index) atomic.StoreUint64(&f.term, term) } +func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint64) { + atomic.StoreUint64(&f.index, index) + atomic.StoreUint64(&f.term, term) +} func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {} func (f *fakeConsistentIndex) SetBackend(_ Backend) {} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 015bcaf6f..a3d0c9376 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -207,10 +207,8 @@ type EtcdServer struct { term uint64 // must use atomic operations to access; keep 64-bit aligned. lead uint64 // must use atomic operations to access; keep 64-bit aligned. - consistentIdx uint64 // must use atomic operations to access; keep 64-bit aligned. - consistentTerm uint64 // must use atomic operations to access; keep 64-bit aligned. - consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex - r raftNode // uses 64-bit atomics; keep 64-bit aligned. + consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex + r raftNode // uses 64-bit atomics; keep 64-bit aligned. readych chan struct{} Cfg config.ServerConfig @@ -405,7 +403,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { // Set the hook after EtcdServer finishes the initialization to avoid // the hook being called during the initialization process. - srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockHook()) + srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook()) // TODO: move transport initialization near the definition of remote tr := &rafthttp.Transport{ @@ -984,7 +982,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { } s.consistIndex.SetBackend(newbe) - newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockHook()) + newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook()) lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex())) @@ -1555,15 +1553,6 @@ func (s *EtcdServer) getTerm() uint64 { return atomic.LoadUint64(&s.term) } -func (s *EtcdServer) setConsistentIndexAndTerm(cIdx, cTerm uint64) { - atomic.StoreUint64(&s.consistentIdx, cIdx) - atomic.StoreUint64(&s.consistentTerm, cTerm) -} - -func (s *EtcdServer) getConsistentIndexAndTerm() (uint64, uint64) { - return atomic.LoadUint64(&s.consistentIdx), atomic.LoadUint64(&s.consistentTerm) -} - func (s *EtcdServer) setLead(v uint64) { atomic.StoreUint64(&s.lead, v) } @@ -1788,7 +1777,7 @@ func (s *EtcdServer) apply( // set the consistent index of current executing entry if e.Index > s.consistIndex.ConsistentIndex() { - s.setConsistentIndexAndTerm(e.Index, e.Term) + s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } @@ -1826,7 +1815,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { index := s.consistIndex.ConsistentIndex() if e.Index > index { // set the consistent index of current executing entry - s.setConsistentIndexAndTerm(e.Index, e.Term) + s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } s.lg.Debug("apply entry normal", @@ -1925,7 +1914,8 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con // The txPostLock callback will not get called in this case, // so we should set the consistent index directly. if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 { - s.consistIndex.SetConsistentIndex(s.consistentIdx, s.consistentTerm) + applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex() + s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm) } return false, err } @@ -2329,11 +2319,11 @@ func (s *EtcdServer) Version() *serverversion.Manager { return serverversion.NewManager(s.Logger(), NewServerVersionAdapter(s)) } -func (s *EtcdServer) getTxPostLockHook() func() { +func (s *EtcdServer) getTxPostLockInsideApplyHook() func() { return func() { - cIdx, term := s.getConsistentIndexAndTerm() - if cIdx > s.consistIndex.UnsafeConsistentIndex() { - s.consistIndex.SetConsistentIndex(cIdx, term) + applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex() + if applyingIdx > s.consistIndex.UnsafeConsistentIndex() { + s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm) } } } diff --git a/server/storage/backend/backend.go b/server/storage/backend/backend.go index ebb99ee2c..f30d79062 100644 --- a/server/storage/backend/backend.go +++ b/server/storage/backend/backend.go @@ -68,7 +68,7 @@ type Backend interface { ForceCommit() Close() error - // SetTxPostLockInsideApplyHook sets a txPostLockHook. + // SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook. SetTxPostLockInsideApplyHook(func()) } @@ -122,8 +122,8 @@ type backend struct { hooks Hooks - // txPostLockHook is called each time right after locking the tx. - txPostLockHook func() + // txPostLockInsideApplyHook is called each time right after locking the tx. + txPostLockInsideApplyHook func() lg *zap.Logger } @@ -235,10 +235,10 @@ func (b *backend) BatchTx() BatchTx { func (b *backend) SetTxPostLockInsideApplyHook(hook func()) { // It needs to lock the batchTx, because the periodic commit - // may be accessing the txPostLockHook at the moment. + // may be accessing the txPostLockInsideApplyHook at the moment. b.batchTx.lock() defer b.batchTx.Unlock() - b.txPostLockHook = hook + b.txPostLockInsideApplyHook = hook } func (b *backend) ReadTx() ReadTx { return b.readTx } diff --git a/server/storage/backend/batch_tx.go b/server/storage/backend/batch_tx.go index 7eca835fd..c8fa55954 100644 --- a/server/storage/backend/batch_tx.go +++ b/server/storage/backend/batch_tx.go @@ -77,13 +77,14 @@ func (t *batchTx) lock() { func (t *batchTx) LockInsideApply() { t.lock() - if t.backend.txPostLockHook != nil { + if t.backend.txPostLockInsideApplyHook != nil { // The callers of some methods (i.e., (*RaftCluster).AddMember) // can be coming from both InsideApply and OutsideApply, but the - // callers from OutsideApply will have a nil txPostLockHook. So we - // should check the txPostLockHook before validating the callstack. + // callers from OutsideApply will have a nil txPostLockInsideApplyHook. + // So we should check the txPostLockInsideApplyHook before validating + // the callstack. ValidateCalledInsideApply(t.backend.lg) - t.backend.txPostLockHook() + t.backend.txPostLockInsideApplyHook() } } diff --git a/server/storage/backend/verify_test.go b/server/storage/backend/verify_test.go index 2345f46b5..5cb38ee9d 100644 --- a/server/storage/backend/verify_test.go +++ b/server/storage/backend/verify_test.go @@ -25,11 +25,11 @@ import ( func TestLockVerify(t *testing.T) { tcs := []struct { - name string - insideApply bool - lock func(tx backend.BatchTx) - txPostLockHook func() - expectPanic bool + name string + insideApply bool + lock func(tx backend.BatchTx) + txPostLockInsideApplyHook func() + expectPanic bool }{ { name: "call lockInsideApply from inside apply", @@ -38,17 +38,17 @@ func TestLockVerify(t *testing.T) { expectPanic: false, }, { - name: "call lockInsideApply from outside apply (without txPostLockHook)", + name: "call lockInsideApply from outside apply (without txPostLockInsideApplyHook)", insideApply: false, lock: lockInsideApply, expectPanic: false, }, { - name: "call lockInsideApply from outside apply (with txPostLockHook)", - insideApply: false, - lock: lockInsideApply, - txPostLockHook: func() {}, - expectPanic: true, + name: "call lockInsideApply from outside apply (with txPostLockInsideApplyHook)", + insideApply: false, + lock: lockInsideApply, + txPostLockInsideApplyHook: func() {}, + expectPanic: true, }, { name: "call lockOutsideApply from outside apply", @@ -78,7 +78,7 @@ func TestLockVerify(t *testing.T) { t.Run(tc.name, func(t *testing.T) { be, _ := betesting.NewTmpBackend(t, time.Hour, 10000) - be.SetTxPostLockInsideApplyHook(tc.txPostLockHook) + be.SetTxPostLockInsideApplyHook(tc.txPostLockInsideApplyHook) hasPaniced := handlePanic(func() { if tc.insideApply {