From ab586cd4631048ed808d96a750896eeb0d7c0cd1 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Thu, 13 May 2021 21:02:02 +0200 Subject: [PATCH] Persists Term in the (bbolt) Backend. This adds a layer of protection that lets us validate that we start replaying the log not only from the proper 'index', but also from the right 'term'. --- CHANGELOG-3.5.md | 3 +- etcdctl/ctlv2/command/backup_command.go | 6 +- etcdctl/ctlv3/command/migrate_command.go | 15 ++--- etcdctl/snapshot/v3_snapshot.go | 6 +- server/etcdserver/backend.go | 2 +- server/etcdserver/cindex/cindex.go | 73 ++++++++++++++++-------- server/etcdserver/cindex/cindex_test.go | 16 +++--- server/etcdserver/server.go | 4 +- server/etcdserver/server_test.go | 5 +- server/mvcc/kvstore.go | 1 + server/mvcc/kvstore_bench_test.go | 2 +- server/verify/verify.go | 9 ++- 12 files changed, 89 insertions(+), 53 deletions(-) diff --git a/CHANGELOG-3.5.md b/CHANGELOG-3.5.md index f933c8f98..a9d31eec6 100644 --- a/CHANGELOG-3.5.md +++ b/CHANGELOG-3.5.md @@ -76,7 +76,8 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and ### Storage format changes - [WAL log's snapshots persists raftpb.ConfState](https://github.com/etcd-io/etcd/pull/12735) - [Backend persists raftpb.ConfState](https://github.com/etcd-io/etcd/pull/12962) in the `meta` bucket `confState` key. -- Backend persists downgrade in the `cluster` bucket +- [Backend persists applied term](https://github.com/etcd-io/etcd/pull/) in the `meta` bucket. 
+- Backend persists `downgrade` in the `cluster` bucket ### Security diff --git a/etcdctl/ctlv2/command/backup_command.go b/etcdctl/ctlv2/command/backup_command.go index e3ec04eea..1549fdd6f 100644 --- a/etcdctl/ctlv2/command/backup_command.go +++ b/etcdctl/ctlv2/command/backup_command.go @@ -124,7 +124,7 @@ func handleBackup(c *cli.Context) error { walsnap := saveSnap(lg, destSnap, srcSnap, &desired) metadata, state, ents := translateWAL(lg, srcWAL, walsnap, withV3) - saveDB(lg, destDbPath, srcDbPath, state.Commit, &desired, withV3) + saveDB(lg, destDbPath, srcDbPath, state.Commit, state.Term, &desired, withV3) neww, err := wal.Create(lg, destWAL, pbutil.MustMarshal(&metadata)) if err != nil { @@ -265,7 +265,7 @@ func raftEntryToNoOp(entry *raftpb.Entry) { } // saveDB copies the v3 backend and strips cluster information. -func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCluster, v3 bool) { +func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desired *desiredCluster, v3 bool) { // open src db to safely copy db state if v3 { @@ -322,7 +322,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCl tx.Lock() defer tx.Unlock() cindex.UnsafeCreateMetaBucket(tx) - cindex.UnsafeUpdateConsistentIndex(tx, idx, false) + cindex.UnsafeUpdateConsistentIndex(tx, idx, term, false) } else { // Thanks to translateWAL not moving entries, but just replacing them with // 'empty', there is no need to update the consistency index. 
diff --git a/etcdctl/ctlv3/command/migrate_command.go b/etcdctl/ctlv3/command/migrate_command.go index deb23fb4c..ff6816560 100644 --- a/etcdctl/ctlv3/command/migrate_command.go +++ b/etcdctl/ctlv3/command/migrate_command.go @@ -82,7 +82,7 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) { writer, reader, errc = defaultTransformer() } - st, index := rebuildStoreV2() + st, index, term := rebuildStoreV2() be := prepareBackend() defer be.Close() @@ -92,7 +92,7 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) { }() readKeys(reader, be) - cindex.UpdateConsistentIndex(be.BatchTx(), index, true) + cindex.UpdateConsistentIndex(be.BatchTx(), index, term, true) err := <-errc if err != nil { fmt.Println("failed to transform keys") @@ -127,8 +127,7 @@ func prepareBackend() backend.Backend { return be } -func rebuildStoreV2() (v2store.Store, uint64) { - var index uint64 +func rebuildStoreV2() (st v2store.Store, index uint64, term uint64) { cl := membership.NewCluster(zap.NewExample()) waldir := migrateWALdir @@ -147,6 +146,7 @@ func rebuildStoreV2() (v2store.Store, uint64) { if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term index = snapshot.Metadata.Index + term = snapshot.Metadata.Term } w, err := wal.OpenForRead(zap.NewExample(), waldir, walsnap) @@ -160,7 +160,7 @@ func rebuildStoreV2() (v2store.Store, uint64) { ExitWithError(ExitError, err) } - st := v2store.New() + st = v2store.New() if snapshot != nil { err := st.Recovery(snapshot.Data) if err != nil { @@ -191,12 +191,13 @@ func rebuildStoreV2() (v2store.Store, uint64) { applyRequest(req, applier) } } - if ent.Index > index { + if ent.Index >= index { index = ent.Index + term = ent.Term } } - return st, index + return st, index, term } func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) { diff --git a/etcdctl/snapshot/v3_snapshot.go b/etcdctl/snapshot/v3_snapshot.go index e726a59f9..9272a8f0b 100644 --- 
a/etcdctl/snapshot/v3_snapshot.go +++ b/etcdctl/snapshot/v3_snapshot.go @@ -265,7 +265,7 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error { return err } - if err := s.updateCIndex(hardstate.Commit); err != nil { + if err := s.updateCIndex(hardstate.Commit, hardstate.Term); err != nil { return err } @@ -475,10 +475,10 @@ func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) { return &hardState, w.SaveSnapshot(snapshot) } -func (s *v3Manager) updateCIndex(commit uint64) error { +func (s *v3Manager) updateCIndex(commit uint64, term uint64) error { be := backend.NewDefaultBackend(s.outDbPath()) defer be.Close() - cindex.UpdateConsistentIndex(be.BatchTx(), commit, false) + cindex.UpdateConsistentIndex(be.BatchTx(), commit, term, false) return nil } diff --git a/server/etcdserver/backend.go b/server/etcdserver/backend.go index 120d0124f..081be2b52 100644 --- a/server/etcdserver/backend.go +++ b/server/etcdserver/backend.go @@ -99,7 +99,7 @@ func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) { consistentIndex := uint64(0) if beExist { - consistentIndex = cindex.ReadConsistentIndex(oldbe.BatchTx()) + consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.BatchTx()) } if snapshot.Metadata.Index <= consistentIndex { return oldbe, nil diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 6f8661b6d..5086490f3 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -26,6 +26,7 @@ var ( MetaBucketName = []byte("meta") ConsistentIndexKeyName = []byte("consistent_index") + TermKeyName = []byte("term") ) type Backend interface { @@ -39,7 +40,7 @@ type ConsistentIndexer interface { ConsistentIndex() uint64 // SetConsistentIndex set the consistent index of current executing entry. 
- SetConsistentIndex(v uint64) + SetConsistentIndex(v uint64, term uint64) // UnsafeSave must be called holding the lock on the tx. // It saves consistentIndex to the underlying stable storage. @@ -52,9 +53,13 @@ type ConsistentIndexer interface { // consistentIndex implements the ConsistentIndexer interface. type consistentIndex struct { // consistentIndex represents the offset of an entry in a consistent replica log. - // it caches the "consistent_index" key's value. + // It caches the "consistent_index" key's value. // Accessed through atomics so must be 64-bit aligned. consistentIndex uint64 + // term represents the RAFT term of committed entry in a consistent replica log. + // Accessed through atomics so must be 64-bit aligned. + // The value is being persisted in the backend since v3.5. + term uint64 // be is used for initial read consistentIndex be Backend @@ -75,18 +80,20 @@ func (ci *consistentIndex) ConsistentIndex() uint64 { ci.mutex.Lock() defer ci.mutex.Unlock() - v := ReadConsistentIndex(ci.be.BatchTx()) - atomic.StoreUint64(&ci.consistentIndex, v) + v, term := ReadConsistentIndex(ci.be.BatchTx()) + ci.SetConsistentIndex(v, term) return v } -func (ci *consistentIndex) SetConsistentIndex(v uint64) { +func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) { atomic.StoreUint64(&ci.consistentIndex, v) + atomic.StoreUint64(&ci.term, term) } func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) { index := atomic.LoadUint64(&ci.consistentIndex) - UnsafeUpdateConsistentIndex(tx, index, true) + term := atomic.LoadUint64(&ci.term) + UnsafeUpdateConsistentIndex(tx, index, term, true) } func (ci *consistentIndex) SetBackend(be Backend) { @@ -94,19 +101,23 @@ func (ci *consistentIndex) SetBackend(be Backend) { defer ci.mutex.Unlock() ci.be = be // After the backend is changed, the first access should re-read it. 
- ci.SetConsistentIndex(0) + ci.SetConsistentIndex(0, 0) } func NewFakeConsistentIndex(index uint64) ConsistentIndexer { return &fakeConsistentIndex{index: index} } -type fakeConsistentIndex struct{ index uint64 } +type fakeConsistentIndex struct { + index uint64 + term uint64 +} func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index } -func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) { +func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) { atomic.StoreUint64(&f.index, index) + atomic.StoreUint64(&f.term, term) } func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {} @@ -124,47 +135,61 @@ func CreateMetaBucket(tx backend.BatchTx) { tx.UnsafeCreateBucket(MetaBucketName) } -// unsafeGetConsistentIndex loads consistent index from given transaction. -// returns 0 if the data are not found. -func unsafeReadConsistentIndex(tx backend.ReadTx) uint64 { +// unsafeReadConsistentIndex loads the consistent index and term from the given transaction. +// Returns 0, 0 if the data is not found. +// Term is persisted since v3.5. +func unsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { _, vs := tx.UnsafeRange(MetaBucketName, ConsistentIndexKeyName, nil, 0) if len(vs) == 0 { - return 0 + return 0, 0 } v := binary.BigEndian.Uint64(vs[0]) - return v + _, ts := tx.UnsafeRange(MetaBucketName, TermKeyName, nil, 0) + if len(ts) == 0 { + return v, 0 + } + t := binary.BigEndian.Uint64(ts[0]) + return v, t } -// ReadConsistentIndex loads consistent index from given transaction. +// ReadConsistentIndex loads consistent index and term from given transaction. // returns 0 if the data are not found. 
-func ReadConsistentIndex(tx backend.ReadTx) uint64 { +func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { tx.Lock() defer tx.Unlock() return unsafeReadConsistentIndex(tx) } -func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) { +func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) { if index == 0 { // Never save 0 as it means that we didn't loaded the real index yet. return } if onlyGrow { - oldi := unsafeReadConsistentIndex(tx) - if index <= oldi { + oldi, oldTerm := unsafeReadConsistentIndex(tx) + if term < oldTerm { + return + } + if term == oldTerm && index <= oldi { return } } - bs := make([]byte, 8) // this is kept on stack (not heap) so its quick. - binary.BigEndian.PutUint64(bs, index) + bs1 := make([]byte, 8) + binary.BigEndian.PutUint64(bs1, index) // put the index into the underlying backend // tx has been locked in TxnBegin, so there is no need to lock it again - tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs) + tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs1) + if term > 0 { + bs2 := make([]byte, 8) + binary.BigEndian.PutUint64(bs2, term) + tx.UnsafePut(MetaBucketName, TermKeyName, bs2) + } } -func UpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) { +func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) { tx.Lock() defer tx.Unlock() - UnsafeUpdateConsistentIndex(tx, index, onlyGrow) + UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow) } diff --git a/server/etcdserver/cindex/cindex_test.go b/server/etcdserver/cindex/cindex_test.go index c500260d3..1e111b9e8 100644 --- a/server/etcdserver/cindex/cindex_test.go +++ b/server/etcdserver/cindex/cindex_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "go.etcd.io/etcd/server/v3/mvcc/backend" betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" ) @@ -38,8 +39,9 @@ func TestConsistentIndex(t *testing.T) { 
UnsafeCreateMetaBucket(tx) tx.Unlock() be.ForceCommit() - r := rand.Uint64() - ci.SetConsistentIndex(r) + r := uint64(7890123) + term := uint64(234) + ci.SetConsistentIndex(r, term) index := ci.ConsistentIndex() if index != r { t.Errorf("expected %d,got %d", r, index) @@ -54,15 +56,11 @@ func TestConsistentIndex(t *testing.T) { defer b.Close() ci.SetBackend(b) index = ci.ConsistentIndex() - if index != r { - t.Errorf("expected %d,got %d", r, index) - } + assert.Equal(t, r, index) ci = NewConsistentIndex(b) index = ci.ConsistentIndex() - if index != r { - t.Errorf("expected %d,got %d", r, index) - } + assert.Equal(t, r, index) } func TestFakeConsistentIndex(t *testing.T) { @@ -74,7 +72,7 @@ func TestFakeConsistentIndex(t *testing.T) { t.Errorf("expected %d,got %d", r, index) } r = rand.Uint64() - ci.SetConsistentIndex(r) + ci.SetConsistentIndex(r, 5) index = ci.ConsistentIndex() if index != r { t.Errorf("expected %d,got %d", r, index) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index a3ff94b2c..83babf952 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2124,7 +2124,7 @@ func (s *EtcdServer) apply( // set the consistent index of current executing entry if e.Index > s.consistIndex.ConsistentIndex() { - s.consistIndex.SetConsistentIndex(e.Index) + s.consistIndex.SetConsistentIndex(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } @@ -2154,7 +2154,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { index := s.consistIndex.ConsistentIndex() if e.Index > index { // set the consistent index of current executing entry - s.consistIndex.SetConsistentIndex(e.Index) + s.consistIndex.SetConsistentIndex(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } s.lg.Debug("apply entry normal", diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index e62507657..07a22e2b0 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -678,6 +678,7 @@ func 
TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { cc := &raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2, Context: b} ents := []raftpb.Entry{{ Index: 2, + Term: 4, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(cc), }} @@ -695,7 +696,9 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { srv.beHooks.OnPreCommitUnsafe(tx) assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *membership.UnsafeConfStateFromBackend(lg, tx)) }) - assert.Equal(t, consistIndex, cindex.ReadConsistentIndex(be.BatchTx())) + rindex, rterm := cindex.ReadConsistentIndex(be.BatchTx()) + assert.Equal(t, consistIndex, rindex) + assert.Equal(t, uint64(4), rterm) } func realisticRaftNode(lg *zap.Logger) *raftNode { diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index a166fd5bd..5116b15cb 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -305,6 +305,7 @@ func init() { // consistent index might be changed due to v2 internal sync, which // is not controllable by the user. {Bucket: string(MetaBucketName), Key: string(cindex.ConsistentIndexKeyName)}: {}, + {Bucket: string(MetaBucketName), Key: string(cindex.TermKeyName)}: {}, } } diff --git a/server/mvcc/kvstore_bench_test.go b/server/mvcc/kvstore_bench_test.go index 9ea70dad4..918cecacc 100644 --- a/server/mvcc/kvstore_bench_test.go +++ b/server/mvcc/kvstore_bench_test.go @@ -79,7 +79,7 @@ func BenchmarkConsistentIndex(b *testing.B) { defer betesting.Close(b, be) // This will force the index to be reread from scratch on each call. 
- ci.SetConsistentIndex(0) + ci.SetConsistentIndex(0, 0) tx := be.BatchTx() tx.Lock() diff --git a/server/verify/verify.go b/server/verify/verify.go index 67efcf60a..f727201ce 100644 --- a/server/verify/verify.go +++ b/server/verify/verify.go @@ -109,13 +109,20 @@ func MustVerifyIfEnabled(cfg Config) { func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error { tx := be.BatchTx() - index := cindex.ReadConsistentIndex(tx) + index, term := cindex.ReadConsistentIndex(tx) if cfg.ExactIndex && index != hardstate.Commit { return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit) } + if cfg.ExactIndex && term != hardstate.Term { + return fmt.Errorf("backend.Term (%v) expected == WAL.HardState.term, (%v)", term, hardstate.Term) + } if index > hardstate.Commit { return fmt.Errorf("backend.ConsistentIndex (%v) must be <= WAL.HardState.commit (%v)", index, hardstate.Commit) } + if term > hardstate.Term { + return fmt.Errorf("backend.Term (%v) must be <= WAL.HardState.term, (%v)", term, hardstate.Term) + } + if index < snapshot.Index { return fmt.Errorf("backend.ConsistentIndex (%v) must be >= last snapshot index (%v)", index, snapshot.Index) }