Merge pull request #12964 from ptabor/main

Persists Term in the (bbolt) Backend
This commit is contained in:
Piotr Tabor 2021-05-14 06:35:21 +02:00 committed by GitHub
commit eae7a845a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 89 additions and 53 deletions

View File

@ -76,7 +76,8 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and
### Storage format changes
- [WAL log's snapshots persists raftpb.ConfState](https://github.com/etcd-io/etcd/pull/12735)
- [Backend persists raftpb.ConfState](https://github.com/etcd-io/etcd/pull/12962) in the `meta` bucket `confState` key.
- Backend persists downgrade in the `cluster` bucket
- [Backend persists applied term](https://github.com/etcd-io/etcd/pull/) in the `meta` bucket.
- Backend persists `downgrade` in the `cluster` bucket
### Security

View File

@ -124,7 +124,7 @@ func handleBackup(c *cli.Context) error {
walsnap := saveSnap(lg, destSnap, srcSnap, &desired)
metadata, state, ents := translateWAL(lg, srcWAL, walsnap, withV3)
saveDB(lg, destDbPath, srcDbPath, state.Commit, &desired, withV3)
saveDB(lg, destDbPath, srcDbPath, state.Commit, state.Term, &desired, withV3)
neww, err := wal.Create(lg, destWAL, pbutil.MustMarshal(&metadata))
if err != nil {
@ -265,7 +265,7 @@ func raftEntryToNoOp(entry *raftpb.Entry) {
}
// saveDB copies the v3 backend and strips cluster information.
func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCluster, v3 bool) {
func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desired *desiredCluster, v3 bool) {
// open src db to safely copy db state
if v3 {
@ -322,7 +322,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCl
tx.Lock()
defer tx.Unlock()
cindex.UnsafeCreateMetaBucket(tx)
cindex.UnsafeUpdateConsistentIndex(tx, idx, false)
cindex.UnsafeUpdateConsistentIndex(tx, idx, term, false)
} else {
// Thanks to translateWAL not moving entries, but just replacing them with
// 'empty', there is no need to update the consistency index.

View File

@ -82,7 +82,7 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) {
writer, reader, errc = defaultTransformer()
}
st, index := rebuildStoreV2()
st, index, term := rebuildStoreV2()
be := prepareBackend()
defer be.Close()
@ -92,7 +92,7 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) {
}()
readKeys(reader, be)
cindex.UpdateConsistentIndex(be.BatchTx(), index, true)
cindex.UpdateConsistentIndex(be.BatchTx(), index, term, true)
err := <-errc
if err != nil {
fmt.Println("failed to transform keys")
@ -127,8 +127,7 @@ func prepareBackend() backend.Backend {
return be
}
func rebuildStoreV2() (v2store.Store, uint64) {
var index uint64
func rebuildStoreV2() (st v2store.Store, index uint64, term uint64) {
cl := membership.NewCluster(zap.NewExample())
waldir := migrateWALdir
@ -147,6 +146,7 @@ func rebuildStoreV2() (v2store.Store, uint64) {
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
index = snapshot.Metadata.Index
term = snapshot.Metadata.Term
}
w, err := wal.OpenForRead(zap.NewExample(), waldir, walsnap)
@ -160,7 +160,7 @@ func rebuildStoreV2() (v2store.Store, uint64) {
ExitWithError(ExitError, err)
}
st := v2store.New()
st = v2store.New()
if snapshot != nil {
err := st.Recovery(snapshot.Data)
if err != nil {
@ -191,12 +191,13 @@ func rebuildStoreV2() (v2store.Store, uint64) {
applyRequest(req, applier)
}
}
if ent.Index > index {
if ent.Index >= index {
index = ent.Index
term = ent.Term
}
}
return st, index
return st, index, term
}
func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) {

View File

@ -265,7 +265,7 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
return err
}
if err := s.updateCIndex(hardstate.Commit); err != nil {
if err := s.updateCIndex(hardstate.Commit, hardstate.Term); err != nil {
return err
}
@ -475,10 +475,10 @@ func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
return &hardState, w.SaveSnapshot(snapshot)
}
func (s *v3Manager) updateCIndex(commit uint64) error {
func (s *v3Manager) updateCIndex(commit uint64, term uint64) error {
be := backend.NewDefaultBackend(s.outDbPath())
defer be.Close()
cindex.UpdateConsistentIndex(be.BatchTx(), commit, false)
cindex.UpdateConsistentIndex(be.BatchTx(), commit, term, false)
return nil
}

View File

@ -99,7 +99,7 @@ func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend {
func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) {
consistentIndex := uint64(0)
if beExist {
consistentIndex = cindex.ReadConsistentIndex(oldbe.BatchTx())
consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.BatchTx())
}
if snapshot.Metadata.Index <= consistentIndex {
return oldbe, nil

View File

@ -26,6 +26,7 @@ var (
MetaBucketName = []byte("meta")
ConsistentIndexKeyName = []byte("consistent_index")
TermKeyName = []byte("term")
)
type Backend interface {
@ -39,7 +40,7 @@ type ConsistentIndexer interface {
ConsistentIndex() uint64
// SetConsistentIndex set the consistent index of current executing entry.
SetConsistentIndex(v uint64)
SetConsistentIndex(v uint64, term uint64)
// UnsafeSave must be called holding the lock on the tx.
// It saves consistentIndex to the underlying stable storage.
@ -52,9 +53,13 @@ type ConsistentIndexer interface {
// consistentIndex implements the ConsistentIndexer interface.
type consistentIndex struct {
// consistentIndex represents the offset of an entry in a consistent replica log.
// it caches the "consistent_index" key's value.
// It caches the "consistent_index" key's value.
// Accessed through atomics so must be 64-bit aligned.
consistentIndex uint64
// term represents the RAFT term of committed entry in a consistent replica log.
// Accessed through atomics so must be 64-bit aligned.
// The value is being persisted in the backend since v3.5.
term uint64
// be is used for initial read consistentIndex
be Backend
@ -75,18 +80,20 @@ func (ci *consistentIndex) ConsistentIndex() uint64 {
ci.mutex.Lock()
defer ci.mutex.Unlock()
v := ReadConsistentIndex(ci.be.BatchTx())
atomic.StoreUint64(&ci.consistentIndex, v)
v, term := ReadConsistentIndex(ci.be.BatchTx())
ci.SetConsistentIndex(v, term)
return v
}
func (ci *consistentIndex) SetConsistentIndex(v uint64) {
func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) {
atomic.StoreUint64(&ci.consistentIndex, v)
atomic.StoreUint64(&ci.term, term)
}
func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
index := atomic.LoadUint64(&ci.consistentIndex)
UnsafeUpdateConsistentIndex(tx, index, true)
term := atomic.LoadUint64(&ci.term)
UnsafeUpdateConsistentIndex(tx, index, term, true)
}
func (ci *consistentIndex) SetBackend(be Backend) {
@ -94,19 +101,23 @@ func (ci *consistentIndex) SetBackend(be Backend) {
defer ci.mutex.Unlock()
ci.be = be
// After the backend is changed, the first access should re-read it.
ci.SetConsistentIndex(0)
ci.SetConsistentIndex(0, 0)
}
func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
return &fakeConsistentIndex{index: index}
}
type fakeConsistentIndex struct{ index uint64 }
type fakeConsistentIndex struct {
index uint64
term uint64
}
func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index }
func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) {
func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) {
atomic.StoreUint64(&f.index, index)
atomic.StoreUint64(&f.term, term)
}
func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
@ -124,47 +135,61 @@ func CreateMetaBucket(tx backend.BatchTx) {
tx.UnsafeCreateBucket(MetaBucketName)
}
// unsafeGetConsistentIndex loads consistent index from given transaction.
// returns 0 if the data are not found.
func unsafeReadConsistentIndex(tx backend.ReadTx) uint64 {
// unsafeGetConsistentIndex loads consistent index & term from given transaction.
// returns 0,0 if the data are not found.
// Term is persisted since v3.5.
func unsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
_, vs := tx.UnsafeRange(MetaBucketName, ConsistentIndexKeyName, nil, 0)
if len(vs) == 0 {
return 0
return 0, 0
}
v := binary.BigEndian.Uint64(vs[0])
return v
_, ts := tx.UnsafeRange(MetaBucketName, TermKeyName, nil, 0)
if len(ts) == 0 {
return v, 0
}
t := binary.BigEndian.Uint64(ts[0])
return v, t
}
// ReadConsistentIndex loads consistent index from given transaction.
// ReadConsistentIndex loads consistent index and term from given transaction.
// returns 0 if the data are not found.
func ReadConsistentIndex(tx backend.ReadTx) uint64 {
func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
tx.Lock()
defer tx.Unlock()
return unsafeReadConsistentIndex(tx)
}
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) {
func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
if index == 0 {
// Never save 0 as it means that we didn't loaded the real index yet.
return
}
if onlyGrow {
oldi := unsafeReadConsistentIndex(tx)
if index <= oldi {
oldi, oldTerm := unsafeReadConsistentIndex(tx)
if term < oldTerm {
return
}
if term == oldTerm && index <= oldi {
return
}
}
bs := make([]byte, 8) // this is kept on stack (not heap) so its quick.
binary.BigEndian.PutUint64(bs, index)
bs1 := make([]byte, 8)
binary.BigEndian.PutUint64(bs1, index)
// put the index into the underlying backend
// tx has been locked in TxnBegin, so there is no need to lock it again
tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs)
tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs1)
if term > 0 {
bs2 := make([]byte, 8)
binary.BigEndian.PutUint64(bs2, term)
tx.UnsafePut(MetaBucketName, TermKeyName, bs2)
}
}
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) {
func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) {
tx.Lock()
defer tx.Unlock()
UnsafeUpdateConsistentIndex(tx, index, onlyGrow)
UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow)
}

View File

@ -19,6 +19,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/server/v3/mvcc/backend"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
)
@ -38,8 +39,9 @@ func TestConsistentIndex(t *testing.T) {
UnsafeCreateMetaBucket(tx)
tx.Unlock()
be.ForceCommit()
r := rand.Uint64()
ci.SetConsistentIndex(r)
r := uint64(7890123)
term := uint64(234)
ci.SetConsistentIndex(r, term)
index := ci.ConsistentIndex()
if index != r {
t.Errorf("expected %d,got %d", r, index)
@ -54,15 +56,11 @@ func TestConsistentIndex(t *testing.T) {
defer b.Close()
ci.SetBackend(b)
index = ci.ConsistentIndex()
if index != r {
t.Errorf("expected %d,got %d", r, index)
}
assert.Equal(t, r, index)
ci = NewConsistentIndex(b)
index = ci.ConsistentIndex()
if index != r {
t.Errorf("expected %d,got %d", r, index)
}
assert.Equal(t, r, index)
}
func TestFakeConsistentIndex(t *testing.T) {
@ -74,7 +72,7 @@ func TestFakeConsistentIndex(t *testing.T) {
t.Errorf("expected %d,got %d", r, index)
}
r = rand.Uint64()
ci.SetConsistentIndex(r)
ci.SetConsistentIndex(r, 5)
index = ci.ConsistentIndex()
if index != r {
t.Errorf("expected %d,got %d", r, index)

View File

@ -2124,7 +2124,7 @@ func (s *EtcdServer) apply(
// set the consistent index of current executing entry
if e.Index > s.consistIndex.ConsistentIndex() {
s.consistIndex.SetConsistentIndex(e.Index)
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}
@ -2154,7 +2154,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
index := s.consistIndex.ConsistentIndex()
if e.Index > index {
// set the consistent index of current executing entry
s.consistIndex.SetConsistentIndex(e.Index)
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}
s.lg.Debug("apply entry normal",

View File

@ -678,6 +678,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
cc := &raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2, Context: b}
ents := []raftpb.Entry{{
Index: 2,
Term: 4,
Type: raftpb.EntryConfChange,
Data: pbutil.MustMarshal(cc),
}}
@ -695,7 +696,9 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
srv.beHooks.OnPreCommitUnsafe(tx)
assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *membership.UnsafeConfStateFromBackend(lg, tx))
})
assert.Equal(t, consistIndex, cindex.ReadConsistentIndex(be.BatchTx()))
rindex, rterm := cindex.ReadConsistentIndex(be.BatchTx())
assert.Equal(t, consistIndex, rindex)
assert.Equal(t, uint64(4), rterm)
}
func realisticRaftNode(lg *zap.Logger) *raftNode {

View File

@ -305,6 +305,7 @@ func init() {
// consistent index might be changed due to v2 internal sync, which
// is not controllable by the user.
{Bucket: string(MetaBucketName), Key: string(cindex.ConsistentIndexKeyName)}: {},
{Bucket: string(MetaBucketName), Key: string(cindex.TermKeyName)}: {},
}
}

View File

@ -79,7 +79,7 @@ func BenchmarkConsistentIndex(b *testing.B) {
defer betesting.Close(b, be)
// This will force the index to be reread from scratch on each call.
ci.SetConsistentIndex(0)
ci.SetConsistentIndex(0, 0)
tx := be.BatchTx()
tx.Lock()

View File

@ -109,13 +109,20 @@ func MustVerifyIfEnabled(cfg Config) {
func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error {
tx := be.BatchTx()
index := cindex.ReadConsistentIndex(tx)
index, term := cindex.ReadConsistentIndex(tx)
if cfg.ExactIndex && index != hardstate.Commit {
return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit)
}
if cfg.ExactIndex && term != hardstate.Term {
return fmt.Errorf("backend.Term (%v) expected == WAL.HardState.term, (%v)", term, hardstate.Term)
}
if index > hardstate.Commit {
return fmt.Errorf("backend.ConsistentIndex (%v) must be <= WAL.HardState.commit (%v)", index, hardstate.Commit)
}
if term > hardstate.Term {
return fmt.Errorf("backend.Term (%v) must be <= WAL.HardState.term, (%v)", term, hardstate.Term)
}
if index < snapshot.Index {
return fmt.Errorf("backend.ConsistentIndex (%v) must be >= last snapshot index (%v)", index, snapshot.Index)
}