mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
move the consistentIdx and consistentTerm from Etcdserver to cindex package
Removed the fields consistentIdx and consistentTerm from struct EtcdServer, and added applyingIndex and applyingTerm into struct consistentIndex in package cindex. We may remove the two fields completely if we decide to remove the OnPreCommitUnsafe, and it will depend on the performance test result.
This commit is contained in:
@@ -33,12 +33,18 @@ type ConsistentIndexer interface {
|
||||
// ConsistentIndex returns the consistent index of current executing entry.
|
||||
ConsistentIndex() uint64
|
||||
|
||||
// ConsistentApplyingIndex returns the consistent applying index of current executing entry.
|
||||
ConsistentApplyingIndex() (uint64, uint64)
|
||||
|
||||
// UnsafeConsistentIndex is similar to ConsistentIndex, but it doesn't lock the transaction.
|
||||
UnsafeConsistentIndex() uint64
|
||||
|
||||
// SetConsistentIndex set the consistent index of current executing entry.
|
||||
SetConsistentIndex(v uint64, term uint64)
|
||||
|
||||
// SetConsistentApplyingIndex set the consistent applying index of current executing entry.
|
||||
SetConsistentApplyingIndex(v uint64, term uint64)
|
||||
|
||||
// UnsafeSave must be called holding the lock on the tx.
|
||||
// It saves consistentIndex to the underlying stable storage.
|
||||
UnsafeSave(tx backend.BatchTx)
|
||||
@@ -58,6 +64,19 @@ type consistentIndex struct {
|
||||
// The value is being persisted in the backend since v3.5.
|
||||
term uint64
|
||||
|
||||
// applyingIndex and applyingTerm are just temporary cache of the raftpb.Entry.Index
|
||||
// and raftpb.Entry.Term, and they are not ready to be persisted yet. They will be
|
||||
// saved to consistentIndex and term above in the txPostLockInsideApplyHook.
|
||||
//
|
||||
// TODO(ahrtr): try to remove the OnPreCommitUnsafe, and compare the
|
||||
// performance difference. Afterwards we can make a decision on whether
|
||||
// or not we should remove OnPreCommitUnsafe. If it is true, then we
|
||||
// can remove applyingIndex and applyingTerm, and save the e.Index and
|
||||
// e.Term to consistentIndex and term directly in applyEntries, and
|
||||
// persist them into db in the txPostLockInsideApplyHook.
|
||||
applyingIndex uint64
|
||||
applyingTerm uint64
|
||||
|
||||
// be is used for initial read consistentIndex
|
||||
be Backend
|
||||
// mutex is protecting be.
|
||||
@@ -111,6 +130,15 @@ func (ci *consistentIndex) SetBackend(be Backend) {
|
||||
ci.SetConsistentIndex(0, 0)
|
||||
}
|
||||
|
||||
func (ci *consistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
|
||||
return atomic.LoadUint64(&ci.applyingIndex), atomic.LoadUint64(&ci.applyingTerm)
|
||||
}
|
||||
|
||||
func (ci *consistentIndex) SetConsistentApplyingIndex(v uint64, term uint64) {
|
||||
atomic.StoreUint64(&ci.applyingIndex, v)
|
||||
atomic.StoreUint64(&ci.applyingTerm, term)
|
||||
}
|
||||
|
||||
func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
|
||||
return &fakeConsistentIndex{index: index}
|
||||
}
|
||||
@@ -120,13 +148,24 @@ type fakeConsistentIndex struct {
|
||||
term uint64
|
||||
}
|
||||
|
||||
func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index }
|
||||
func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 { return f.index }
|
||||
func (f *fakeConsistentIndex) ConsistentIndex() uint64 {
|
||||
return atomic.LoadUint64(&f.index)
|
||||
}
|
||||
func (f *fakeConsistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
|
||||
return atomic.LoadUint64(&f.index), atomic.LoadUint64(&f.term)
|
||||
}
|
||||
func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 {
|
||||
return atomic.LoadUint64(&f.index)
|
||||
}
|
||||
|
||||
func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) {
|
||||
atomic.StoreUint64(&f.index, index)
|
||||
atomic.StoreUint64(&f.term, term)
|
||||
}
|
||||
func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint64) {
|
||||
atomic.StoreUint64(&f.index, index)
|
||||
atomic.StoreUint64(&f.term, term)
|
||||
}
|
||||
|
||||
func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
|
||||
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}
|
||||
|
||||
@@ -207,10 +207,8 @@ type EtcdServer struct {
|
||||
term uint64 // must use atomic operations to access; keep 64-bit aligned.
|
||||
lead uint64 // must use atomic operations to access; keep 64-bit aligned.
|
||||
|
||||
consistentIdx uint64 // must use atomic operations to access; keep 64-bit aligned.
|
||||
consistentTerm uint64 // must use atomic operations to access; keep 64-bit aligned.
|
||||
consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex
|
||||
r raftNode // uses 64-bit atomics; keep 64-bit aligned.
|
||||
consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex
|
||||
r raftNode // uses 64-bit atomics; keep 64-bit aligned.
|
||||
|
||||
readych chan struct{}
|
||||
Cfg config.ServerConfig
|
||||
@@ -405,7 +403,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||
|
||||
// Set the hook after EtcdServer finishes the initialization to avoid
|
||||
// the hook being called during the initialization process.
|
||||
srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockHook())
|
||||
srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook())
|
||||
|
||||
// TODO: move transport initialization near the definition of remote
|
||||
tr := &rafthttp.Transport{
|
||||
@@ -984,7 +982,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
||||
}
|
||||
|
||||
s.consistIndex.SetBackend(newbe)
|
||||
newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockHook())
|
||||
newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook())
|
||||
|
||||
lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
|
||||
|
||||
@@ -1555,15 +1553,6 @@ func (s *EtcdServer) getTerm() uint64 {
|
||||
return atomic.LoadUint64(&s.term)
|
||||
}
|
||||
|
||||
func (s *EtcdServer) setConsistentIndexAndTerm(cIdx, cTerm uint64) {
|
||||
atomic.StoreUint64(&s.consistentIdx, cIdx)
|
||||
atomic.StoreUint64(&s.consistentTerm, cTerm)
|
||||
}
|
||||
|
||||
func (s *EtcdServer) getConsistentIndexAndTerm() (uint64, uint64) {
|
||||
return atomic.LoadUint64(&s.consistentIdx), atomic.LoadUint64(&s.consistentTerm)
|
||||
}
|
||||
|
||||
func (s *EtcdServer) setLead(v uint64) {
|
||||
atomic.StoreUint64(&s.lead, v)
|
||||
}
|
||||
@@ -1788,7 +1777,7 @@ func (s *EtcdServer) apply(
|
||||
|
||||
// set the consistent index of current executing entry
|
||||
if e.Index > s.consistIndex.ConsistentIndex() {
|
||||
s.setConsistentIndexAndTerm(e.Index, e.Term)
|
||||
s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
|
||||
shouldApplyV3 = membership.ApplyBoth
|
||||
}
|
||||
|
||||
@@ -1826,7 +1815,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
|
||||
index := s.consistIndex.ConsistentIndex()
|
||||
if e.Index > index {
|
||||
// set the consistent index of current executing entry
|
||||
s.setConsistentIndexAndTerm(e.Index, e.Term)
|
||||
s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
|
||||
shouldApplyV3 = membership.ApplyBoth
|
||||
}
|
||||
s.lg.Debug("apply entry normal",
|
||||
@@ -1925,7 +1914,8 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
|
||||
// The txPostLock callback will not get called in this case,
|
||||
// so we should set the consistent index directly.
|
||||
if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 {
|
||||
s.consistIndex.SetConsistentIndex(s.consistentIdx, s.consistentTerm)
|
||||
applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
|
||||
s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm)
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
@@ -2329,11 +2319,11 @@ func (s *EtcdServer) Version() *serverversion.Manager {
|
||||
return serverversion.NewManager(s.Logger(), NewServerVersionAdapter(s))
|
||||
}
|
||||
|
||||
func (s *EtcdServer) getTxPostLockHook() func() {
|
||||
func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
|
||||
return func() {
|
||||
cIdx, term := s.getConsistentIndexAndTerm()
|
||||
if cIdx > s.consistIndex.UnsafeConsistentIndex() {
|
||||
s.consistIndex.SetConsistentIndex(cIdx, term)
|
||||
applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
|
||||
if applyingIdx > s.consistIndex.UnsafeConsistentIndex() {
|
||||
s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,7 +68,7 @@ type Backend interface {
|
||||
ForceCommit()
|
||||
Close() error
|
||||
|
||||
// SetTxPostLockInsideApplyHook sets a txPostLockHook.
|
||||
// SetTxPostLockInsideApplyHook sets a txPostLockInsideApplyHook.
|
||||
SetTxPostLockInsideApplyHook(func())
|
||||
}
|
||||
|
||||
@@ -122,8 +122,8 @@ type backend struct {
|
||||
|
||||
hooks Hooks
|
||||
|
||||
// txPostLockHook is called each time right after locking the tx.
|
||||
txPostLockHook func()
|
||||
// txPostLockInsideApplyHook is called each time right after locking the tx.
|
||||
txPostLockInsideApplyHook func()
|
||||
|
||||
lg *zap.Logger
|
||||
}
|
||||
@@ -235,10 +235,10 @@ func (b *backend) BatchTx() BatchTx {
|
||||
|
||||
func (b *backend) SetTxPostLockInsideApplyHook(hook func()) {
|
||||
// It needs to lock the batchTx, because the periodic commit
|
||||
// may be accessing the txPostLockHook at the moment.
|
||||
// may be accessing the txPostLockInsideApplyHook at the moment.
|
||||
b.batchTx.lock()
|
||||
defer b.batchTx.Unlock()
|
||||
b.txPostLockHook = hook
|
||||
b.txPostLockInsideApplyHook = hook
|
||||
}
|
||||
|
||||
func (b *backend) ReadTx() ReadTx { return b.readTx }
|
||||
|
||||
@@ -77,13 +77,14 @@ func (t *batchTx) lock() {
|
||||
|
||||
func (t *batchTx) LockInsideApply() {
|
||||
t.lock()
|
||||
if t.backend.txPostLockHook != nil {
|
||||
if t.backend.txPostLockInsideApplyHook != nil {
|
||||
// The callers of some methods (i.e., (*RaftCluster).AddMember)
|
||||
// can be coming from both InsideApply and OutsideApply, but the
|
||||
// callers from OutsideApply will have a nil txPostLockHook. So we
|
||||
// should check the txPostLockHook before validating the callstack.
|
||||
// callers from OutsideApply will have a nil txPostLockInsideApplyHook.
|
||||
// So we should check the txPostLockInsideApplyHook before validating
|
||||
// the callstack.
|
||||
ValidateCalledInsideApply(t.backend.lg)
|
||||
t.backend.txPostLockHook()
|
||||
t.backend.txPostLockInsideApplyHook()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -25,11 +25,11 @@ import (
|
||||
|
||||
func TestLockVerify(t *testing.T) {
|
||||
tcs := []struct {
|
||||
name string
|
||||
insideApply bool
|
||||
lock func(tx backend.BatchTx)
|
||||
txPostLockHook func()
|
||||
expectPanic bool
|
||||
name string
|
||||
insideApply bool
|
||||
lock func(tx backend.BatchTx)
|
||||
txPostLockInsideApplyHook func()
|
||||
expectPanic bool
|
||||
}{
|
||||
{
|
||||
name: "call lockInsideApply from inside apply",
|
||||
@@ -38,17 +38,17 @@ func TestLockVerify(t *testing.T) {
|
||||
expectPanic: false,
|
||||
},
|
||||
{
|
||||
name: "call lockInsideApply from outside apply (without txPostLockHook)",
|
||||
name: "call lockInsideApply from outside apply (without txPostLockInsideApplyHook)",
|
||||
insideApply: false,
|
||||
lock: lockInsideApply,
|
||||
expectPanic: false,
|
||||
},
|
||||
{
|
||||
name: "call lockInsideApply from outside apply (with txPostLockHook)",
|
||||
insideApply: false,
|
||||
lock: lockInsideApply,
|
||||
txPostLockHook: func() {},
|
||||
expectPanic: true,
|
||||
name: "call lockInsideApply from outside apply (with txPostLockInsideApplyHook)",
|
||||
insideApply: false,
|
||||
lock: lockInsideApply,
|
||||
txPostLockInsideApplyHook: func() {},
|
||||
expectPanic: true,
|
||||
},
|
||||
{
|
||||
name: "call lockOutsideApply from outside apply",
|
||||
@@ -78,7 +78,7 @@ func TestLockVerify(t *testing.T) {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
|
||||
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
|
||||
be.SetTxPostLockInsideApplyHook(tc.txPostLockHook)
|
||||
be.SetTxPostLockInsideApplyHook(tc.txPostLockInsideApplyHook)
|
||||
|
||||
hasPaniced := handlePanic(func() {
|
||||
if tc.insideApply {
|
||||
|
||||
Reference in New Issue
Block a user