server: Save consistency index and term to backend even when they decrease

Reason to store CI and term in backend was to make db fully independent
snapshot, it was never meant to interfere with apply logic. Skip of CI
was introduced for v2->v3 migration where we wanted to prevent it from
decreasing when replaying wal in
https://github.com/etcd-io/etcd/pull/5391. By mistake it was added to
apply flow during refactor in
https://github.com/etcd-io/etcd/pull/12855#commitcomment-70713670.

Consistency index and term should only be negotiated and used by raft to make
decisions. Their values should only driven by raft state machine and
backend should only be responsible for storing them.
This commit is contained in:
Marek Siarkowicz 2022-04-07 17:32:21 +02:00
parent 238b18c110
commit 780ec338f0
4 changed files with 59 additions and 16 deletions

View File

@ -322,7 +322,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
tx.Lock()
defer tx.Unlock()
cindex.UnsafeCreateMetaBucket(tx)
cindex.UnsafeUpdateConsistentIndex(tx, idx, term, false)
cindex.UnsafeUpdateConsistentIndex(tx, idx, term)
} else {
// Thanks to translateWAL not moving entries, but just replacing them with
// 'empty', there is no need to update the consistency index.

View File

@ -476,6 +476,6 @@ func (s *v3Manager) updateCIndex(commit uint64, term uint64) error {
be := backend.NewDefaultBackend(s.outDbPath())
defer be.Close()
cindex.UpdateConsistentIndex(be.BatchTx(), commit, term, false)
cindex.UpdateConsistentIndex(be.BatchTx(), commit, term)
return nil
}

View File

@ -25,6 +25,7 @@ import (
type Backend interface {
BatchTx() backend.BatchTx
ReadTx() backend.ReadTx
}
// ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex.
@ -87,7 +88,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) {
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
index := atomic.LoadUint64(&ci.consistentIndex)
term := atomic.LoadUint64(&ci.term)
UnsafeUpdateConsistentIndex(tx, index, term, true)
UnsafeUpdateConsistentIndex(tx, index, term)
}
func (ci *consistentIndex) SetBackend(be Backend) {
@ -154,22 +155,12 @@ func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
return unsafeReadConsistentIndex(tx)
}
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
if index == 0 {
// Never save 0 as it means that we didn't loaded the real index yet.
return
}
if onlyGrow {
oldi, oldTerm := unsafeReadConsistentIndex(tx)
if term < oldTerm {
return
}
if term == oldTerm && index <= oldi {
return
}
}
bs1 := make([]byte, 8)
binary.BigEndian.PutUint64(bs1, index)
// put the index into the underlying backend
@ -182,8 +173,8 @@ func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64,
}
}
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
tx.Lock()
defer tx.Unlock()
UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
UnsafeUpdateConsistentIndex(tx, index, term)
}

View File

@ -63,6 +63,58 @@ func TestConsistentIndex(t *testing.T) {
assert.Equal(t, r, index)
}
func TestConsistentIndexDecrease(t *testing.T) {
initIndex := uint64(100)
initTerm := uint64(10)
tcs := []struct {
name string
index uint64
term uint64
}{
{
name: "Decrease term",
index: initIndex + 1,
term: initTerm - 1,
},
{
name: "Decrease CI",
index: initIndex - 1,
term: initTerm + 1,
},
{
name: "Decrease CI and term",
index: initIndex - 1,
term: initTerm - 1,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
tx.Lock()
UnsafeCreateMetaBucket(tx)
UnsafeUpdateConsistentIndex(tx, initIndex, initTerm)
tx.Unlock()
be.ForceCommit()
be.Close()
be = backend.NewDefaultBackend(tmpPath)
defer be.Close()
ci := NewConsistentIndex(be)
ci.SetConsistentIndex(tc.index, tc.term)
tx = be.BatchTx()
tx.Lock()
ci.UnsafeSave(tx)
tx.Unlock()
assert.Equal(t, tc.index, ci.ConsistentIndex())
ci = NewConsistentIndex(be)
assert.Equal(t, tc.index, ci.ConsistentIndex())
})
}
}
func TestFakeConsistentIndex(t *testing.T) {
r := rand.Uint64()