mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #13903 from serathius/term
server: Save consistency index and term to backend even when they decease
This commit is contained in:
commit
3dce38085d
@ -325,7 +325,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
|
||||
tx.LockOutsideApply()
|
||||
defer tx.Unlock()
|
||||
schema.UnsafeCreateMetaBucket(tx)
|
||||
schema.UnsafeUpdateConsistentIndex(tx, idx, term, false)
|
||||
schema.UnsafeUpdateConsistentIndex(tx, idx, term)
|
||||
} else {
|
||||
// Thanks to translateWAL not moving entries, but just replacing them with
|
||||
// 'empty', there is no need to update the consistency index.
|
||||
|
@ -483,6 +483,6 @@ func (s *v3Manager) updateCIndex(commit uint64, term uint64) error {
|
||||
be := backend.NewDefaultBackend(s.lg, s.outDbPath())
|
||||
defer be.Close()
|
||||
|
||||
cindex.UpdateConsistentIndex(be.BatchTx(), commit, term, false)
|
||||
cindex.UpdateConsistentIndex(be.BatchTx(), commit, term)
|
||||
return nil
|
||||
}
|
||||
|
@ -288,7 +288,7 @@ func createSnapshotAndBackendDB(cfg config.ServerConfig, snapshotTerm, snapshotI
|
||||
// create snapshot db file: "%016x.snap.db"
|
||||
be := serverstorage.OpenBackend(cfg, nil)
|
||||
schema.CreateMetaBucket(be.BatchTx())
|
||||
schema.UnsafeUpdateConsistentIndex(be.BatchTx(), snapshotIndex, snapshotTerm, false)
|
||||
schema.UnsafeUpdateConsistentIndex(be.BatchTx(), snapshotIndex, snapshotTerm)
|
||||
schema.MustUnsafeSaveConfStateToBackend(cfg.Logger, be.BatchTx(), &confState)
|
||||
if err = be.Close(); err != nil {
|
||||
return
|
||||
@ -301,6 +301,6 @@ func createSnapshotAndBackendDB(cfg config.ServerConfig, snapshotTerm, snapshotI
|
||||
// create backend db file
|
||||
be = serverstorage.OpenBackend(cfg, nil)
|
||||
schema.CreateMetaBucket(be.BatchTx())
|
||||
schema.UnsafeUpdateConsistentIndex(be.BatchTx(), 1, 1, false)
|
||||
schema.UnsafeUpdateConsistentIndex(be.BatchTx(), 1, 1)
|
||||
return be.Close()
|
||||
}
|
||||
|
@ -24,7 +24,6 @@ import (
|
||||
|
||||
type Backend interface {
|
||||
ReadTx() backend.ReadTx
|
||||
BatchTx() backend.BatchTx
|
||||
}
|
||||
|
||||
// ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex.
|
||||
@ -119,7 +118,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) {
|
||||
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
|
||||
index := atomic.LoadUint64(&ci.consistentIndex)
|
||||
term := atomic.LoadUint64(&ci.term)
|
||||
schema.UnsafeUpdateConsistentIndex(tx, index, term, true)
|
||||
schema.UnsafeUpdateConsistentIndex(tx, index, term)
|
||||
}
|
||||
|
||||
func (ci *consistentIndex) SetBackend(be Backend) {
|
||||
@ -170,8 +169,8 @@ func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint
|
||||
func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
|
||||
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}
|
||||
|
||||
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
|
||||
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
|
||||
tx.LockOutsideApply()
|
||||
defer tx.Unlock()
|
||||
schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
|
||||
schema.UnsafeUpdateConsistentIndex(tx, index, term)
|
||||
}
|
||||
|
@ -65,6 +65,58 @@ func TestConsistentIndex(t *testing.T) {
|
||||
assert.Equal(t, r, index)
|
||||
}
|
||||
|
||||
func TestConsistentIndexDecrease(t *testing.T) {
|
||||
initIndex := uint64(100)
|
||||
initTerm := uint64(10)
|
||||
|
||||
tcs := []struct {
|
||||
name string
|
||||
index uint64
|
||||
term uint64
|
||||
}{
|
||||
{
|
||||
name: "Decrease term",
|
||||
index: initIndex + 1,
|
||||
term: initTerm - 1,
|
||||
},
|
||||
{
|
||||
name: "Decrease CI",
|
||||
index: initIndex - 1,
|
||||
term: initTerm + 1,
|
||||
},
|
||||
{
|
||||
name: "Decrease CI and term",
|
||||
index: initIndex - 1,
|
||||
term: initTerm - 1,
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
schema.UnsafeCreateMetaBucket(tx)
|
||||
schema.UnsafeUpdateConsistentIndex(tx, initIndex, initTerm)
|
||||
tx.Unlock()
|
||||
be.ForceCommit()
|
||||
be.Close()
|
||||
|
||||
be = backend.NewDefaultBackend(zaptest.NewLogger(t), tmpPath)
|
||||
defer be.Close()
|
||||
ci := NewConsistentIndex(be)
|
||||
ci.SetConsistentIndex(tc.index, tc.term)
|
||||
tx = be.BatchTx()
|
||||
tx.Lock()
|
||||
ci.UnsafeSave(tx)
|
||||
tx.Unlock()
|
||||
assert.Equal(t, tc.index, ci.ConsistentIndex())
|
||||
|
||||
ci = NewConsistentIndex(be)
|
||||
assert.Equal(t, tc.index, ci.ConsistentIndex())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFakeConsistentIndex(t *testing.T) {
|
||||
|
||||
r := rand.Uint64()
|
||||
|
@ -16,6 +16,7 @@ package schema
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
|
||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||
)
|
||||
|
||||
@ -56,32 +57,11 @@ func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
|
||||
return UnsafeReadConsistentIndex(tx)
|
||||
}
|
||||
|
||||
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
|
||||
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
|
||||
if index == 0 {
|
||||
// Never save 0 as it means that we didn't load the real index yet.
|
||||
return
|
||||
}
|
||||
|
||||
if onlyGrow {
|
||||
oldi, oldTerm := UnsafeReadConsistentIndex(tx)
|
||||
if term < oldTerm {
|
||||
return
|
||||
}
|
||||
if index > oldi {
|
||||
bs1 := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(bs1, index)
|
||||
// put the index into the underlying backend
|
||||
// tx has been locked in TxnBegin, so there is no need to lock it again
|
||||
tx.UnsafePut(Meta, MetaConsistentIndexKeyName, bs1)
|
||||
}
|
||||
if term > 0 && term > oldTerm {
|
||||
bs2 := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(bs2, term)
|
||||
tx.UnsafePut(Meta, MetaTermKeyName, bs2)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
bs1 := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(bs1, index)
|
||||
// put the index into the underlying backend
|
||||
|
@ -67,7 +67,7 @@ func TestValidate(t *testing.T) {
|
||||
version: V3_5,
|
||||
overrideKeys: func(tx backend.BatchTx) {
|
||||
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
|
||||
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
|
||||
UnsafeUpdateConsistentIndex(tx, 1, 1)
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -313,14 +313,14 @@ func setupBackendData(t *testing.T, version semver.Version, overrideKeys func(tx
|
||||
case V3_4:
|
||||
case V3_5:
|
||||
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
|
||||
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
|
||||
UnsafeUpdateConsistentIndex(tx, 1, 1)
|
||||
case V3_6:
|
||||
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
|
||||
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
|
||||
UnsafeUpdateConsistentIndex(tx, 1, 1)
|
||||
UnsafeSetStorageVersion(tx, &V3_6)
|
||||
case V3_7:
|
||||
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
|
||||
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
|
||||
UnsafeUpdateConsistentIndex(tx, 1, 1)
|
||||
UnsafeSetStorageVersion(tx, &V3_7)
|
||||
tx.UnsafePut(Meta, []byte("future-key"), []byte(""))
|
||||
default:
|
||||
|
Loading…
x
Reference in New Issue
Block a user