diff --git a/etcdctl/ctlv2/command/backup_command.go b/etcdctl/ctlv2/command/backup_command.go
index 335fe553f..c8e1fe540 100644
--- a/etcdctl/ctlv2/command/backup_command.go
+++ b/etcdctl/ctlv2/command/backup_command.go
@@ -32,6 +32,7 @@ import (
 	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
 	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
 	"go.etcd.io/etcd/server/v3/mvcc/backend"
+	"go.etcd.io/etcd/server/v3/verify"
 	"go.etcd.io/etcd/server/v3/wal"
 	"go.etcd.io/etcd/server/v3/wal/walpb"
 
@@ -117,12 +118,13 @@ func handleBackup(c *cli.Context) error {
 		lg.Fatal("failed creating backup snapshot dir", zap.String("dest-snap", destSnap), zap.Error(err))
 	}
 
+	destDbPath := datadir.ToBackendFileName(destDir)
+	srcDbPath := datadir.ToBackendFileName(srcDir)
 	desired := newDesiredCluster()
 
 	walsnap := saveSnap(lg, destSnap, srcSnap, &desired)
-	metadata, state, ents := loadWAL(lg, srcWAL, walsnap, withV3)
-	destDbPath := datadir.ToBackendFileName(destDir)
-	saveDB(lg, destDbPath, datadir.ToBackendFileName(srcDir), state.Commit, &desired, withV3)
+	metadata, state, ents := translateWAL(lg, srcWAL, walsnap, withV3)
+	saveDB(lg, destDbPath, srcDbPath, state.Commit, &desired, withV3)
 
 	neww, err := wal.Create(lg, destWAL, pbutil.MustMarshal(&metadata))
 	if err != nil {
@@ -183,7 +185,7 @@ func mustTranslateV2store(lg *zap.Logger, storeData []byte, desired *desiredClus
 	return outputData
 }
 
-func loadWAL(lg *zap.Logger, srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metadata, raftpb.HardState, []raftpb.Entry) {
+func translateWAL(lg *zap.Logger, srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metadata, raftpb.HardState, []raftpb.Entry) {
 	w, err := wal.OpenForRead(lg, srcWAL, walsnap)
 	if err != nil {
 		lg.Fatal("wal.OpenForRead failed", zap.Error(err))
@@ -202,18 +204,17 @@ func loadWAL(lg *zap.Logger, srcWAL string, walsnap walpb.Snapshot, v3 bool) (et
 	re := path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes")
 	memberAttrRE := regexp.MustCompile(re)
 
-	removed := uint64(0)
-	i := 0
-	remove := func() {
-		ents = append(ents[:i], ents[i+1:]...)
-		removed++
-		i--
-	}
-	for i = 0; i < len(ents); i++ {
-		ents[i].Index -= removed
+	for i := 0; i < len(ents); i++ {
+
+		// Replacing WAL entries with 'dummy' entries allows us to avoid
+		// complicated entry shifting and the risk of other data (like consistent_index)
+		// getting out of sync.
+		// Moving entries and computing offsets would also get complicated if
+		// the TERM changes (leaving superfluous entries from the previous term).
+
 		if ents[i].Type == raftpb.EntryConfChange {
 			lg.Info("ignoring EntryConfChange raft entry")
-			remove()
+			raftEntryToNoOp(&ents[i])
 			continue
 		}
 
@@ -227,18 +228,20 @@ func loadWAL(lg *zap.Logger, srcWAL string, walsnap walpb.Snapshot, v3 bool) (et
 		}
 
 		if v2Req != nil && v2Req.Method == "PUT" && memberAttrRE.MatchString(v2Req.Path) {
-			lg.Info("ignoring member attribute update on", zap.String("v2Req.Path", v2Req.Path))
-			remove()
+			lg.Info("ignoring member attribute update on",
+				zap.Stringer("entry", &ents[i]),
+				zap.String("v2Req.Path", v2Req.Path))
+			raftEntryToNoOp(&ents[i])
 			continue
 		}
 
 		if v2Req != nil {
-			continue
+			lg.Debug("preserving log entry", zap.Stringer("entry", &ents[i]))
 		}
 
 		if raftReq.ClusterMemberAttrSet != nil {
 			lg.Info("ignoring cluster_member_attr_set")
-			remove()
+			raftEntryToNoOp(&ents[i])
 			continue
 		}
 
@@ -247,14 +250,20 @@ func loadWAL(lg *zap.Logger, srcWAL string, walsnap walpb.Snapshot, v3 bool) (et
 			continue
 		}
 		lg.Info("ignoring v3 raft entry")
-		remove()
+		raftEntryToNoOp(&ents[i])
 	}
-	state.Commit -= removed
 	var metadata etcdserverpb.Metadata
 	pbutil.MustUnmarshal(&metadata, wmetadata)
 	return metadata, state, ents
 }
 
+func raftEntryToNoOp(entry *raftpb.Entry) {
+	// Empty (dummy) entries are sent by raft when a new leader is being elected.
+	// They do not carry any change to the data model, so it is safe to replace
+	// entries that should be ignored with them.
+	*entry = raftpb.Entry{Term: entry.Term, Index: entry.Index, Type: raftpb.EntryNormal, Data: nil}
+}
+
 // saveDB copies the v3 backend and strips cluster information.
 func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCluster, v3 bool) {
 
@@ -272,7 +281,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCl
 	select {
 	case src = <-ch:
 	case <-time.After(time.Second):
-		lg.Fatal("waiting to acquire lock on", zap.String("srcDB", srcDB))
+		lg.Fatal("timed out waiting to acquire lock on", zap.String("srcDB", srcDB))
 		src = <-ch
 	}
 	defer src.Close()
@@ -312,10 +321,13 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCl
 		tx := be.BatchTx()
 		tx.Lock()
 		defer tx.Unlock()
-		tx.UnsafeCreateBucket([]byte("meta"))
+		cindex.UnsafeCreateMetaBucket(tx)
 		ci := cindex.NewConsistentIndex(tx)
 		ci.SetConsistentIndex(idx)
 		ci.UnsafeSave(tx)
+	} else {
+		// Since translateWAL does not move entries, but only replaces them with
+		// 'empty' ones, there is no need to update the consistent index.
 	}
 }
 
diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go
index e47e186e4..ed5c14b56 100644
--- a/server/etcdserver/cindex/cindex.go
+++ b/server/etcdserver/cindex/cindex.go
@@ -23,9 +23,9 @@ import (
 )
 
 var (
-	metaBucketName = []byte("meta")
+	MetaBucketName = []byte("meta")
 
-	consistentIndexKeyName = []byte("consistent_index")
+	ConsistentIndexKeyName = []byte("consistent_index")
 )
 
 // ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex.
@@ -52,14 +52,11 @@ type consistentIndex struct {
 	// it caches the "consistent_index" key's value. Accessed
 	// through atomics so must be 64-bit aligned.
 	consistentIndex uint64
-	// bytesBuf8 is a byte slice of length 8
-	// to avoid a repetitive allocation in saveIndex.
-	bytesBuf8 []byte
-	mutex     sync.Mutex
+	mutex sync.Mutex
 }
 
 func NewConsistentIndex(tx backend.BatchTx) ConsistentIndexer {
-	return &consistentIndex{tx: tx, bytesBuf8: make([]byte, 8)}
+	return &consistentIndex{tx: tx}
 }
 
 func (ci *consistentIndex) ConsistentIndex() uint64 {
@@ -69,14 +66,7 @@
 	}
 	ci.mutex.Lock()
 	defer ci.mutex.Unlock()
-	ci.tx.Lock()
-	defer ci.tx.Unlock()
-	_, vs := ci.tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
-	if len(vs) == 0 {
-		return 0
-	}
-	v := binary.BigEndian.Uint64(vs[0])
-	atomic.StoreUint64(&ci.consistentIndex, v)
+	v := ReadConsistentIndex(ci.tx)
 	return v
 }
 
@@ -85,11 +75,16 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64) {
 }
 
 func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
-	bs := ci.bytesBuf8
-	binary.BigEndian.PutUint64(bs, ci.consistentIndex)
+	index := atomic.LoadUint64(&ci.consistentIndex)
+	if index == 0 {
+		// Never save 0 as it means that we haven't loaded the real index yet.
+		return
+	}
+	bs := make([]byte, 8) // this stays on the stack (not the heap), so it is quick.
+	binary.BigEndian.PutUint64(bs, index)
 	// put the index into the underlying backend
 	// tx has been locked in TxnBegin, so there is no need to lock it again
-	tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
+	tx.UnsafePut(MetaBucketName, ConsistentIndexKeyName, bs)
 }
 
 func (ci *consistentIndex) SetBatchTx(tx backend.BatchTx) {
@@ -112,3 +107,26 @@ func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) {
 
 func (f *fakeConsistentIndex) UnsafeSave(tx backend.BatchTx) {}
 func (f *fakeConsistentIndex) SetBatchTx(tx backend.BatchTx) {}
+
+func UnsafeCreateMetaBucket(tx backend.BatchTx) {
+	tx.UnsafeCreateBucket(MetaBucketName)
+}
+
+// unsafeReadConsistentIndex loads the consistent index from the given transaction.
+// Returns 0 if no data is found.
+func unsafeReadConsistentIndex(tx backend.ReadTx) uint64 {
+	_, vs := tx.UnsafeRange(MetaBucketName, ConsistentIndexKeyName, nil, 0)
+	if len(vs) == 0 {
+		return 0
+	}
+	v := binary.BigEndian.Uint64(vs[0])
+	return v
+}
+
+// ReadConsistentIndex loads the consistent index from the given transaction.
+// Returns 0 if no data is found.
+func ReadConsistentIndex(tx backend.ReadTx) uint64 {
+	tx.Lock()
+	defer tx.Unlock()
+	return unsafeReadConsistentIndex(tx)
+}
diff --git a/server/etcdserver/cindex/cindex_test.go b/server/etcdserver/cindex/cindex_test.go
index aa5761c2c..eb577b8fd 100644
--- a/server/etcdserver/cindex/cindex_test.go
+++ b/server/etcdserver/cindex/cindex_test.go
@@ -34,7 +34,7 @@ func TestConsistentIndex(t *testing.T) {
 		t.Fatal("batch tx is nil")
 	}
 	tx.Lock()
-	tx.UnsafeCreateBucket(metaBucketName)
+	UnsafeCreateMetaBucket(tx)
 	tx.Unlock()
 	be.ForceCommit()
 	r := rand.Uint64()
@@ -50,6 +50,7 @@ func TestConsistentIndex(t *testing.T) {
 	be.Close()
 
 	b := backend.NewDefaultBackend(tmpPath)
+	defer b.Close()
 	ci.SetConsistentIndex(0)
 	ci.SetBatchTx(b.BatchTx())
 	index = ci.ConsistentIndex()
@@ -62,8 +63,6 @@ func TestConsistentIndex(t *testing.T) {
 	if index != r {
 		t.Errorf("expected %d,got %d", r, index)
 	}
-	b.Close()
-
 }
 
 func TestFakeConsistentIndex(t *testing.T) {
diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index bb4ff752e..c0565e3aa 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -2256,6 +2256,9 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 	clone := s.v2store.Clone()
 	// commit kv to write metadata (for example: consistent index) to disk.
+	//
+	// This guarantees that Backend's consistent_index is >= index of last snapshot.
+	//
 	// KV().commit() updates the consistent index in backend.
 	// All operations that update consistent index must be called sequentially
 	// from applyAll function.
diff --git a/server/verify/verify.go b/server/verify/verify.go
index b2483cb1c..67efcf60a 100644
--- a/server/verify/verify.go
+++ b/server/verify/verify.go
@@ -76,7 +76,7 @@ func Verify(cfg Config) error {
 	be := backend.New(beConfig)
 	defer be.Close()
 
-	_, hardstate, err := validateWal(cfg)
+	snapshot, hardstate, err := validateWal(cfg)
 	if err != nil {
 		return err
 	}
@@ -84,7 +84,7 @@ func Verify(cfg Config) error {
 
 	// TODO: Perform validation of consistency of membership between
 	// backend/members & WAL confstate (and maybe storev2 if still exists).
-	return validateConsistentIndex(cfg, hardstate, be)
+	return validateConsistentIndex(cfg, hardstate, snapshot, be)
 }
 
 // VerifyIfEnabled performs verification according to ETCD_VERIFY env settings.
@@ -101,22 +101,25 @@ func VerifyIfEnabled(cfg Config) error {
 // See Verify for more information.
 func MustVerifyIfEnabled(cfg Config) {
 	if err := VerifyIfEnabled(cfg); err != nil {
-		cfg.Logger.Panic("Verification failed",
+		cfg.Logger.Fatal("Verification failed",
 			zap.String("data-dir", cfg.DataDir),
 			zap.Error(err))
 	}
 }
 
-func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, be backend.Backend) error {
+func validateConsistentIndex(cfg Config, hardstate *raftpb.HardState, snapshot *walpb.Snapshot, be backend.Backend) error {
 	tx := be.BatchTx()
-	ci := cindex.NewConsistentIndex(tx)
-	index := ci.ConsistentIndex()
+	index := cindex.ReadConsistentIndex(tx)
 	if cfg.ExactIndex && index != hardstate.Commit {
 		return fmt.Errorf("backend.ConsistentIndex (%v) expected == WAL.HardState.commit (%v)", index, hardstate.Commit)
 	}
 	if index > hardstate.Commit {
 		return fmt.Errorf("backend.ConsistentIndex (%v) must be <= WAL.HardState.commit (%v)", index, hardstate.Commit)
 	}
+	if index < snapshot.Index {
+		return fmt.Errorf("backend.ConsistentIndex (%v) must be >= last snapshot index (%v)", index, snapshot.Index)
+	}
+	cfg.Logger.Info("verification: consistentIndex OK", zap.Uint64("backend-consistent-index", index), zap.Uint64("hardstate-commit", hardstate.Commit))
 	return nil
 }
 
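For illustration only (not part of the patch): a minimal test sketch of the invariant translateWAL relies on, namely that raftEntryToNoOp keeps the entry's Term and Index and only normalizes its Type and Data. The test name and its placement (package command, next to backup_command.go) are assumptions.

package command

import (
	"testing"

	"go.etcd.io/etcd/raft/v3/raftpb"
)

func TestRaftEntryToNoOpKeepsPosition(t *testing.T) {
	e := raftpb.Entry{Term: 7, Index: 42, Type: raftpb.EntryConfChange, Data: []byte("cc")}
	raftEntryToNoOp(&e)

	// Term and Index must survive the translation; only the payload and type are reset.
	if e.Term != 7 || e.Index != 42 {
		t.Fatalf("entry position changed: %+v", e)
	}
	if e.Type != raftpb.EntryNormal || e.Data != nil {
		t.Fatalf("entry was not turned into an empty normal entry: %+v", e)
	}
}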
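Likewise, a rough usage sketch for the newly exported cindex helpers: reading the consistent_index straight out of a backend db file, such as one written by the backup command. The printConsistentIndex helper and the example path are hypothetical; the call pattern simply mirrors validateConsistentIndex above, minus the WAL cross-checks.

package main

import (
	"fmt"

	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
	"go.etcd.io/etcd/server/v3/mvcc/backend"
)

// printConsistentIndex opens a backend (bbolt) file and prints the stored
// consistent_index. 0 means the meta bucket holds no value yet.
func printConsistentIndex(dbPath string) {
	be := backend.NewDefaultBackend(dbPath)
	defer be.Close()

	index := cindex.ReadConsistentIndex(be.BatchTx())
	fmt.Printf("consistent_index in %s: %d\n", dbPath, index)
}

func main() {
	// Typical location of the backend file inside an etcd data dir; adjust as needed.
	printConsistentIndex("member/snap/db")
}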