From 911204cd7624a4e9f94e0552c11c8cbb8c3771ca Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Wed, 28 Apr 2021 22:46:42 +0200 Subject: [PATCH] Fix `ETCDCTL_API=2 etcdctl backup --with-v3` consistent index consistency Prior to this CL, `ETCDCTL_API=2 etcdctl backup --with-v3` was readacting WAL log (by removal of some entries), but was NOT updating consistent_index in the backend. Also the WAL editing logic was buggy, as it didn't took in consideration the fact that when TERM changes, there can be entries with duplicated indexes in the log. So its NOT sufficient to subtract number of removed entries to get accurate log indexes. The PR replaces removing and shifting of WAL entries with replacing them with an no-op entries. Thanks to this consistent-index references are staying up to date. The PR also: - updates 'verification' logic to check whether consistent_index does not lag befor last snapshot - env-gated execution of verification framework in `etcdctl backup`. Tested with: ``` (./build.sh && cd tests && EXPECT_DEBUG=TRUE 'env' 'go' 'test' '-timeout=300m' 'go.etcd.io/etcd/tests/v3/e2e' -run=TestCtlV2Backup --count=1000 2>&1 | tee TestCtlV2BackupV3.log) ``` --- etcdctl/ctlv2/command/backup_command.go | 56 +++++++++++++++---------- server/etcdserver/cindex/cindex.go | 54 ++++++++++++++++-------- server/etcdserver/cindex/cindex_test.go | 5 +-- server/etcdserver/server.go | 3 ++ server/verify/verify.go | 15 ++++--- 5 files changed, 84 insertions(+), 49 deletions(-) diff --git a/etcdctl/ctlv2/command/backup_command.go b/etcdctl/ctlv2/command/backup_command.go index 335fe553f..c8e1fe540 100644 --- a/etcdctl/ctlv2/command/backup_command.go +++ b/etcdctl/ctlv2/command/backup_command.go @@ -32,6 +32,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/verify" "go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal/walpb" @@ -117,12 +118,13 @@ func handleBackup(c *cli.Context) error { lg.Fatal("failed creating backup snapshot dir", zap.String("dest-snap", destSnap), zap.Error(err)) } + destDbPath := datadir.ToBackendFileName(destDir) + srcDbPath := datadir.ToBackendFileName(srcDir) desired := newDesiredCluster() walsnap := saveSnap(lg, destSnap, srcSnap, &desired) - metadata, state, ents := loadWAL(lg, srcWAL, walsnap, withV3) - destDbPath := datadir.ToBackendFileName(destDir) - saveDB(lg, destDbPath, datadir.ToBackendFileName(srcDir), state.Commit, &desired, withV3) + metadata, state, ents := translateWAL(lg, srcWAL, walsnap, withV3) + saveDB(lg, destDbPath, srcDbPath, state.Commit, &desired, withV3) neww, err := wal.Create(lg, destWAL, pbutil.MustMarshal(&metadata)) if err != nil { @@ -183,7 +185,7 @@ func mustTranslateV2store(lg *zap.Logger, storeData []byte, desired *desiredClus return outputData } -func loadWAL(lg *zap.Logger, srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metadata, raftpb.HardState, []raftpb.Entry) { +func translateWAL(lg *zap.Logger, srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metadata, raftpb.HardState, []raftpb.Entry) { w, err := wal.OpenForRead(lg, srcWAL, walsnap) if err != nil { lg.Fatal("wal.OpenForRead failed", zap.Error(err)) @@ -202,18 +204,17 @@ func loadWAL(lg *zap.Logger, srcWAL string, walsnap walpb.Snapshot, v3 bool) (et re := path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes") memberAttrRE := regexp.MustCompile(re) - removed := uint64(0) - i := 0 - remove := func() { - ents = append(ents[:i], ents[i+1:]...) - removed++ - i-- - } - for i = 0; i < len(ents); i++ { - ents[i].Index -= removed + for i := 0; i < len(ents); i++ { + + // Replacing WAL entries with 'dummy' entries allows to avoid + // complicated entries shifting and risk of other data (like consistent_index) + // running out of sync. + // Also moving entries and computing offsets would get complicated if + // TERM changes (so there are superflous entries from previous term). + if ents[i].Type == raftpb.EntryConfChange { lg.Info("ignoring EntryConfChange raft entry") - remove() + raftEntryToNoOp(&ents[i]) continue } @@ -227,18 +228,20 @@ func loadWAL(lg *zap.Logger, srcWAL string, walsnap walpb.Snapshot, v3 bool) (et } if v2Req != nil && v2Req.Method == "PUT" && memberAttrRE.MatchString(v2Req.Path) { - lg.Info("ignoring member attribute update on", zap.String("v2Req.Path", v2Req.Path)) - remove() + lg.Info("ignoring member attribute update on", + zap.Stringer("entry", &ents[i]), + zap.String("v2Req.Path", v2Req.Path)) + raftEntryToNoOp(&ents[i]) continue } if v2Req != nil { - continue + lg.Debug("preserving log entry", zap.Stringer("entry", &ents[i])) } if raftReq.ClusterMemberAttrSet != nil { lg.Info("ignoring cluster_member_attr_set") - remove() + raftEntryToNoOp(&ents[i]) continue } @@ -247,14 +250,20 @@ func loadWAL(lg *zap.Logger, srcWAL string, walsnap walpb.Snapshot, v3 bool) (et continue } lg.Info("ignoring v3 raft entry") - remove() + raftEntryToNoOp(&ents[i]) } - state.Commit -= removed var metadata etcdserverpb.Metadata pbutil.MustUnmarshal(&metadata, wmetadata) return metadata, state, ents } +func raftEntryToNoOp(entry *raftpb.Entry) { + // Empty (dummy) entries are send by RAFT when new leader is getting elected. + // They do not cary any change to data-model so its safe to replace entries + // to be ignored with them. + *entry = raftpb.Entry{Term: entry.Term, Index: entry.Index, Type: raftpb.EntryNormal, Data: nil} +} + // saveDB copies the v3 backend and strips cluster information. func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCluster, v3 bool) { @@ -272,7 +281,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCl select { case src = <-ch: case <-time.After(time.Second): - lg.Fatal("waiting to acquire lock on", zap.String("srcDB", srcDB)) + lg.Fatal("timed out waiting to acquire lock on", zap.String("srcDB", srcDB)) src = <-ch } defer src.Close() @@ -312,10 +321,13 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCl tx := be.BatchTx() tx.Lock() defer tx.Unlock() - tx.UnsafeCreateBucket([]byte("meta")) + cindex.UnsafeCreateMetaBucket(tx) ci := cindex.NewConsistentIndex(tx) ci.SetConsistentIndex(idx) ci.UnsafeSave(tx) + } else { + // Thanks to translateWAL not moving entries, but just replacing them with + // 'empty', there is no need to update the consistency index. } } diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index e47e186e4..ed5c14b56 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -23,9 +23,9 @@ import ( ) var ( - metaBucketName = []byte("meta") + MetaBucketName = []byte("meta") - consistentIndexKeyName = []byte("consistent_index") + ConsistentIndexKeyName = []byte("consistent_index") ) // ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex. @@ -52,14 +52,11 @@ type consistentIndex struct { // it caches the "consistent_index" key's value. Accessed // through atomics so must be 64-bit aligned. consistentIndex uint64 - // bytesBuf8 is a byte slice of length 8 - // to avoid a repetitive allocation in saveIndex. - bytesBuf8 []byte - mutex sync.Mutex + mutex sync.Mutex } func NewConsistentIndex(tx backend.BatchTx) ConsistentIndexer { - return &consistentIndex{tx: tx, bytesBuf8: make([]byte, 8)} + return &consistentIndex{tx: tx} } func (ci *consistentIndex) ConsistentIndex() uint64 { @@ -69,14 +66,7 @@ func (ci *consistentIndex) ConsistentIndex() uint64 { } ci.mutex.Lock() defer ci.mutex.Unlock() - ci.tx.Lock() - defer ci.tx.Unlock() - _, vs := ci.tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0) - if len(vs) == 0 { - return 0 - } - v := binary.BigEndian.Uint64(vs[0]) - atomic.StoreUint64(&ci.consistentIndex, v) + v := ReadConsistentIndex(ci.tx) return v } @@ -85,11 +75,16 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64) { } func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) { - bs := ci.bytesBuf8 - binary.BigEndian.PutUint64(bs, ci.consistentIndex) + index := atomic.LoadUint64(&ci.consistentIndex) + if index == 0 { + // Never save 0 as it means that we didn't loaded the real index yet. + return + } + bs := make([]byte, 8) // this is kept on stack (not heap) so its quick. + binary.BigEndian.PutUint64(bs, index) // put the index into the underlying backend // tx has been locked in TxnBegin, so there is no need to lock it again - tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs) + tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs) } func (ci *consistentIndex) SetBatchTx(tx backend.BatchTx) { @@ -112,3 +107,26 @@ func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) { func (f *fakeConsistentIndex) UnsafeSave(tx backend.BatchTx) {} func (f *fakeConsistentIndex) SetBatchTx(tx backend.BatchTx) {} + +func UnsafeCreateMetaBucket(tx backend.BatchTx) { + tx.UnsafeCreateBucket(MetaBucketName) +} + +// unsafeGetConsistentIndex loads consistent index from given transaction. +// returns 0 if the data are not found. +func unsafeReadConsistentIndex(tx backend.ReadTx) uint64 { + _, vs := tx.UnsafeRange(MetaBucketName, ConsistentIndexKeyName, nil, 0) + if len(vs) == 0 { + return 0 + } + v := binary.BigEndian.Uint64(vs[0]) + return v +} + +// ReadConsistentIndex loads consistent index from given transaction. +// returns 0 if the data are not found. +func ReadConsistentIndex(tx backend.ReadTx) uint64 { + tx.Lock() + defer tx.Unlock() + return unsafeReadConsistentIndex(tx) +} diff --git a/server/etcdserver/cindex/cindex_test.go b/server/etcdserver/cindex/cindex_test.go index aa5761c2c..eb577b8fd 100644 --- a/server/etcdserver/cindex/cindex_test.go +++ b/server/etcdserver/cindex/cindex_test.go @@ -34,7 +34,7 @@ func TestConsistentIndex(t *testing.T) { t.Fatal("batch tx is nil") } tx.Lock() - tx.UnsafeCreateBucket(metaBucketName) + UnsafeCreateMetaBucket(tx) tx.Unlock() be.ForceCommit() r := rand.Uint64() @@ -50,6 +50,7 @@ func TestConsistentIndex(t *testing.T) { be.Close() b := backend.NewDefaultBackend(tmpPath) + defer b.Close() ci.SetConsistentIndex(0) ci.SetBatchTx(b.BatchTx()) index = ci.ConsistentIndex() @@ -62,8 +63,6 @@ func TestConsistentIndex(t *testing.T) { if index != r { t.Errorf("expected %d,got %d", r, index) } - b.Close() - } func TestFakeConsistentIndex(t *testing.T) { diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index bb4ff752e..c0565e3aa 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2256,6 +2256,9 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { clone := s.v2store.Clone() // commit kv to write metadata (for example: consistent index) to disk. + // + // This guarantees that Backend's consistent_index is >= index of last snapshot. + // // KV().commit() updates the consistent index in backend. // All operations that update consistent index must be called sequentially // from applyAll function. diff --git a/server/verify/verify.go b/server/verify/verify.go index b2483cb1c..67efcf60a 100644 --- a/server/verify/verify.go +++ b/server/verify/verify.go @@ -76,7 +76,7 @@ func Verify(cfg Config) error { be := backend.New(beConfig) defer be.Close() - _, hardstate, err := validateWal(cfg) + snapshot, hardstate, err := validateWal(cfg) if err != nil { return err } @@ -84,7 +84,7 @@ func Verify(cfg Config) error { // TODO: Perform validation of consistency of membership between // backend/members & WAL confstate (and maybe storev2 if still exists). - return validateConsistentIndex(cfg, hardstate, be) + return validateConsistentIndex(cfg, hardstate, snapshot, be) } // VerifyIfEnabled performs verification according to ETCD_VERIFY env settings. @@ -101,22 +101,25 @@ func VerifyIfEnabled(cfg Config) error { // See Verify for more information. func MustVerifyIfEnabled(cfg Config) { if err := VerifyIfEnabled(cfg); err != nil { - cfg.Logger.Panic("Verification failed", + cfg.Logger.Fatal("Verification failed", zap.String("data-dir", cfg.DataDir), zap.Error(err)) } } -func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, be backend.Backend) error { +func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error { tx := be.BatchTx() - ci := cindex.NewConsistentIndex(tx) - index := ci.ConsistentIndex() + index := cindex.ReadConsistentIndex(tx) if cfg.ExactIndex && index != hardstate.Commit { return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit) } if index > hardstate.Commit { return fmt.Errorf("backend.ConsistentIndex (%v) must be <= WAL.HardState.commit (%v)", index, hardstate.Commit) } + if index < snapshot.Index { + return fmt.Errorf("backend.ConsistentIndex (%v) must be >= last snapshot index (%v)", index, snapshot.Index) + } + cfg.Logger.Info("verification: consistentIndex OK", zap.Uint64("backend-consistent-index", index), zap.Uint64("hardstate-commit", hardstate.Commit)) return nil }