mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
server: Save consistency index and term to backend even when they decrease
Reason to store CI and term in backend was to make db fully independent snapshot, it was never meant to interfere with apply logic. Skip of CI was introduced for v2->v3 migration where we wanted to prevent it from decreasing when replaying wal in https://github.com/etcd-io/etcd/pull/5391. By mistake it was added to apply flow during refactor in https://github.com/etcd-io/etcd/pull/12855#commitcomment-70713670. Consistency index and term should only be negotiated and used by raft to make decisions. Their values should only driven by raft state machine and backend should only be responsible for storing them.
This commit is contained in:
parent
a5b9f72da6
commit
1ea53d527e
@ -325,7 +325,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
|
||||
tx.LockOutsideApply()
|
||||
defer tx.Unlock()
|
||||
schema.UnsafeCreateMetaBucket(tx)
|
||||
schema.UnsafeUpdateConsistentIndex(tx, idx, term, false)
|
||||
schema.UnsafeUpdateConsistentIndex(tx, idx, term)
|
||||
} else {
|
||||
// Thanks to translateWAL not moving entries, but just replacing them with
|
||||
// 'empty', there is no need to update the consistency index.
|
||||
|
@ -483,6 +483,6 @@ func (s *v3Manager) updateCIndex(commit uint64, term uint64) error {
|
||||
be := backend.NewDefaultBackend(s.lg, s.outDbPath())
|
||||
defer be.Close()
|
||||
|
||||
cindex.UpdateConsistentIndex(be.BatchTx(), commit, term, false)
|
||||
cindex.UpdateConsistentIndex(be.BatchTx(), commit, term)
|
||||
return nil
|
||||
}
|
||||
|
@ -288,7 +288,7 @@ func createSnapshotAndBackendDB(cfg config.ServerConfig, snapshotTerm, snapshotI
|
||||
// create snapshot db file: "%016x.snap.db"
|
||||
be := serverstorage.OpenBackend(cfg, nil)
|
||||
schema.CreateMetaBucket(be.BatchTx())
|
||||
schema.UnsafeUpdateConsistentIndex(be.BatchTx(), snapshotIndex, snapshotTerm, false)
|
||||
schema.UnsafeUpdateConsistentIndex(be.BatchTx(), snapshotIndex, snapshotTerm)
|
||||
schema.MustUnsafeSaveConfStateToBackend(cfg.Logger, be.BatchTx(), &confState)
|
||||
if err = be.Close(); err != nil {
|
||||
return
|
||||
@ -301,6 +301,6 @@ func createSnapshotAndBackendDB(cfg config.ServerConfig, snapshotTerm, snapshotI
|
||||
// create backend db file
|
||||
be = serverstorage.OpenBackend(cfg, nil)
|
||||
schema.CreateMetaBucket(be.BatchTx())
|
||||
schema.UnsafeUpdateConsistentIndex(be.BatchTx(), 1, 1, false)
|
||||
schema.UnsafeUpdateConsistentIndex(be.BatchTx(), 1, 1)
|
||||
return be.Close()
|
||||
}
|
||||
|
@ -24,7 +24,6 @@ import (
|
||||
|
||||
type Backend interface {
|
||||
ReadTx() backend.ReadTx
|
||||
BatchTx() backend.BatchTx
|
||||
}
|
||||
|
||||
// ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex.
|
||||
@ -119,7 +118,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) {
|
||||
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
|
||||
index := atomic.LoadUint64(&ci.consistentIndex)
|
||||
term := atomic.LoadUint64(&ci.term)
|
||||
schema.UnsafeUpdateConsistentIndex(tx, index, term, true)
|
||||
schema.UnsafeUpdateConsistentIndex(tx, index, term)
|
||||
}
|
||||
|
||||
func (ci *consistentIndex) SetBackend(be Backend) {
|
||||
@ -170,8 +169,8 @@ func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint
|
||||
func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
|
||||
func (f *fakeConsistentIndex) SetBackend(_ Backend) {}
|
||||
|
||||
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
|
||||
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
|
||||
tx.LockOutsideApply()
|
||||
defer tx.Unlock()
|
||||
schema.UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
|
||||
schema.UnsafeUpdateConsistentIndex(tx, index, term)
|
||||
}
|
||||
|
@ -65,6 +65,58 @@ func TestConsistentIndex(t *testing.T) {
|
||||
assert.Equal(t, r, index)
|
||||
}
|
||||
|
||||
func TestConsistentIndexDecrease(t *testing.T) {
|
||||
initIndex := uint64(100)
|
||||
initTerm := uint64(10)
|
||||
|
||||
tcs := []struct {
|
||||
name string
|
||||
index uint64
|
||||
term uint64
|
||||
}{
|
||||
{
|
||||
name: "Decrease term",
|
||||
index: initIndex + 1,
|
||||
term: initTerm - 1,
|
||||
},
|
||||
{
|
||||
name: "Decrease CI",
|
||||
index: initIndex - 1,
|
||||
term: initTerm + 1,
|
||||
},
|
||||
{
|
||||
name: "Decrease CI and term",
|
||||
index: initIndex - 1,
|
||||
term: initTerm - 1,
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
schema.UnsafeCreateMetaBucket(tx)
|
||||
schema.UnsafeUpdateConsistentIndex(tx, initIndex, initTerm)
|
||||
tx.Unlock()
|
||||
be.ForceCommit()
|
||||
be.Close()
|
||||
|
||||
be = backend.NewDefaultBackend(zaptest.NewLogger(t), tmpPath)
|
||||
defer be.Close()
|
||||
ci := NewConsistentIndex(be)
|
||||
ci.SetConsistentIndex(tc.index, tc.term)
|
||||
tx = be.BatchTx()
|
||||
tx.Lock()
|
||||
ci.UnsafeSave(tx)
|
||||
tx.Unlock()
|
||||
assert.Equal(t, tc.index, ci.ConsistentIndex())
|
||||
|
||||
ci = NewConsistentIndex(be)
|
||||
assert.Equal(t, tc.index, ci.ConsistentIndex())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestFakeConsistentIndex(t *testing.T) {
|
||||
|
||||
r := rand.Uint64()
|
||||
|
@ -16,6 +16,7 @@ package schema
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
|
||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||
)
|
||||
|
||||
@ -56,32 +57,11 @@ func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
|
||||
return UnsafeReadConsistentIndex(tx)
|
||||
}
|
||||
|
||||
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
|
||||
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
|
||||
if index == 0 {
|
||||
// Never save 0 as it means that we didn't load the real index yet.
|
||||
return
|
||||
}
|
||||
|
||||
if onlyGrow {
|
||||
oldi, oldTerm := UnsafeReadConsistentIndex(tx)
|
||||
if term < oldTerm {
|
||||
return
|
||||
}
|
||||
if index > oldi {
|
||||
bs1 := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(bs1, index)
|
||||
// put the index into the underlying backend
|
||||
// tx has been locked in TxnBegin, so there is no need to lock it again
|
||||
tx.UnsafePut(Meta, MetaConsistentIndexKeyName, bs1)
|
||||
}
|
||||
if term > 0 && term > oldTerm {
|
||||
bs2 := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(bs2, term)
|
||||
tx.UnsafePut(Meta, MetaTermKeyName, bs2)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
bs1 := make([]byte, 8)
|
||||
binary.BigEndian.PutUint64(bs1, index)
|
||||
// put the index into the underlying backend
|
||||
|
@ -67,7 +67,7 @@ func TestValidate(t *testing.T) {
|
||||
version: V3_5,
|
||||
overrideKeys: func(tx backend.BatchTx) {
|
||||
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
|
||||
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
|
||||
UnsafeUpdateConsistentIndex(tx, 1, 1)
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -313,14 +313,14 @@ func setupBackendData(t *testing.T, version semver.Version, overrideKeys func(tx
|
||||
case V3_4:
|
||||
case V3_5:
|
||||
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
|
||||
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
|
||||
UnsafeUpdateConsistentIndex(tx, 1, 1)
|
||||
case V3_6:
|
||||
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
|
||||
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
|
||||
UnsafeUpdateConsistentIndex(tx, 1, 1)
|
||||
UnsafeSetStorageVersion(tx, &V3_6)
|
||||
case V3_7:
|
||||
MustUnsafeSaveConfStateToBackend(zap.NewNop(), tx, &raftpb.ConfState{})
|
||||
UnsafeUpdateConsistentIndex(tx, 1, 1, false)
|
||||
UnsafeUpdateConsistentIndex(tx, 1, 1)
|
||||
UnsafeSetStorageVersion(tx, &V3_7)
|
||||
tx.UnsafePut(Meta, []byte("future-key"), []byte(""))
|
||||
default:
|
||||
|
Loading…
x
Reference in New Issue
Block a user