diff --git a/CHANGELOG-3.5.md b/CHANGELOG-3.5.md index f933c8f98..a9d31eec6 100644 --- a/CHANGELOG-3.5.md +++ b/CHANGELOG-3.5.md @@ -76,7 +76,8 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.4.0...v3.5.0) and ### Storage format changes - [WAL log's snapshots persists raftpb.ConfState](https://github.com/etcd-io/etcd/pull/12735) - [Backend persists raftpb.ConfState](https://github.com/etcd-io/etcd/pull/12962) in the `meta` bucket `confState` key. -- Backend persists downgrade in the `cluster` bucket +- [Backend persists applied term](https://github.com/etcd-io/etcd/pull/) in the `meta` bucket. +- Backend persists `downgrade` in the `cluster` bucket ### Security diff --git a/etcdctl/ctlv2/command/backup_command.go b/etcdctl/ctlv2/command/backup_command.go index e3ec04eea..1549fdd6f 100644 --- a/etcdctl/ctlv2/command/backup_command.go +++ b/etcdctl/ctlv2/command/backup_command.go @@ -124,7 +124,7 @@ func handleBackup(c *cli.Context) error { walsnap := saveSnap(lg, destSnap, srcSnap, &desired) metadata, state, ents := translateWAL(lg, srcWAL, walsnap, withV3) - saveDB(lg, destDbPath, srcDbPath, state.Commit, &desired, withV3) + saveDB(lg, destDbPath, srcDbPath, state.Commit, state.Term, &desired, withV3) neww, err := wal.Create(lg, destWAL, pbutil.MustMarshal(&metadata)) if err != nil { @@ -265,7 +265,7 @@ func raftEntryToNoOp(entry *raftpb.Entry) { } // saveDB copies the v3 backend and strips cluster information. -func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCluster, v3 bool) { +func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desired *desiredCluster, v3 bool) { // open src db to safely copy db state if v3 { @@ -322,7 +322,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCl tx.Lock() defer tx.Unlock() cindex.UnsafeCreateMetaBucket(tx) - cindex.UnsafeUpdateConsistentIndex(tx, idx, false) + cindex.UnsafeUpdateConsistentIndex(tx, idx, term, false) } else { // Thanks to translateWAL not moving entries, but just replacing them with // 'empty', there is no need to update the consistency index. diff --git a/etcdctl/ctlv3/command/migrate_command.go b/etcdctl/ctlv3/command/migrate_command.go index deb23fb4c..ff6816560 100644 --- a/etcdctl/ctlv3/command/migrate_command.go +++ b/etcdctl/ctlv3/command/migrate_command.go @@ -82,7 +82,7 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) { writer, reader, errc = defaultTransformer() } - st, index := rebuildStoreV2() + st, index, term := rebuildStoreV2() be := prepareBackend() defer be.Close() @@ -92,7 +92,7 @@ func migrateCommandFunc(cmd *cobra.Command, args []string) { }() readKeys(reader, be) - cindex.UpdateConsistentIndex(be.BatchTx(), index, true) + cindex.UpdateConsistentIndex(be.BatchTx(), index, term, true) err := <-errc if err != nil { fmt.Println("failed to transform keys") @@ -127,8 +127,7 @@ func prepareBackend() backend.Backend { return be } -func rebuildStoreV2() (v2store.Store, uint64) { - var index uint64 +func rebuildStoreV2() (st v2store.Store, index uint64, term uint64) { cl := membership.NewCluster(zap.NewExample()) waldir := migrateWALdir @@ -147,6 +146,7 @@ func rebuildStoreV2() (v2store.Store, uint64) { if snapshot != nil { walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term index = snapshot.Metadata.Index + term = snapshot.Metadata.Term } w, err := wal.OpenForRead(zap.NewExample(), waldir, walsnap) @@ -160,7 +160,7 @@ func rebuildStoreV2() (v2store.Store, uint64) { ExitWithError(ExitError, err) } - st := v2store.New() + st = v2store.New() if snapshot != nil { err := st.Recovery(snapshot.Data) if err != nil { @@ -191,12 +191,13 @@ func rebuildStoreV2() (v2store.Store, uint64) { applyRequest(req, applier) } } - if ent.Index > index { + if ent.Index >= index { index = ent.Index + term = ent.Term } } - return st, index + return st, index, term } func applyConf(cc raftpb.ConfChange, cl *membership.RaftCluster) { diff --git a/etcdctl/snapshot/v3_snapshot.go b/etcdctl/snapshot/v3_snapshot.go index e726a59f9..9272a8f0b 100644 --- a/etcdctl/snapshot/v3_snapshot.go +++ b/etcdctl/snapshot/v3_snapshot.go @@ -265,7 +265,7 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error { return err } - if err := s.updateCIndex(hardstate.Commit); err != nil { + if err := s.updateCIndex(hardstate.Commit, hardstate.Term); err != nil { return err } @@ -475,10 +475,10 @@ func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) { return &hardState, w.SaveSnapshot(snapshot) } -func (s *v3Manager) updateCIndex(commit uint64) error { +func (s *v3Manager) updateCIndex(commit uint64, term uint64) error { be := backend.NewDefaultBackend(s.outDbPath()) defer be.Close() - cindex.UpdateConsistentIndex(be.BatchTx(), commit, false) + cindex.UpdateConsistentIndex(be.BatchTx(), commit, term, false) return nil } diff --git a/server/etcdserver/backend.go b/server/etcdserver/backend.go index 120d0124f..081be2b52 100644 --- a/server/etcdserver/backend.go +++ b/server/etcdserver/backend.go @@ -99,7 +99,7 @@ func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) { consistentIndex := uint64(0) if beExist { - consistentIndex = cindex.ReadConsistentIndex(oldbe.BatchTx()) + consistentIndex, _ = cindex.ReadConsistentIndex(oldbe.BatchTx()) } if snapshot.Metadata.Index <= consistentIndex { return oldbe, nil diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 6f8661b6d..5086490f3 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -26,6 +26,7 @@ var ( MetaBucketName = []byte("meta") ConsistentIndexKeyName = []byte("consistent_index") + TermKeyName = []byte("term") ) type Backend interface { @@ -39,7 +40,7 @@ type ConsistentIndexer interface { ConsistentIndex() uint64 // SetConsistentIndex set the consistent index of current executing entry. - SetConsistentIndex(v uint64) + SetConsistentIndex(v uint64, term uint64) // UnsafeSave must be called holding the lock on the tx. // It saves consistentIndex to the underlying stable storage. @@ -52,9 +53,13 @@ type ConsistentIndexer interface { // consistentIndex implements the ConsistentIndexer interface. type consistentIndex struct { // consistentIndex represents the offset of an entry in a consistent replica log. - // it caches the "consistent_index" key's value. + // It caches the "consistent_index" key's value. // Accessed through atomics so must be 64-bit aligned. consistentIndex uint64 + // term represents the RAFT term of committed entry in a consistent replica log. + // Accessed through atomics so must be 64-bit aligned. + // The value is being persisted in the backend since v3.5. + term uint64 // be is used for initial read consistentIndex be Backend @@ -75,18 +80,20 @@ func (ci *consistentIndex) ConsistentIndex() uint64 { ci.mutex.Lock() defer ci.mutex.Unlock() - v := ReadConsistentIndex(ci.be.BatchTx()) - atomic.StoreUint64(&ci.consistentIndex, v) + v, term := ReadConsistentIndex(ci.be.BatchTx()) + ci.SetConsistentIndex(v, term) return v } -func (ci *consistentIndex) SetConsistentIndex(v uint64) { +func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) { atomic.StoreUint64(&ci.consistentIndex, v) + atomic.StoreUint64(&ci.term, term) } func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) { index := atomic.LoadUint64(&ci.consistentIndex) - UnsafeUpdateConsistentIndex(tx, index, true) + term := atomic.LoadUint64(&ci.term) + UnsafeUpdateConsistentIndex(tx, index, term, true) } func (ci *consistentIndex) SetBackend(be Backend) { @@ -94,19 +101,23 @@ func (ci *consistentIndex) SetBackend(be Backend) { defer ci.mutex.Unlock() ci.be = be // After the backend is changed, the first access should re-read it. - ci.SetConsistentIndex(0) + ci.SetConsistentIndex(0, 0) } func NewFakeConsistentIndex(index uint64) ConsistentIndexer { return &fakeConsistentIndex{index: index} } -type fakeConsistentIndex struct{ index uint64 } +type fakeConsistentIndex struct { + index uint64 + term uint64 +} func (f *fakeConsistentIndex) ConsistentIndex() uint64 { return f.index } -func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) { +func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) { atomic.StoreUint64(&f.index, index) + atomic.StoreUint64(&f.term, term) } func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {} @@ -124,47 +135,61 @@ func CreateMetaBucket(tx backend.BatchTx) { tx.UnsafeCreateBucket(MetaBucketName) } -// unsafeGetConsistentIndex loads consistent index from given transaction. -// returns 0 if the data are not found. -func unsafeReadConsistentIndex(tx backend.ReadTx) uint64 { +// unsafeGetConsistentIndex loads consistent index & term from given transaction. +// returns 0,0 if the data are not found. +// Term is persisted since v3.5. +func unsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { _, vs := tx.UnsafeRange(MetaBucketName, ConsistentIndexKeyName, nil, 0) if len(vs) == 0 { - return 0 + return 0, 0 } v := binary.BigEndian.Uint64(vs[0]) - return v + _, ts := tx.UnsafeRange(MetaBucketName, TermKeyName, nil, 0) + if len(ts) == 0 { + return v, 0 + } + t := binary.BigEndian.Uint64(ts[0]) + return v, t } -// ReadConsistentIndex loads consistent index from given transaction. +// ReadConsistentIndex loads consistent index and term from given transaction. // returns 0 if the data are not found. -func ReadConsistentIndex(tx backend.ReadTx) uint64 { +func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { tx.Lock() defer tx.Unlock() return unsafeReadConsistentIndex(tx) } -func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) { +func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) { if index == 0 { // Never save 0 as it means that we didn't loaded the real index yet. return } if onlyGrow { - oldi := unsafeReadConsistentIndex(tx) - if index <= oldi { + oldi, oldTerm := unsafeReadConsistentIndex(tx) + if term < oldTerm { + return + } + if term == oldTerm && index <= oldi { return } } - bs := make([]byte, 8) // this is kept on stack (not heap) so its quick. - binary.BigEndian.PutUint64(bs, index) + bs1 := make([]byte, 8) + binary.BigEndian.PutUint64(bs1, index) // put the index into the underlying backend // tx has been locked in TxnBegin, so there is no need to lock it again - tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs) + tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs1) + if term > 0 { + bs2 := make([]byte, 8) + binary.BigEndian.PutUint64(bs2, term) + tx.UnsafePut(MetaBucketName, TermKeyName, bs2) + } } -func UpdateConsistentIndex(tx backend.BatchTx, index uint64, onlyGrow bool) { +func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) { tx.Lock() defer tx.Unlock() - UnsafeUpdateConsistentIndex(tx, index, onlyGrow) + UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow) } diff --git a/server/etcdserver/cindex/cindex_test.go b/server/etcdserver/cindex/cindex_test.go index c500260d3..1e111b9e8 100644 --- a/server/etcdserver/cindex/cindex_test.go +++ b/server/etcdserver/cindex/cindex_test.go @@ -19,6 +19,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "go.etcd.io/etcd/server/v3/mvcc/backend" betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" ) @@ -38,8 +39,9 @@ func TestConsistentIndex(t *testing.T) { UnsafeCreateMetaBucket(tx) tx.Unlock() be.ForceCommit() - r := rand.Uint64() - ci.SetConsistentIndex(r) + r := uint64(7890123) + term := uint64(234) + ci.SetConsistentIndex(r, term) index := ci.ConsistentIndex() if index != r { t.Errorf("expected %d,got %d", r, index) @@ -54,15 +56,11 @@ func TestConsistentIndex(t *testing.T) { defer b.Close() ci.SetBackend(b) index = ci.ConsistentIndex() - if index != r { - t.Errorf("expected %d,got %d", r, index) - } + assert.Equal(t, r, index) ci = NewConsistentIndex(b) index = ci.ConsistentIndex() - if index != r { - t.Errorf("expected %d,got %d", r, index) - } + assert.Equal(t, r, index) } func TestFakeConsistentIndex(t *testing.T) { @@ -74,7 +72,7 @@ func TestFakeConsistentIndex(t *testing.T) { t.Errorf("expected %d,got %d", r, index) } r = rand.Uint64() - ci.SetConsistentIndex(r) + ci.SetConsistentIndex(r, 5) index = ci.ConsistentIndex() if index != r { t.Errorf("expected %d,got %d", r, index) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index a3ff94b2c..83babf952 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2124,7 +2124,7 @@ func (s *EtcdServer) apply( // set the consistent index of current executing entry if e.Index > s.consistIndex.ConsistentIndex() { - s.consistIndex.SetConsistentIndex(e.Index) + s.consistIndex.SetConsistentIndex(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } @@ -2154,7 +2154,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { index := s.consistIndex.ConsistentIndex() if e.Index > index { // set the consistent index of current executing entry - s.consistIndex.SetConsistentIndex(e.Index) + s.consistIndex.SetConsistentIndex(e.Index, e.Term) shouldApplyV3 = membership.ApplyBoth } s.lg.Debug("apply entry normal", diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index e62507657..07a22e2b0 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -678,6 +678,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { cc := &raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2, Context: b} ents := []raftpb.Entry{{ Index: 2, + Term: 4, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(cc), }} @@ -695,7 +696,9 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { srv.beHooks.OnPreCommitUnsafe(tx) assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *membership.UnsafeConfStateFromBackend(lg, tx)) }) - assert.Equal(t, consistIndex, cindex.ReadConsistentIndex(be.BatchTx())) + rindex, rterm := cindex.ReadConsistentIndex(be.BatchTx()) + assert.Equal(t, consistIndex, rindex) + assert.Equal(t, uint64(4), rterm) } func realisticRaftNode(lg *zap.Logger) *raftNode { diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index a166fd5bd..5116b15cb 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -305,6 +305,7 @@ func init() { // consistent index might be changed due to v2 internal sync, which // is not controllable by the user. {Bucket: string(MetaBucketName), Key: string(cindex.ConsistentIndexKeyName)}: {}, + {Bucket: string(MetaBucketName), Key: string(cindex.TermKeyName)}: {}, } } diff --git a/server/mvcc/kvstore_bench_test.go b/server/mvcc/kvstore_bench_test.go index 9ea70dad4..918cecacc 100644 --- a/server/mvcc/kvstore_bench_test.go +++ b/server/mvcc/kvstore_bench_test.go @@ -79,7 +79,7 @@ func BenchmarkConsistentIndex(b *testing.B) { defer betesting.Close(b, be) // This will force the index to be reread from scratch on each call. - ci.SetConsistentIndex(0) + ci.SetConsistentIndex(0, 0) tx := be.BatchTx() tx.Lock() diff --git a/server/verify/verify.go b/server/verify/verify.go index 67efcf60a..f727201ce 100644 --- a/server/verify/verify.go +++ b/server/verify/verify.go @@ -109,13 +109,20 @@ func MustVerifyIfEnabled(cfg Config) { func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error { tx := be.BatchTx() - index := cindex.ReadConsistentIndex(tx) + index, term := cindex.ReadConsistentIndex(tx) if cfg.ExactIndex && index != hardstate.Commit { return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit) } + if cfg.ExactIndex && term != hardstate.Term { + return fmt.Errorf("backend.Term (%v) expected == WAL.HardState.term, (%v)", term, hardstate.Term) + } if index > hardstate.Commit { return fmt.Errorf("backend.ConsistentIndex (%v) must be <= WAL.HardState.commit (%v)", index, hardstate.Commit) } + if term > hardstate.Term { + return fmt.Errorf("backend.Term (%v) must be <= WAL.HardState.term, (%v)", term, hardstate.Term) + } + if index < snapshot.Index { return fmt.Errorf("backend.ConsistentIndex (%v) must be >= last snapshot index (%v)", index, snapshot.Index) }