From 50507d5f3c71990eb730270e5faea710f2b74060 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Tue, 29 Jun 2021 13:51:56 +0200 Subject: [PATCH] etcdserver: Membership uses MembershipStorage interface instead of directly accessing Backend --- etcdutl/etcdutl/backup_command.go | 7 +- etcdutl/snapshot/v3_snapshot.go | 5 +- server/etcdserver/api/membership/cluster.go | 105 +----- .../{store_test.go => membership_test.go} | 33 +- server/etcdserver/api/membership/store.go | 301 +----------------- server/etcdserver/api/membership/storev2.go | 173 ++++++++++ server/etcdserver/server.go | 11 +- server/etcdserver/server_test.go | 3 +- .../membership => mvcc/buckets}/confstate.go | 9 +- .../buckets}/confstate_test.go | 20 +- server/mvcc/buckets/membership.go | 237 ++++++++++++++ 11 files changed, 492 insertions(+), 412 deletions(-) rename server/etcdserver/api/membership/{store_test.go => membership_test.go} (51%) rename server/{etcdserver/api/membership => mvcc/buckets}/confstate.go (84%) rename server/{etcdserver/api/membership => mvcc/buckets}/confstate_test.go (73%) create mode 100644 server/mvcc/buckets/membership.go diff --git a/etcdutl/etcdutl/backup_command.go b/etcdutl/etcdutl/backup_command.go index c09bcf14a..dd1413226 100644 --- a/etcdutl/etcdutl/backup_command.go +++ b/etcdutl/etcdutl/backup_command.go @@ -33,6 +33,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.etcd.io/etcd/server/v3/verify" "go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal/walpb" @@ -307,14 +308,14 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir be := backend.NewDefaultBackend(destDB) defer be.Close() - - if err := membership.TrimClusterFromBackend(be); err != nil { + ms := buckets.NewMembershipStore(lg, be) + if err := ms.TrimClusterFromBackend(); err != nil { lg.Fatal("bbolt tx.Membership failed", zap.Error(err)) } raftCluster := membership.NewClusterFromMembers(lg, desired.clusterId, desired.members) raftCluster.SetID(desired.nodeId, desired.clusterId) - raftCluster.SetBackend(be) + raftCluster.SetBackend(ms) raftCluster.PushMembershipToStorage() if !v3 { diff --git a/etcdutl/snapshot/v3_snapshot.go b/etcdutl/snapshot/v3_snapshot.go index 28708f55a..5ce78a260 100644 --- a/etcdutl/snapshot/v3_snapshot.go +++ b/etcdutl/snapshot/v3_snapshot.go @@ -42,6 +42,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/etcdserver/version" "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.etcd.io/etcd/server/v3/verify" "go.etcd.io/etcd/server/v3/wal" "go.etcd.io/etcd/server/v3/wal/walpb" @@ -306,7 +307,7 @@ func (s *v3Manager) saveDB() error { be := backend.NewDefaultBackend(s.outDbPath()) defer be.Close() - err = membership.TrimMembershipFromBackend(s.lg, be) + err = buckets.NewMembershipStore(s.lg, be).TrimMembershipFromBackend() if err != nil { return err } @@ -403,7 +404,7 @@ func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) { s.cl.SetStore(st) be := backend.NewDefaultBackend(s.outDbPath()) defer be.Close() - s.cl.SetBackend(be) + s.cl.SetBackend(buckets.NewMembershipStore(s.lg, be)) for _, m := range s.cl.Members() { s.cl.AddMember(m, true) } diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index 89a52f704..264aa347e 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -21,7 +21,6 @@ import ( "encoding/binary" "encoding/json" "fmt" - "path" "sort" "strings" "sync" @@ -33,8 +32,6 @@ import ( "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" - "go.etcd.io/etcd/server/v3/mvcc/backend" - "go.etcd.io/etcd/server/v3/mvcc/buckets" "github.com/coreos/go-semver/semver" "github.com/prometheus/client_golang/prometheus" @@ -51,7 +48,7 @@ type RaftCluster struct { cid types.ID v2store v2store.Store - be backend.Backend + be MembershipBackend sync.Mutex // guards the fields below version *semver.Version @@ -245,9 +242,9 @@ func (c *RaftCluster) SetID(localID, cid types.ID) { func (c *RaftCluster) SetStore(st v2store.Store) { c.v2store = st } -func (c *RaftCluster) SetBackend(be backend.Backend) { +func (c *RaftCluster) SetBackend(be MembershipBackend) { c.be = be - mustCreateBackendBuckets(c.be) + c.be.MustCreateBackendBuckets() } func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { @@ -255,15 +252,15 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { defer c.Unlock() if c.be != nil { - c.version = clusterVersionFromBackend(c.lg, c.be) - c.members, c.removed = membersFromBackend(c.lg, c.be) + c.version = c.be.ClusterVersionFromBackend() + c.members, c.removed = c.be.MustReadMembersFromBackend() } else { c.version = clusterVersionFromStore(c.lg, c.v2store) c.members, c.removed = membersFromStore(c.lg, c.v2store) } if c.be != nil { - c.downgradeInfo = downgradeInfoFromBackend(c.lg, c.be) + c.downgradeInfo = c.be.DowngradeInfoFromBackend() } d := &DowngradeInfo{Enabled: false} if c.downgradeInfo != nil { @@ -385,7 +382,7 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) { mustSaveMemberToStore(c.lg, c.v2store, m) } if c.be != nil && shouldApplyV3 { - mustSaveMemberToBackend(c.lg, c.be, m) + c.be.MustSaveMemberToBackend(m) } c.members[m.ID] = m @@ -408,7 +405,7 @@ func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 ShouldApplyV3) { mustDeleteMemberFromStore(c.lg, c.v2store, id) } if c.be != nil && shouldApplyV3 { - mustDeleteMemberFromBackend(c.be, id) + c.be.MustDeleteMemberFromBackend(id) } m, ok := c.members[id] @@ -443,7 +440,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApply mustUpdateMemberAttrInStore(c.lg, c.v2store, m) } if c.be != nil && shouldApplyV3 { - mustSaveMemberToBackend(c.lg, c.be, m) + c.be.MustSaveMemberToBackend(m) } return } @@ -476,7 +473,7 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) { mustUpdateMemberInStore(c.lg, c.v2store, c.members[id]) } if c.be != nil && shouldApplyV3 { - mustSaveMemberToBackend(c.lg, c.be, c.members[id]) + c.be.MustSaveMemberToBackend(c.members[id]) } c.lg.Info( @@ -495,7 +492,7 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, mustUpdateMemberInStore(c.lg, c.v2store, c.members[id]) } if c.be != nil && shouldApplyV3 { - mustSaveMemberToBackend(c.lg, c.be, c.members[id]) + c.be.MustSaveMemberToBackend(c.members[id]) } c.lg.Info( @@ -542,7 +539,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s mustSaveClusterVersionToStore(c.lg, c.v2store, ver) } if c.be != nil && shouldApplyV3 { - mustSaveClusterVersionToBackend(c.be, ver) + c.be.MustSaveClusterVersionToBackend(ver) } if oldVer != nil { ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(oldVer.String())}).Set(0) @@ -676,78 +673,6 @@ func membersFromStore(lg *zap.Logger, st v2store.Store) (map[types.ID]*Member, m return members, removed } -func membersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool) { - return mustReadMembersFromBackend(lg, be) -} - -func clusterVersionFromStore(lg *zap.Logger, st v2store.Store) *semver.Version { - e, err := st.Get(path.Join(storePrefix, "version"), false, false) - if err != nil { - if isKeyNotFound(err) { - return nil - } - lg.Panic( - "failed to get cluster version from store", - zap.String("path", path.Join(storePrefix, "version")), - zap.Error(err), - ) - } - return semver.Must(semver.NewVersion(*e.Node.Value)) -} - -// The field is populated since etcd v3.5. -func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Version { - ckey := buckets.ClusterClusterVersionKeyName - tx := be.ReadTx() - tx.RLock() - defer tx.RUnlock() - keys, vals := tx.UnsafeRange(buckets.Cluster, ckey, nil, 0) - if len(keys) == 0 { - return nil - } - if len(keys) != 1 { - lg.Panic( - "unexpected number of keys when getting cluster version from backend", - zap.Int("number-of-key", len(keys)), - ) - } - return semver.Must(semver.NewVersion(string(vals[0]))) -} - -// The field is populated since etcd v3.5. -func downgradeInfoFromBackend(lg *zap.Logger, be backend.Backend) *DowngradeInfo { - dkey := buckets.ClusterDowngradeKeyName - tx := be.ReadTx() - tx.Lock() - defer tx.Unlock() - keys, vals := tx.UnsafeRange(buckets.Cluster, dkey, nil, 0) - if len(keys) == 0 { - return nil - } - - if len(keys) != 1 { - lg.Panic( - "unexpected number of keys when getting cluster version from backend", - zap.Int("number-of-key", len(keys)), - ) - } - var d DowngradeInfo - if err := json.Unmarshal(vals[0], &d); err != nil { - lg.Panic("failed to unmarshal downgrade information", zap.Error(err)) - } - - // verify the downgrade info from backend - if d.Enabled { - if _, err := semver.NewVersion(d.TargetVersion); err != nil { - lg.Panic( - "unexpected version format of the downgrade target version from backend", - zap.String("target-version", d.TargetVersion), - ) - } - } - return &d -} - // ValidateClusterAndAssignIDs validates the local cluster by matching the PeerURLs // with the existing cluster. If the validation succeeds, it assigns the IDs // from the existing cluster to the local cluster. @@ -828,7 +753,7 @@ func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo, shouldApplyV3 ShouldApp defer c.Unlock() if c.be != nil && shouldApplyV3 { - mustSaveDowngradeToBackend(c.lg, c.be, d) + c.be.MustSaveDowngradeToBackend(d) } c.downgradeInfo = d @@ -868,9 +793,9 @@ func (c *RaftCluster) VotingMemberIDs() []types.ID { // members, such that they fully reflect internal RaftCluster's storage. func (c *RaftCluster) PushMembershipToStorage() { if c.be != nil { - TrimMembershipFromBackend(c.lg, c.be) + c.be.TrimMembershipFromBackend() for _, m := range c.members { - mustSaveMemberToBackend(c.lg, c.be, m) + c.be.MustSaveMemberToBackend(m) } } if c.v2store != nil { diff --git a/server/etcdserver/api/membership/store_test.go b/server/etcdserver/api/membership/membership_test.go similarity index 51% rename from server/etcdserver/api/membership/store_test.go rename to server/etcdserver/api/membership/membership_test.go index d39a2b103..69f76e383 100644 --- a/server/etcdserver/api/membership/store_test.go +++ b/server/etcdserver/api/membership/membership_test.go @@ -6,15 +6,13 @@ import ( "github.com/coreos/go-semver/semver" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/client/pkg/v3/types" - betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" - "go.etcd.io/etcd/server/v3/mvcc/backend" "go.uber.org/zap" ) func TestAddRemoveMember(t *testing.T) { c := newTestCluster(t, nil) - be, bepath := betesting.NewDefaultTmpBackend(t) + be := &backendMock{} c.SetBackend(be) c.AddMember(newTestMember(17, nil, "node17", nil), true) c.RemoveMember(17, true) @@ -22,18 +20,11 @@ func TestAddRemoveMember(t *testing.T) { // Skipping removal of already removed member c.RemoveMember(17, true) - err := be.Close() - assert.NoError(t, err) - - be2 := backend.NewDefaultBackend(bepath) - defer func() { - assert.NoError(t, be2.Close()) - }() if false { // TODO: Enable this code when Recover is reading membership from the backend. c2 := newTestCluster(t, nil) - c2.SetBackend(be2) + c2.SetBackend(be) c2.Recover(func(*zap.Logger, *semver.Version) {}) assert.Equal(t, []*Member{{ID: types.ID(18), Attributes: Attributes{Name: "node18"}}}, c2.Members()) @@ -41,3 +32,23 @@ func TestAddRemoveMember(t *testing.T) { assert.Equal(t, false, c2.IsIDRemoved(18)) } } + +type backendMock struct { +} + +var _ MembershipBackend = (*backendMock)(nil) + +func (b *backendMock) MustCreateBackendBuckets() {} + +func (b *backendMock) ClusterVersionFromBackend() *semver.Version { return nil } +func (b *backendMock) MustSaveClusterVersionToBackend(version *semver.Version) {} + +func (b *backendMock) MustReadMembersFromBackend() (x map[types.ID]*Member, y map[types.ID]bool) { + return +} +func (b *backendMock) MustSaveMemberToBackend(*Member) {} +func (b *backendMock) TrimMembershipFromBackend() error { return nil } +func (b *backendMock) MustDeleteMemberFromBackend(types.ID) {} + +func (b *backendMock) MustSaveDowngradeToBackend(*DowngradeInfo) {} +func (b *backendMock) DowngradeInfoFromBackend() *DowngradeInfo { return nil } diff --git a/server/etcdserver/api/membership/store.go b/server/etcdserver/api/membership/store.go index 956730251..0e4c296f2 100644 --- a/server/etcdserver/api/membership/store.go +++ b/server/etcdserver/api/membership/store.go @@ -1,4 +1,4 @@ -// Copyright 2016 The etcd Authors +// Copyright 2021 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,299 +15,36 @@ package membership import ( - "encoding/json" - "fmt" "path" "go.etcd.io/etcd/client/pkg/v3/types" - "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" - "go.etcd.io/etcd/server/v3/mvcc/backend" - "go.etcd.io/etcd/server/v3/mvcc/buckets" "github.com/coreos/go-semver/semver" "go.uber.org/zap" ) -const ( - attributesSuffix = "attributes" - raftAttributesSuffix = "raftAttributes" - - // the prefix for storing membership related information in store provided by store pkg. - storePrefix = "/0" -) - -var ( - StoreMembersPrefix = path.Join(storePrefix, "members") - storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members") -) - -func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) { - mkey := buckets.BackendMemberKey(m.ID) - mvalue, err := json.Marshal(m) - if err != nil { - lg.Panic("failed to marshal member", zap.Error(err)) - } - - tx := be.BatchTx() - tx.Lock() - defer tx.Unlock() - tx.UnsafePut(buckets.Members, mkey, mvalue) +type MembershipBackend interface { + ClusterVersionBackend + MemberBackend + DowngradeInfoBackend + MustCreateBackendBuckets() } -// TrimClusterFromBackend removes all information about cluster (versions) -// from the v3 backend. -func TrimClusterFromBackend(be backend.Backend) error { - tx := be.BatchTx() - tx.Lock() - defer tx.Unlock() - tx.UnsafeDeleteBucket(buckets.Cluster) - return nil +type ClusterVersionBackend interface { + ClusterVersionFromBackend() *semver.Version + MustSaveClusterVersionToBackend(version *semver.Version) } -func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) { - mkey := buckets.BackendMemberKey(id) - - tx := be.BatchTx() - tx.Lock() - defer tx.Unlock() - tx.UnsafeDelete(buckets.Members, mkey) - tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed")) +type MemberBackend interface { + MustReadMembersFromBackend() (map[types.ID]*Member, map[types.ID]bool) + MustSaveMemberToBackend(*Member) + TrimMembershipFromBackend() error + MustDeleteMemberFromBackend(types.ID) } -func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool, error) { - members := make(map[types.ID]*Member) - removed := make(map[types.ID]bool) - - tx := be.ReadTx() - tx.RLock() - defer tx.RUnlock() - err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error { - memberId := mustParseMemberIDFromBytes(lg, k) - m := &Member{ID: memberId} - if err := json.Unmarshal(v, &m); err != nil { - return err - } - members[memberId] = m - return nil - }) - if err != nil { - return nil, nil, fmt.Errorf("couldn't read members from backend: %w", err) - } - - err = tx.UnsafeForEach(buckets.MembersRemoved, func(k, v []byte) error { - memberId := mustParseMemberIDFromBytes(lg, k) - removed[memberId] = true - return nil - }) - if err != nil { - return nil, nil, fmt.Errorf("couldn't read members_removed from backend: %w", err) - } - return members, removed, nil -} - -func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool) { - members, removed, err := readMembersFromBackend(lg, be) - if err != nil { - lg.Panic("couldn't read members from backend", zap.Error(err)) - } - return members, removed -} - -// TrimMembershipFromBackend removes all information about members & -// removed_members from the v3 backend. -func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error { - lg.Info("Trimming membership information from the backend...") - tx := be.BatchTx() - tx.Lock() - defer tx.Unlock() - err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error { - tx.UnsafeDelete(buckets.Members, k) - lg.Debug("Removed member from the backend", - zap.Stringer("member", mustParseMemberIDFromBytes(lg, k))) - return nil - }) - if err != nil { - return err - } - return tx.UnsafeForEach(buckets.MembersRemoved, func(k, v []byte) error { - tx.UnsafeDelete(buckets.MembersRemoved, k) - lg.Debug("Removed removed_member from the backend", - zap.Stringer("member", mustParseMemberIDFromBytes(lg, k))) - return nil - }) -} - -// TrimMembershipFromV2Store removes all information about members & -// removed_members from the v2 store. -func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error { - members, removed := membersFromStore(lg, s) - - for mID := range members { - _, err := s.Delete(MemberStoreKey(mID), true, true) - if err != nil { - return err - } - } - for mID := range removed { - _, err := s.Delete(RemovedMemberStoreKey(mID), true, true) - if err != nil { - return err - } - } - - return nil -} - -// The field is populated since etcd v3.5. -func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) { - ckey := buckets.ClusterClusterVersionKeyName - - tx := be.BatchTx() - tx.Lock() - defer tx.Unlock() - tx.UnsafePut(buckets.Cluster, ckey, []byte(ver.String())) -} - -// The field is populated since etcd v3.5. -func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *DowngradeInfo) { - dkey := buckets.ClusterDowngradeKeyName - dvalue, err := json.Marshal(downgrade) - if err != nil { - lg.Panic("failed to marshal downgrade information", zap.Error(err)) - } - tx := be.BatchTx() - tx.Lock() - defer tx.Unlock() - tx.UnsafePut(buckets.Cluster, dkey, dvalue) -} - -func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) { - b, err := json.Marshal(m.RaftAttributes) - if err != nil { - lg.Panic("failed to marshal raftAttributes", zap.Error(err)) - } - p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix) - if _, err := s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { - lg.Panic( - "failed to save member to store", - zap.String("path", p), - zap.Error(err), - ) - } -} - -func mustDeleteMemberFromStore(lg *zap.Logger, s v2store.Store, id types.ID) { - if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil { - lg.Panic( - "failed to delete member from store", - zap.String("path", MemberStoreKey(id)), - zap.Error(err), - ) - } - if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { - lg.Panic( - "failed to create removedMember", - zap.String("path", RemovedMemberStoreKey(id)), - zap.Error(err), - ) - } -} - -func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) { - b, err := json.Marshal(m.RaftAttributes) - if err != nil { - lg.Panic("failed to marshal raftAttributes", zap.Error(err)) - } - p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix) - if _, err := s.Update(p, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { - lg.Panic( - "failed to update raftAttributes", - zap.String("path", p), - zap.Error(err), - ) - } -} - -func mustUpdateMemberAttrInStore(lg *zap.Logger, s v2store.Store, m *Member) { - b, err := json.Marshal(m.Attributes) - if err != nil { - lg.Panic("failed to marshal attributes", zap.Error(err)) - } - p := path.Join(MemberStoreKey(m.ID), attributesSuffix) - if _, err := s.Set(p, false, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { - lg.Panic( - "failed to update attributes", - zap.String("path", p), - zap.Error(err), - ) - } -} - -func mustSaveClusterVersionToStore(lg *zap.Logger, s v2store.Store, ver *semver.Version) { - if _, err := s.Set(StoreClusterVersionKey(), false, ver.String(), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { - lg.Panic( - "failed to save cluster version to store", - zap.String("path", StoreClusterVersionKey()), - zap.Error(err), - ) - } -} - -// nodeToMember builds member from a key value node. -// the child nodes of the given node MUST be sorted by key. -func nodeToMember(lg *zap.Logger, n *v2store.NodeExtern) (*Member, error) { - m := &Member{ID: MustParseMemberIDFromKey(lg, n.Key)} - attrs := make(map[string][]byte) - raftAttrKey := path.Join(n.Key, raftAttributesSuffix) - attrKey := path.Join(n.Key, attributesSuffix) - for _, nn := range n.Nodes { - if nn.Key != raftAttrKey && nn.Key != attrKey { - return nil, fmt.Errorf("unknown key %q", nn.Key) - } - attrs[nn.Key] = []byte(*nn.Value) - } - if data := attrs[raftAttrKey]; data != nil { - if err := json.Unmarshal(data, &m.RaftAttributes); err != nil { - return nil, fmt.Errorf("unmarshal raftAttributes error: %v", err) - } - } else { - return nil, fmt.Errorf("raftAttributes key doesn't exist") - } - if data := attrs[attrKey]; data != nil { - if err := json.Unmarshal(data, &m.Attributes); err != nil { - return m, fmt.Errorf("unmarshal attributes error: %v", err) - } - } - return m, nil -} - -func mustCreateBackendBuckets(be backend.Backend) { - tx := be.BatchTx() - tx.Lock() - defer tx.Unlock() - tx.UnsafeCreateBucket(buckets.Members) - tx.UnsafeCreateBucket(buckets.MembersRemoved) - tx.UnsafeCreateBucket(buckets.Cluster) -} - -func MemberStoreKey(id types.ID) string { - return path.Join(StoreMembersPrefix, id.String()) -} - -func StoreClusterVersionKey() string { - return path.Join(storePrefix, "version") -} - -func MemberAttributesStorePath(id types.ID) string { - return path.Join(MemberStoreKey(id), attributesSuffix) -} - -func mustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID { - id, err := types.IDFromString(string(key)) - if err != nil { - lg.Panic("failed to parse member id from key", zap.Error(err)) - } - return id +type DowngradeInfoBackend interface { + MustSaveDowngradeToBackend(*DowngradeInfo) + DowngradeInfoFromBackend() *DowngradeInfo } func MustParseMemberIDFromKey(lg *zap.Logger, key string) types.ID { @@ -317,7 +54,3 @@ func MustParseMemberIDFromKey(lg *zap.Logger, key string) types.ID { } return id } - -func RemovedMemberStoreKey(id types.ID) string { - return path.Join(storeRemovedMembersPrefix, id.String()) -} diff --git a/server/etcdserver/api/membership/storev2.go b/server/etcdserver/api/membership/storev2.go index 8505c63f3..9ed94dee9 100644 --- a/server/etcdserver/api/membership/storev2.go +++ b/server/etcdserver/api/membership/storev2.go @@ -15,7 +15,28 @@ package membership import ( + "encoding/json" + "fmt" + "go.etcd.io/etcd/client/pkg/v3/types" + "path" + "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" + + "github.com/coreos/go-semver/semver" + "go.uber.org/zap" +) + +const ( + // the prefix for storing membership related information in store provided by store pkg. + storePrefix = "/0" + + attributesSuffix = "attributes" + raftAttributesSuffix = "raftAttributes" +) + +var ( + StoreMembersPrefix = path.Join(storePrefix, "members") + storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members") ) // IsMetaStoreOnly verifies if the given `store` contains only @@ -34,3 +55,155 @@ func IsMetaStoreOnly(store v2store.Store) (bool, error) { return true, nil } + +// TrimMembershipFromV2Store removes all information about members & +// removed_members from the v2 store. +func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error { + members, removed := membersFromStore(lg, s) + + for mID := range members { + _, err := s.Delete(MemberStoreKey(mID), true, true) + if err != nil { + return err + } + } + for mID := range removed { + _, err := s.Delete(RemovedMemberStoreKey(mID), true, true) + if err != nil { + return err + } + } + + return nil +} + +func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) { + b, err := json.Marshal(m.RaftAttributes) + if err != nil { + lg.Panic("failed to marshal raftAttributes", zap.Error(err)) + } + p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix) + if _, err := s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { + lg.Panic( + "failed to save member to store", + zap.String("path", p), + zap.Error(err), + ) + } +} + +func mustDeleteMemberFromStore(lg *zap.Logger, s v2store.Store, id types.ID) { + if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil { + lg.Panic( + "failed to delete member from store", + zap.String("path", MemberStoreKey(id)), + zap.Error(err), + ) + } + if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { + lg.Panic( + "failed to create removedMember", + zap.String("path", RemovedMemberStoreKey(id)), + zap.Error(err), + ) + } +} + +func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) { + b, err := json.Marshal(m.RaftAttributes) + if err != nil { + lg.Panic("failed to marshal raftAttributes", zap.Error(err)) + } + p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix) + if _, err := s.Update(p, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { + lg.Panic( + "failed to update raftAttributes", + zap.String("path", p), + zap.Error(err), + ) + } +} + +func mustUpdateMemberAttrInStore(lg *zap.Logger, s v2store.Store, m *Member) { + b, err := json.Marshal(m.Attributes) + if err != nil { + lg.Panic("failed to marshal attributes", zap.Error(err)) + } + p := path.Join(MemberStoreKey(m.ID), attributesSuffix) + if _, err := s.Set(p, false, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { + lg.Panic( + "failed to update attributes", + zap.String("path", p), + zap.Error(err), + ) + } +} + +func mustSaveClusterVersionToStore(lg *zap.Logger, s v2store.Store, ver *semver.Version) { + if _, err := s.Set(StoreClusterVersionKey(), false, ver.String(), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { + lg.Panic( + "failed to save cluster version to store", + zap.String("path", StoreClusterVersionKey()), + zap.Error(err), + ) + } +} + +// nodeToMember builds member from a key value node. +// the child nodes of the given node MUST be sorted by key. +func nodeToMember(lg *zap.Logger, n *v2store.NodeExtern) (*Member, error) { + m := &Member{ID: MustParseMemberIDFromKey(lg, n.Key)} + attrs := make(map[string][]byte) + raftAttrKey := path.Join(n.Key, raftAttributesSuffix) + attrKey := path.Join(n.Key, attributesSuffix) + for _, nn := range n.Nodes { + if nn.Key != raftAttrKey && nn.Key != attrKey { + return nil, fmt.Errorf("unknown key %q", nn.Key) + } + attrs[nn.Key] = []byte(*nn.Value) + } + if data := attrs[raftAttrKey]; data != nil { + if err := json.Unmarshal(data, &m.RaftAttributes); err != nil { + return nil, fmt.Errorf("unmarshal raftAttributes error: %v", err) + } + } else { + return nil, fmt.Errorf("raftAttributes key doesn't exist") + } + if data := attrs[attrKey]; data != nil { + if err := json.Unmarshal(data, &m.Attributes); err != nil { + return m, fmt.Errorf("unmarshal attributes error: %v", err) + } + } + return m, nil +} + +func StoreClusterVersionKey() string { + return path.Join(storePrefix, "version") +} + +func RemovedMemberStoreKey(id types.ID) string { + return path.Join(storeRemovedMembersPrefix, id.String()) +} + +func MemberStoreKey(id types.ID) string { + return path.Join(StoreMembersPrefix, id.String()) +} + +func MemberAttributesStorePath(id types.ID) string { + return path.Join(MemberStoreKey(id), attributesSuffix) +} + +func clusterVersionFromStore(lg *zap.Logger, st v2store.Store) *semver.Version { + e, err := st.Get(path.Join(storePrefix, "version"), false, false) + if err != nil { + if isKeyNotFound(err) { + return nil + } + lg.Panic( + "failed to get cluster version from store", + zap.String("path", path.Join(storePrefix, "version")), + zap.Error(err), + ) + } + return semver.Must(semver.NewVersion(*e.Node.Value)) +} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 03f01586a..9c907e143 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -67,6 +67,7 @@ import ( "go.etcd.io/etcd/server/v3/lease/leasehttp" "go.etcd.io/etcd/server/v3/mvcc" "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.etcd.io/etcd/server/v3/wal" ) @@ -316,7 +317,7 @@ func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) { bh.confStateLock.Lock() defer bh.confStateLock.Unlock() if bh.confStateDirty { - membership.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState) + buckets.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState) // save bh.confState bh.confStateDirty = false } @@ -432,7 +433,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { remotes = existingCluster.Members() cl.SetID(types.ID(0), existingCluster.ID()) cl.SetStore(st) - cl.SetBackend(be) + cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) id, n, s, w = startNode(cfg, cl, nil) cl.SetID(id, existingCluster.ID()) @@ -467,7 +468,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } } cl.SetStore(st) - cl.SetBackend(be) + cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) id, n, s, w = startNode(cfg, cl, cl.MemberIDs()) cl.SetID(id, cl.ID()) @@ -537,7 +538,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { } cl.SetStore(st) - cl.SetBackend(be) + cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be)) cl.Recover(api.UpdateCapability) if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist { os.RemoveAll(bepath) @@ -1313,7 +1314,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { lg.Info("restored v2 store") - s.cluster.SetBackend(newbe) + s.cluster.SetBackend(buckets.NewMembershipStore(lg, newbe)) lg.Info("restoring cluster configuration") diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index c5a61da47..2da93b08b 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -53,6 +53,7 @@ import ( "go.etcd.io/etcd/server/v3/mock/mockwait" "go.etcd.io/etcd/server/v3/mvcc" betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" "go.uber.org/zap/zaptest" ) @@ -695,7 +696,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { tx.Lock() defer tx.Unlock() srv.beHooks.OnPreCommitUnsafe(tx) - assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *membership.UnsafeConfStateFromBackend(lg, tx)) + assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *buckets.UnsafeConfStateFromBackend(lg, tx)) }) rindex, rterm := cindex.ReadConsistentIndex(be.BatchTx()) assert.Equal(t, consistIndex, rindex) diff --git a/server/etcdserver/api/membership/confstate.go b/server/mvcc/buckets/confstate.go similarity index 84% rename from server/etcdserver/api/membership/confstate.go rename to server/mvcc/buckets/confstate.go index cf43cfacd..58f7d14fa 100644 --- a/server/etcdserver/api/membership/confstate.go +++ b/server/mvcc/buckets/confstate.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package membership +package buckets import ( "encoding/json" @@ -20,7 +20,6 @@ import ( "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/mvcc/backend" - "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" ) @@ -32,20 +31,20 @@ func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confSt lg.Panic("Cannot marshal raftpb.ConfState", zap.Stringer("conf-state", confState), zap.Error(err)) } - tx.UnsafePut(buckets.Meta, buckets.MetaConfStateName, confStateBytes) + tx.UnsafePut(Meta, MetaConfStateName, confStateBytes) } // UnsafeConfStateFromBackend retrieves ConfState from the backend. // Returns nil if confState in backend is not persisted (e.g. backend writen by