Merge pull request #13904 from serathius/term-v3.5

[release-3.5] server: Save consistency index and term to backend even when they decrease
This commit is contained in:
Marek Siarkowicz 2022-04-08 14:03:32 +02:00 committed by GitHub
commit 3ace622792
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 59 additions and 16 deletions

View File

@ -322,7 +322,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
tx.Lock()
defer tx.Unlock()
cindex.UnsafeCreateMetaBucket(tx)
cindex.UnsafeUpdateConsistentIndex(tx, idx, term, false)
cindex.UnsafeUpdateConsistentIndex(tx, idx, term)
} else {
// Thanks to translateWAL not moving entries, but just replacing them with
// 'empty', there is no need to update the consistency index.

View File

@ -476,6 +476,6 @@ func (s *v3Manager) updateCIndex(commit uint64, term uint64) error {
be := backend.NewDefaultBackend(s.outDbPath())
defer be.Close()
cindex.UpdateConsistentIndex(be.BatchTx(), commit, term, false)
cindex.UpdateConsistentIndex(be.BatchTx(), commit, term)
return nil
}

View File

@ -25,6 +25,7 @@ import (
type Backend interface {
BatchTx() backend.BatchTx
ReadTx() backend.ReadTx
}
// ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex.
@ -87,7 +88,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) {
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
index := atomic.LoadUint64(&ci.consistentIndex)
term := atomic.LoadUint64(&ci.term)
UnsafeUpdateConsistentIndex(tx, index, term, true)
UnsafeUpdateConsistentIndex(tx, index, term)
}
func (ci *consistentIndex) SetBackend(be Backend) {
@ -154,22 +155,12 @@ func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
return unsafeReadConsistentIndex(tx)
}
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
if index == 0 {
// Never save 0 as it means that we didn't loaded the real index yet.
return
}
if onlyGrow {
oldi, oldTerm := unsafeReadConsistentIndex(tx)
if term < oldTerm {
return
}
if term == oldTerm && index <= oldi {
return
}
}
bs1 := make([]byte, 8)
binary.BigEndian.PutUint64(bs1, index)
// put the index into the underlying backend
@ -182,8 +173,8 @@ func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64,
}
}
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
tx.Lock()
defer tx.Unlock()
UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
UnsafeUpdateConsistentIndex(tx, index, term)
}

View File

@ -63,6 +63,58 @@ func TestConsistentIndex(t *testing.T) {
assert.Equal(t, r, index)
}
func TestConsistentIndexDecrease(t *testing.T) {
initIndex := uint64(100)
initTerm := uint64(10)
tcs := []struct {
name string
index uint64
term uint64
}{
{
name: "Decrease term",
index: initIndex + 1,
term: initTerm - 1,
},
{
name: "Decrease CI",
index: initIndex - 1,
term: initTerm + 1,
},
{
name: "Decrease CI and term",
index: initIndex - 1,
term: initTerm - 1,
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
tx.Lock()
UnsafeCreateMetaBucket(tx)
UnsafeUpdateConsistentIndex(tx, initIndex, initTerm)
tx.Unlock()
be.ForceCommit()
be.Close()
be = backend.NewDefaultBackend(tmpPath)
defer be.Close()
ci := NewConsistentIndex(be)
ci.SetConsistentIndex(tc.index, tc.term)
tx = be.BatchTx()
tx.Lock()
ci.UnsafeSave(tx)
tx.Unlock()
assert.Equal(t, tc.index, ci.ConsistentIndex())
ci = NewConsistentIndex(be)
assert.Equal(t, tc.index, ci.ConsistentIndex())
})
}
}
func TestFakeConsistentIndex(t *testing.T) {
r := rand.Uint64()