From 768da490edf0672628c9c3b829999e6b361e0501 Mon Sep 17 00:00:00 2001
From: Piotr Tabor
Date: Fri, 26 Mar 2021 13:01:49 +0100
Subject: [PATCH] server: v2store deprecation: Fix `etcdctl snapshot restore`
 to restore the correct 'backend' (bbolt) state with respect to membership.

Prior to this change, the 'restored' backend still contained:

- the old member IDs (the deletion was performed through mvcc, but
  membership is kept in bolt buckets, not in the mvcc keyspace, so it
  survived):

```
mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
defer mvs.Close()
txn := mvs.Write(traceutil.TODO())
btx := be.BatchTx()
del := func(k, v []byte) error {
	txn.DeleteRange(k, nil)
	return nil
}

// delete stored members from old cluster since using new members
btx.UnsafeForEach([]byte("members"), del)
```

- the new members were never added.
---
 etcdctl/ctlv2/command/backup_command.go      |  2 +
 etcdctl/snapshot/v3_snapshot.go              | 97 ++++++++-----------
 server/etcdserver/api/membership/cluster.go  |  4 +
 server/etcdserver/api/membership/store.go    | 70 ++++++++++++-
 .../etcdserver/api/membership/store_test.go  | 43 ++++++++
 5 files changed, 160 insertions(+), 56 deletions(-)
 create mode 100644 server/etcdserver/api/membership/store_test.go

diff --git a/etcdctl/ctlv2/command/backup_command.go b/etcdctl/ctlv2/command/backup_command.go
index 3b30d813a..a425961d9 100644
--- a/etcdctl/ctlv2/command/backup_command.go
+++ b/etcdctl/ctlv2/command/backup_command.go
@@ -239,6 +239,8 @@ func saveDB(destDB, srcDB string, idx uint64, v3 bool) {
 	}
 
 	// remove membership information; should be clobbered by --force-new-cluster
+	// TODO: Consider refactoring to use backend.Backend instead of bolt
+	// and membership.TrimMembershipFromBackend.
 	for _, bucket := range []string{"members", "members_removed", "cluster"} {
 		tx.DeleteBucket([]byte(bucket))
 	}
diff --git a/etcdctl/snapshot/v3_snapshot.go b/etcdctl/snapshot/v3_snapshot.go
index 7c8b159f8..31b31e0ff 100644
--- a/etcdctl/snapshot/v3_snapshot.go
+++ b/etcdctl/snapshot/v3_snapshot.go
@@ -21,7 +21,6 @@ import (
 	"fmt"
 	"hash/crc32"
 	"io"
-	"math"
 	"os"
 	"path/filepath"
 	"reflect"
@@ -41,9 +40,6 @@ import (
 	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
 	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
 	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
-	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
-	"go.etcd.io/etcd/server/v3/lease"
-	"go.etcd.io/etcd/server/v3/mvcc"
 	"go.etcd.io/etcd/server/v3/mvcc/backend"
 	"go.etcd.io/etcd/server/v3/wal"
 	"go.etcd.io/etcd/server/v3/wal/walpb"
@@ -81,11 +77,11 @@ func NewV3(lg *zap.Logger) Manager {
 type v3Manager struct {
 	lg *zap.Logger
 
-	name    string
-	dbPath  string
-	walDir  string
-	snapDir string
-	cl      *membership.RaftCluster
+	name      string
+	srcDbPath string
+	walDir    string
+	snapDir   string
+	cl        *membership.RaftCluster
 
 	skipHashCheck bool
 }
@@ -246,17 +242,18 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
 	}
 
 	s.name = cfg.Name
-	s.dbPath = cfg.SnapshotPath
+	s.srcDbPath = cfg.SnapshotPath
 	s.walDir = walDir
 	s.snapDir = filepath.Join(dataDir, "member", "snap")
 	s.skipHashCheck = cfg.SkipHashCheck
 
 	s.lg.Info(
 		"restoring snapshot",
-		zap.String("path", s.dbPath),
+		zap.String("path", s.srcDbPath),
 		zap.String("wal-dir", s.walDir),
 		zap.String("data-dir", dataDir),
 		zap.String("snap-dir", s.snapDir),
+		zap.Stack("stack"),
 	)
 	if err = s.saveDB(); err != nil {
 		return err
@@ -266,7 +263,7 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
 	}
 	s.lg.Info(
 		"restored snapshot",
-		zap.String("path", s.dbPath),
+		zap.String("path", s.srcDbPath),
 		zap.String("wal-dir", s.walDir),
 		zap.String("data-dir", dataDir),
 		zap.String("snap-dir", s.snapDir),
@@ -275,23 +272,44 @@ func (s *v3Manager) Restore(cfg RestoreConfig) error {
 	return nil
 }
 
+func (s *v3Manager) outDbPath() string {
+	return filepath.Join(s.snapDir, "db")
+}
+
 // saveDB copies the database snapshot to the snapshot directory
 func (s *v3Manager) saveDB() error {
-	f, ferr := os.OpenFile(s.dbPath, os.O_RDONLY, 0600)
+	err := s.copyAndVerifyDB()
+	if err != nil {
+		return err
+	}
+
+	be := backend.NewDefaultBackend(s.outDbPath())
+	defer be.Close()
+
+	err = membership.TrimMembershipFromBackend(s.lg, be)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (s *v3Manager) copyAndVerifyDB() error {
+	srcf, ferr := os.OpenFile(s.srcDbPath, os.O_RDONLY, 0600)
 	if ferr != nil {
 		return ferr
 	}
-	defer f.Close()
+	defer srcf.Close()
 
 	// get snapshot integrity hash
-	if _, err := f.Seek(-sha256.Size, io.SeekEnd); err != nil {
+	if _, err := srcf.Seek(-sha256.Size, io.SeekEnd); err != nil {
 		return err
 	}
 	sha := make([]byte, sha256.Size)
-	if _, err := f.Read(sha); err != nil {
+	if _, err := srcf.Read(sha); err != nil {
 		return err
 	}
-	if _, err := f.Seek(0, io.SeekStart); err != nil {
+	if _, err := srcf.Seek(0, io.SeekStart); err != nil {
 		return err
 	}
 
@@ -299,8 +317,9 @@ func (s *v3Manager) saveDB() error {
 		return err
 	}
 
-	dbpath := filepath.Join(s.snapDir, "db")
-	db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600)
+	outDbPath := s.outDbPath()
+
+	db, dberr := os.OpenFile(outDbPath, os.O_RDWR|os.O_CREATE, 0600)
 	if dberr != nil {
 		return dberr
 	}
@@ -311,7 +330,7 @@ func (s *v3Manager) saveDB() error {
 			dbClosed = true
 		}
 	}()
-	if _, err := io.Copy(db, f); err != nil {
+	if _, err := io.Copy(db, srcf); err != nil {
 		return err
 	}
 
@@ -348,41 +367,6 @@ func (s *v3Manager) saveDB() error {
 
 	// db hash is OK, can now modify DB so it can be part of a new cluster
 	db.Close()
-	dbClosed = true
-
-	commit := len(s.cl.Members())
-
-	// update consistentIndex so applies go through on etcdserver despite
-	// having a new raft instance
-	be := backend.NewDefaultBackend(dbpath)
-	defer be.Close()
-
-	ci := cindex.NewConsistentIndex(be.BatchTx())
-	ci.SetConsistentIndex(uint64(commit))
-
-	// a lessor never timeouts leases
-	lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64}, ci)
-	defer lessor.Stop()
-
-	mvs := mvcc.NewStore(s.lg, be, lessor, ci, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
-	defer mvs.Close()
-	txn := mvs.Write(traceutil.TODO())
-	btx := be.BatchTx()
-	del := func(k, v []byte) error {
-		txn.DeleteRange(k, nil)
-		return nil
-	}
-
-	// delete stored members from old cluster since using new members
-	btx.UnsafeForEach([]byte("members"), del)
-
-	// todo: add back new members when we start to deprecate old snap file.
-	btx.UnsafeForEach([]byte("members_removed"), del)
-
-	// trigger write-out of new consistent index
-	txn.End()
-
-	mvs.Commit()
 
 	return nil
 }
@@ -397,6 +381,9 @@ func (s *v3Manager) saveWALAndSnap() error {
 	// add members again to persist them to the store we create.
 	st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
 	s.cl.SetStore(st)
+	be := backend.NewDefaultBackend(s.outDbPath())
+	defer be.Close()
+	s.cl.SetBackend(be)
 	for _, m := range s.cl.Members() {
 		s.cl.AddMember(m, true)
 	}
diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go
index a8a27984f..bd5389212 100644
--- a/server/etcdserver/api/membership/cluster.go
+++ b/server/etcdserver/api/membership/cluster.go
@@ -676,6 +676,10 @@ func membersFromStore(lg *zap.Logger, st v2store.Store) (map[types.ID]*Member, m
 	return members, removed
 }
 
+func membersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool) {
+	return mustReadMembersFromBackend(lg, be)
+}
+
 func clusterVersionFromStore(lg *zap.Logger, st v2store.Store) *semver.Version {
 	e, err := st.Get(path.Join(storePrefix, "version"), false, false)
 	if err != nil {
diff --git a/server/etcdserver/api/membership/store.go b/server/etcdserver/api/membership/store.go
index c3593394c..95c2e6aaa 100644
--- a/server/etcdserver/api/membership/store.go
+++ b/server/etcdserver/api/membership/store.go
@@ -67,6 +67,66 @@ func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
 	tx.UnsafePut(membersRemovedBucketName, mkey, []byte("removed"))
 }
 
+func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool, error) {
+	members := make(map[types.ID]*Member)
+	removed := make(map[types.ID]bool)
+
+	tx := be.ReadTx()
+	tx.RLock()
+	defer tx.RUnlock()
+	err := tx.UnsafeForEach(membersBucketName, func(k, v []byte) error {
+		memberId := MustParseMemberIDFromBytes(lg, k)
+		m := &Member{ID: memberId}
+		if err := json.Unmarshal(v, &m); err != nil {
+			return err
+		}
+		members[memberId] = m
+		return nil
+	})
+	if err != nil {
+		return nil, nil, fmt.Errorf("couldn't read members from backend: %w", err)
+	}
+
+	err = tx.UnsafeForEach(membersRemovedBucketName, func(k, v []byte) error {
+		memberId := MustParseMemberIDFromBytes(lg, k)
+		removed[memberId] = true
+		return nil
+	})
+	if err != nil {
+		return nil, nil, fmt.Errorf("couldn't read members_removed from backend: %w", err)
+	}
+	return members, removed, nil
+}
+
+func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool) {
+	members, removed, err := readMembersFromBackend(lg, be)
+	if err != nil {
+		lg.Panic("couldn't read members from backend", zap.Error(err))
+	}
+	return members, removed
+}
+
+func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
+	tx := be.BatchTx()
+	tx.Lock()
+	defer tx.Unlock()
+	err := tx.UnsafeForEach(membersBucketName, func(k, v []byte) error {
+		tx.UnsafeDelete(membersBucketName, k)
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+	err = tx.UnsafeForEach(membersRemovedBucketName, func(k, v []byte) error {
+		tx.UnsafeDelete(membersRemovedBucketName, k)
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
 func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
 	ckey := backendClusterVersionKey()
 
@@ -221,10 +281,18 @@ func MemberAttributesStorePath(id types.ID) string {
 	return path.Join(MemberStoreKey(id), attributesSuffix)
 }
 
+func MustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID {
+	id, err := types.IDFromString(string(key))
+	if err != nil {
+		lg.Panic("failed to parse member id from key", zap.Error(err))
+	}
+	return id
+}
+
 func MustParseMemberIDFromKey(lg *zap.Logger, key string) types.ID {
 	id, err := types.IDFromString(path.Base(key))
 	if err != nil {
-		lg.Panic("failed to parse memver id from key", zap.Error(err))
+		lg.Panic("failed to parse member id from key", zap.Error(err))
 	}
 	return id
 }
diff --git a/server/etcdserver/api/membership/store_test.go b/server/etcdserver/api/membership/store_test.go
new file mode 100644
index 000000000..d39a2b103
--- /dev/null
+++ b/server/etcdserver/api/membership/store_test.go
@@ -0,0 +1,43 @@
+package membership
+
+import (
+	"testing"
+
+	"github.com/coreos/go-semver/semver"
+	"github.com/stretchr/testify/assert"
+	"go.etcd.io/etcd/client/pkg/v3/types"
+	betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
+
+	"go.etcd.io/etcd/server/v3/mvcc/backend"
+	"go.uber.org/zap"
+)
+
+func TestAddRemoveMember(t *testing.T) {
+	c := newTestCluster(t, nil)
+	be, bepath := betesting.NewDefaultTmpBackend(t)
+	c.SetBackend(be)
+	c.AddMember(newTestMember(17, nil, "node17", nil), true)
+	c.RemoveMember(17, true)
+	c.AddMember(newTestMember(18, nil, "node18", nil), true)
+
+	// Skipping removal of already removed member
+	c.RemoveMember(17, true)
+	err := be.Close()
+	assert.NoError(t, err)
+
+	be2 := backend.NewDefaultBackend(bepath)
+	defer func() {
+		assert.NoError(t, be2.Close())
+	}()
+
+	if false {
+		// TODO: Enable this code when Recover is reading membership from the backend.
+		c2 := newTestCluster(t, nil)
+		c2.SetBackend(be2)
+		c2.Recover(func(*zap.Logger, *semver.Version) {})
+		assert.Equal(t, []*Member{{ID: types.ID(18),
+			Attributes: Attributes{Name: "node18"}}}, c2.Members())
+		assert.Equal(t, true, c2.IsIDRemoved(17))
+		assert.Equal(t, false, c2.IsIDRemoved(18))
+	}
+}
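Not part of the patch: a minimal usage sketch of the newly exported membership.TrimMembershipFromBackend against a restored bbolt file, for reviewers who want to exercise it outside of saveDB. The db path and the zap.NewExample() logger are illustrative assumptions; the backend and membership calls mirror what the patched saveDB does.

```go
package main

import (
	"log"

	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
	"go.etcd.io/etcd/server/v3/mvcc/backend"
	"go.uber.org/zap"
)

func main() {
	lg := zap.NewExample()

	// Open the copied snapshot db (path is illustrative only).
	be := backend.NewDefaultBackend("member/snap/db")
	defer be.Close()

	// Clear the "members" and "members_removed" buckets so the restored
	// backend cannot resurrect the old cluster's membership.
	if err := membership.TrimMembershipFromBackend(lg, be); err != nil {
		log.Fatalf("trim membership: %v", err)
	}
}
```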