Persists Term in the (bbolt) Backend.

Additional layer of protection, that allows to validate whether we
start replaying log not only from the proper 'index', but also of the
right 'term'.
This commit is contained in:
Piotr Tabor
2021-05-13 21:02:02 +02:00
parent e44fb40be5
commit ab586cd463
12 changed files with 89 additions and 53 deletions

View File

@@ -26,6 +26,7 @@ var (
MetaBucketName = []byte("meta")
ConsistentIndexKeyName = []byte("consistent_index")
TermKeyName = []byte("term")
)
type Backend interface {
@@ -39,7 +40,7 @@ type ConsistentIndexer interface {
ConsistentIndex() uint64
// SetConsistentIndex set the consistent index of current executing entry.
SetConsistentIndex(v uint64)
SetConsistentIndex(v uint64, term uint64)
// UnsafeSave must be called holding the lock on the tx.
// It saves consistentIndex to the underlying stable storage.
@@ -52,9 +53,13 @@ type ConsistentIndexer interface {
// consistentIndex implements the ConsistentIndexer interface.
type consistentIndex struct {
// consistentIndex represents the offset of an entry in a consistent replica log.
// it caches the "consistent_index" key's value.
// It caches the "consistent_index" key's value.
// Accessed through atomics so must be 64-bit aligned.
consistentIndex uint64
// term represents the RAFT term of committed entry in a consistent replica log.
// Accessed through atomics so must be 64-bit aligned.
// The value is being persisted in the backend since v3.5.
term uint64
// be is used for initial read consistentIndex
be Backend
@@ -75,18 +80,20 @@ func (ci *consistentIndex) ConsistentIndex() uint64 {
ci.mutex.Lock()
defer ci.mutex.Unlock()
v := ReadConsistentIndex(ci.be.BatchTx())
atomic.StoreUint64(&ci.consistentIndex, v)
v, term := ReadConsistentIndex(ci.be.BatchTx())
ci.SetConsistentIndex(v, term)
return v
}
func (ci *consistentIndex) SetConsistentIndex(v uint64) {
func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) {
atomic.StoreUint64(&ci.consistentIndex, v)
atomic.StoreUint64(&ci.term, term)
}
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
index := atomic.LoadUint64(&ci.consistentIndex)
UnsafeUpdateConsistentIndex(tx, index, true)
term := atomic.LoadUint64(&ci.term)
UnsafeUpdateConsistentIndex(tx, index, term, true)
}
func (ci *consistentIndex) SetBackend(be Backend) {
@@ -94,19 +101,23 @@ func (ci *consistentIndex) SetBackend(be Backend) {
defer ci.mutex.Unlock()
ci.be = be
// After the backend is changed, the first access should re-read it.
ci.SetConsistentIndex(0)
ci.SetConsistentIndex(0, 0)
}
func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
return &fakeConsistentIndex{index: index}
}
type fakeConsistentIndex struct{ index uint64 }
type fakeConsistentIndex struct {
index uint64
term uint64
}
func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index }
func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) {
func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) {
atomic.StoreUint64(&f.index, index)
atomic.StoreUint64(&f.term, term)
}
func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
@@ -124,47 +135,61 @@ func CreateMetaBucket(tx backend.BatchTx) {
tx.UnsafeCreateBucket(MetaBucketName)
}
// unsafeGetConsistentIndex loads consistent index from given transaction.
// returns 0 if the data are not found.
func unsafeReadConsistentIndex(tx backend.ReadTx) uint64 {
// unsafeGetConsistentIndex loads consistent index & term from given transaction.
// returns 0,0 if the data are not found.
// Term is persisted since v3.5.
func unsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
_, vs := tx.UnsafeRange(MetaBucketName, ConsistentIndexKeyName, nil, 0)
if len(vs) == 0 {
return 0
return 0, 0
}
v := binary.BigEndian.Uint64(vs[0])
return v
_, ts := tx.UnsafeRange(MetaBucketName, TermKeyName, nil, 0)
if len(ts) == 0 {
return v, 0
}
t := binary.BigEndian.Uint64(ts[0])
return v, t
}
// ReadConsistentIndex loads consistent index from given transaction.
// ReadConsistentIndex loads consistent index and term from given transaction.
// returns 0 if the data are not found.
func ReadConsistentIndex(tx backend.ReadTx) uint64 {
func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
tx.Lock()
defer tx.Unlock()
return unsafeReadConsistentIndex(tx)
}
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) {
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
if index == 0 {
// Never save 0 as it means that we didn't loaded the real index yet.
return
}
if onlyGrow {
oldi := unsafeReadConsistentIndex(tx)
if index <= oldi {
oldi, oldTerm := unsafeReadConsistentIndex(tx)
if term < oldTerm {
return
}
if term == oldTerm && index <= oldi {
return
}
}
bs := make([]byte, 8) // this is kept on stack (not heap) so its quick.
binary.BigEndian.PutUint64(bs, index)
bs1 := make([]byte, 8)
binary.BigEndian.PutUint64(bs1, index)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs)
tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs1)
if term > 0 {
bs2 := make([]byte, 8)
binary.BigEndian.PutUint64(bs2, term)
tx.UnsafePut(MetaBucketName, TermKeyName, bs2)
}
}
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) {
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
tx.Lock()
defer tx.Unlock()
UnsafeUpdateConsistentIndex(tx, index, onlyGrow)
UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
}

View File

@@ -19,6 +19,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/server/v3/mvcc/backend"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
)
@@ -38,8 +39,9 @@ func TestConsistentIndex(t *testing.T) {
UnsafeCreateMetaBucket(tx)
tx.Unlock()
be.ForceCommit()
r := rand.Uint64()
ci.SetConsistentIndex(r)
r := uint64(7890123)
term := uint64(234)
ci.SetConsistentIndex(r, term)
index := ci.ConsistentIndex()
if index != r {
t.Errorf("expected %d,got %d", r, index)
@@ -54,15 +56,11 @@ func TestConsistentIndex(t *testing.T) {
defer b.Close()
ci.SetBackend(b)
index = ci.ConsistentIndex()
if index != r {
t.Errorf("expected %d,got %d", r, index)
}
assert.Equal(t, r, index)
ci = NewConsistentIndex(b)
index = ci.ConsistentIndex()
if index != r {
t.Errorf("expected %d,got %d", r, index)
}
assert.Equal(t, r, index)
}
func TestFakeConsistentIndex(t *testing.T) {
@@ -74,7 +72,7 @@ func TestFakeConsistentIndex(t *testing.T) {
t.Errorf("expected %d,got %d", r, index)
}
r = rand.Uint64()
ci.SetConsistentIndex(r)
ci.SetConsistentIndex(r, 5)
index = ci.ConsistentIndex()
if index != r {
t.Errorf("expected %d,got %d", r, index)