mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #13161 from serathius/membership
etcdserver: Membership uses MembershipStorage interface instead of directly accessing Backend
This commit is contained in:
commit
1208505290
@ -33,6 +33,7 @@ import (
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
||||
"go.etcd.io/etcd/server/v3/verify"
|
||||
"go.etcd.io/etcd/server/v3/wal"
|
||||
"go.etcd.io/etcd/server/v3/wal/walpb"
|
||||
@ -311,14 +312,14 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir
|
||||
|
||||
be := backend.NewDefaultBackend(destDB)
|
||||
defer be.Close()
|
||||
|
||||
if err := membership.TrimClusterFromBackend(be); err != nil {
|
||||
ms := buckets.NewMembershipStore(lg, be)
|
||||
if err := ms.TrimClusterFromBackend(); err != nil {
|
||||
lg.Fatal("bbolt tx.Membership failed", zap.Error(err))
|
||||
}
|
||||
|
||||
raftCluster := membership.NewClusterFromMembers(lg, desired.clusterId, desired.members)
|
||||
raftCluster.SetID(desired.nodeId, desired.clusterId)
|
||||
raftCluster.SetBackend(be)
|
||||
raftCluster.SetBackend(ms)
|
||||
raftCluster.PushMembershipToStorage()
|
||||
|
||||
if !v3 {
|
||||
|
@ -42,6 +42,7 @@ import (
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/version"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
||||
"go.etcd.io/etcd/server/v3/verify"
|
||||
"go.etcd.io/etcd/server/v3/wal"
|
||||
"go.etcd.io/etcd/server/v3/wal/walpb"
|
||||
@ -306,7 +307,7 @@ func (s *v3Manager) saveDB() error {
|
||||
be := backend.NewDefaultBackend(s.outDbPath())
|
||||
defer be.Close()
|
||||
|
||||
err = membership.TrimMembershipFromBackend(s.lg, be)
|
||||
err = buckets.NewMembershipStore(s.lg, be).TrimMembershipFromBackend()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -403,7 +404,7 @@ func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
|
||||
s.cl.SetStore(st)
|
||||
be := backend.NewDefaultBackend(s.outDbPath())
|
||||
defer be.Close()
|
||||
s.cl.SetBackend(be)
|
||||
s.cl.SetBackend(buckets.NewMembershipStore(s.lg, be))
|
||||
for _, m := range s.cl.Members() {
|
||||
s.cl.AddMember(m, true)
|
||||
}
|
||||
|
@ -21,7 +21,6 @@ import (
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
@ -33,8 +32,6 @@ import (
|
||||
"go.etcd.io/etcd/raft/v3"
|
||||
"go.etcd.io/etcd/raft/v3/raftpb"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
@ -51,7 +48,7 @@ type RaftCluster struct {
|
||||
cid types.ID
|
||||
|
||||
v2store v2store.Store
|
||||
be backend.Backend
|
||||
be MembershipBackend
|
||||
|
||||
sync.Mutex // guards the fields below
|
||||
version *semver.Version
|
||||
@ -245,9 +242,9 @@ func (c *RaftCluster) SetID(localID, cid types.ID) {
|
||||
|
||||
func (c *RaftCluster) SetStore(st v2store.Store) { c.v2store = st }
|
||||
|
||||
func (c *RaftCluster) SetBackend(be backend.Backend) {
|
||||
func (c *RaftCluster) SetBackend(be MembershipBackend) {
|
||||
c.be = be
|
||||
mustCreateBackendBuckets(c.be)
|
||||
c.be.MustCreateBackendBuckets()
|
||||
}
|
||||
|
||||
func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
|
||||
@ -255,15 +252,15 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
|
||||
defer c.Unlock()
|
||||
|
||||
if c.be != nil {
|
||||
c.version = clusterVersionFromBackend(c.lg, c.be)
|
||||
c.members, c.removed = membersFromBackend(c.lg, c.be)
|
||||
c.version = c.be.ClusterVersionFromBackend()
|
||||
c.members, c.removed = c.be.MustReadMembersFromBackend()
|
||||
} else {
|
||||
c.version = clusterVersionFromStore(c.lg, c.v2store)
|
||||
c.members, c.removed = membersFromStore(c.lg, c.v2store)
|
||||
}
|
||||
|
||||
if c.be != nil {
|
||||
c.downgradeInfo = downgradeInfoFromBackend(c.lg, c.be)
|
||||
c.downgradeInfo = c.be.DowngradeInfoFromBackend()
|
||||
}
|
||||
d := &DowngradeInfo{Enabled: false}
|
||||
if c.downgradeInfo != nil {
|
||||
@ -385,7 +382,7 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
|
||||
mustSaveMemberToStore(c.lg, c.v2store, m)
|
||||
}
|
||||
if c.be != nil && shouldApplyV3 {
|
||||
mustSaveMemberToBackend(c.lg, c.be, m)
|
||||
c.be.MustSaveMemberToBackend(m)
|
||||
}
|
||||
|
||||
c.members[m.ID] = m
|
||||
@ -408,7 +405,7 @@ func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
|
||||
mustDeleteMemberFromStore(c.lg, c.v2store, id)
|
||||
}
|
||||
if c.be != nil && shouldApplyV3 {
|
||||
mustDeleteMemberFromBackend(c.be, id)
|
||||
c.be.MustDeleteMemberFromBackend(id)
|
||||
}
|
||||
|
||||
m, ok := c.members[id]
|
||||
@ -443,7 +440,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApply
|
||||
mustUpdateMemberAttrInStore(c.lg, c.v2store, m)
|
||||
}
|
||||
if c.be != nil && shouldApplyV3 {
|
||||
mustSaveMemberToBackend(c.lg, c.be, m)
|
||||
c.be.MustSaveMemberToBackend(m)
|
||||
}
|
||||
return
|
||||
}
|
||||
@ -476,7 +473,7 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
|
||||
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
|
||||
}
|
||||
if c.be != nil && shouldApplyV3 {
|
||||
mustSaveMemberToBackend(c.lg, c.be, c.members[id])
|
||||
c.be.MustSaveMemberToBackend(c.members[id])
|
||||
}
|
||||
|
||||
c.lg.Info(
|
||||
@ -495,7 +492,7 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes,
|
||||
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
|
||||
}
|
||||
if c.be != nil && shouldApplyV3 {
|
||||
mustSaveMemberToBackend(c.lg, c.be, c.members[id])
|
||||
c.be.MustSaveMemberToBackend(c.members[id])
|
||||
}
|
||||
|
||||
c.lg.Info(
|
||||
@ -542,7 +539,7 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s
|
||||
mustSaveClusterVersionToStore(c.lg, c.v2store, ver)
|
||||
}
|
||||
if c.be != nil && shouldApplyV3 {
|
||||
mustSaveClusterVersionToBackend(c.be, ver)
|
||||
c.be.MustSaveClusterVersionToBackend(ver)
|
||||
}
|
||||
if oldVer != nil {
|
||||
ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(oldVer.String())}).Set(0)
|
||||
@ -676,78 +673,6 @@ func membersFromStore(lg *zap.Logger, st v2store.Store) (map[types.ID]*Member, m
|
||||
return members, removed
|
||||
}
|
||||
|
||||
func membersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool) {
|
||||
return mustReadMembersFromBackend(lg, be)
|
||||
}
|
||||
|
||||
func clusterVersionFromStore(lg *zap.Logger, st v2store.Store) *semver.Version {
|
||||
e, err := st.Get(path.Join(storePrefix, "version"), false, false)
|
||||
if err != nil {
|
||||
if isKeyNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
lg.Panic(
|
||||
"failed to get cluster version from store",
|
||||
zap.String("path", path.Join(storePrefix, "version")),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
return semver.Must(semver.NewVersion(*e.Node.Value))
|
||||
}
|
||||
|
||||
// The field is populated since etcd v3.5.
|
||||
func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Version {
|
||||
ckey := buckets.ClusterClusterVersionKeyName
|
||||
tx := be.ReadTx()
|
||||
tx.RLock()
|
||||
defer tx.RUnlock()
|
||||
keys, vals := tx.UnsafeRange(buckets.Cluster, ckey, nil, 0)
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
if len(keys) != 1 {
|
||||
lg.Panic(
|
||||
"unexpected number of keys when getting cluster version from backend",
|
||||
zap.Int("number-of-key", len(keys)),
|
||||
)
|
||||
}
|
||||
return semver.Must(semver.NewVersion(string(vals[0])))
|
||||
}
|
||||
|
||||
// The field is populated since etcd v3.5.
|
||||
func downgradeInfoFromBackend(lg *zap.Logger, be backend.Backend) *DowngradeInfo {
|
||||
dkey := buckets.ClusterDowngradeKeyName
|
||||
tx := be.ReadTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
keys, vals := tx.UnsafeRange(buckets.Cluster, dkey, nil, 0)
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(keys) != 1 {
|
||||
lg.Panic(
|
||||
"unexpected number of keys when getting cluster version from backend",
|
||||
zap.Int("number-of-key", len(keys)),
|
||||
)
|
||||
}
|
||||
var d DowngradeInfo
|
||||
if err := json.Unmarshal(vals[0], &d); err != nil {
|
||||
lg.Panic("failed to unmarshal downgrade information", zap.Error(err))
|
||||
}
|
||||
|
||||
// verify the downgrade info from backend
|
||||
if d.Enabled {
|
||||
if _, err := semver.NewVersion(d.TargetVersion); err != nil {
|
||||
lg.Panic(
|
||||
"unexpected version format of the downgrade target version from backend",
|
||||
zap.String("target-version", d.TargetVersion),
|
||||
)
|
||||
}
|
||||
}
|
||||
return &d
|
||||
}
|
||||
|
||||
// ValidateClusterAndAssignIDs validates the local cluster by matching the PeerURLs
|
||||
// with the existing cluster. If the validation succeeds, it assigns the IDs
|
||||
// from the existing cluster to the local cluster.
|
||||
@ -828,7 +753,7 @@ func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo, shouldApplyV3 ShouldApp
|
||||
defer c.Unlock()
|
||||
|
||||
if c.be != nil && shouldApplyV3 {
|
||||
mustSaveDowngradeToBackend(c.lg, c.be, d)
|
||||
c.be.MustSaveDowngradeToBackend(d)
|
||||
}
|
||||
|
||||
c.downgradeInfo = d
|
||||
@ -868,9 +793,9 @@ func (c *RaftCluster) VotingMemberIDs() []types.ID {
|
||||
// members, such that they fully reflect internal RaftCluster's storage.
|
||||
func (c *RaftCluster) PushMembershipToStorage() {
|
||||
if c.be != nil {
|
||||
TrimMembershipFromBackend(c.lg, c.be)
|
||||
c.be.TrimMembershipFromBackend()
|
||||
for _, m := range c.members {
|
||||
mustSaveMemberToBackend(c.lg, c.be, m)
|
||||
c.be.MustSaveMemberToBackend(m)
|
||||
}
|
||||
}
|
||||
if c.v2store != nil {
|
||||
|
@ -6,15 +6,13 @@ import (
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.etcd.io/etcd/client/pkg/v3/types"
|
||||
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
|
||||
|
||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func TestAddRemoveMember(t *testing.T) {
|
||||
c := newTestCluster(t, nil)
|
||||
be, bepath := betesting.NewDefaultTmpBackend(t)
|
||||
be := &backendMock{}
|
||||
c.SetBackend(be)
|
||||
c.AddMember(newTestMember(17, nil, "node17", nil), true)
|
||||
c.RemoveMember(17, true)
|
||||
@ -22,18 +20,11 @@ func TestAddRemoveMember(t *testing.T) {
|
||||
|
||||
// Skipping removal of already removed member
|
||||
c.RemoveMember(17, true)
|
||||
err := be.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
be2 := backend.NewDefaultBackend(bepath)
|
||||
defer func() {
|
||||
assert.NoError(t, be2.Close())
|
||||
}()
|
||||
|
||||
if false {
|
||||
// TODO: Enable this code when Recover is reading membership from the backend.
|
||||
c2 := newTestCluster(t, nil)
|
||||
c2.SetBackend(be2)
|
||||
c2.SetBackend(be)
|
||||
c2.Recover(func(*zap.Logger, *semver.Version) {})
|
||||
assert.Equal(t, []*Member{{ID: types.ID(18),
|
||||
Attributes: Attributes{Name: "node18"}}}, c2.Members())
|
||||
@ -41,3 +32,23 @@ func TestAddRemoveMember(t *testing.T) {
|
||||
assert.Equal(t, false, c2.IsIDRemoved(18))
|
||||
}
|
||||
}
|
||||
|
||||
type backendMock struct {
|
||||
}
|
||||
|
||||
var _ MembershipBackend = (*backendMock)(nil)
|
||||
|
||||
func (b *backendMock) MustCreateBackendBuckets() {}
|
||||
|
||||
func (b *backendMock) ClusterVersionFromBackend() *semver.Version { return nil }
|
||||
func (b *backendMock) MustSaveClusterVersionToBackend(version *semver.Version) {}
|
||||
|
||||
func (b *backendMock) MustReadMembersFromBackend() (x map[types.ID]*Member, y map[types.ID]bool) {
|
||||
return
|
||||
}
|
||||
func (b *backendMock) MustSaveMemberToBackend(*Member) {}
|
||||
func (b *backendMock) TrimMembershipFromBackend() error { return nil }
|
||||
func (b *backendMock) MustDeleteMemberFromBackend(types.ID) {}
|
||||
|
||||
func (b *backendMock) MustSaveDowngradeToBackend(*DowngradeInfo) {}
|
||||
func (b *backendMock) DowngradeInfoFromBackend() *DowngradeInfo { return nil }
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
// Copyright 2021 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
@ -15,299 +15,36 @@
|
||||
package membership
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/types"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
attributesSuffix = "attributes"
|
||||
raftAttributesSuffix = "raftAttributes"
|
||||
|
||||
// the prefix for storing membership related information in store provided by store pkg.
|
||||
storePrefix = "/0"
|
||||
)
|
||||
|
||||
var (
|
||||
StoreMembersPrefix = path.Join(storePrefix, "members")
|
||||
storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
|
||||
)
|
||||
|
||||
func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {
|
||||
mkey := buckets.BackendMemberKey(m.ID)
|
||||
mvalue, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
lg.Panic("failed to marshal member", zap.Error(err))
|
||||
}
|
||||
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
tx.UnsafePut(buckets.Members, mkey, mvalue)
|
||||
type MembershipBackend interface {
|
||||
ClusterVersionBackend
|
||||
MemberBackend
|
||||
DowngradeInfoBackend
|
||||
MustCreateBackendBuckets()
|
||||
}
|
||||
|
||||
// TrimClusterFromBackend removes all information about cluster (versions)
|
||||
// from the v3 backend.
|
||||
func TrimClusterFromBackend(be backend.Backend) error {
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
tx.UnsafeDeleteBucket(buckets.Cluster)
|
||||
return nil
|
||||
type ClusterVersionBackend interface {
|
||||
ClusterVersionFromBackend() *semver.Version
|
||||
MustSaveClusterVersionToBackend(version *semver.Version)
|
||||
}
|
||||
|
||||
func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
|
||||
mkey := buckets.BackendMemberKey(id)
|
||||
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
tx.UnsafeDelete(buckets.Members, mkey)
|
||||
tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed"))
|
||||
type MemberBackend interface {
|
||||
MustReadMembersFromBackend() (map[types.ID]*Member, map[types.ID]bool)
|
||||
MustSaveMemberToBackend(*Member)
|
||||
TrimMembershipFromBackend() error
|
||||
MustDeleteMemberFromBackend(types.ID)
|
||||
}
|
||||
|
||||
func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool, error) {
|
||||
members := make(map[types.ID]*Member)
|
||||
removed := make(map[types.ID]bool)
|
||||
|
||||
tx := be.ReadTx()
|
||||
tx.RLock()
|
||||
defer tx.RUnlock()
|
||||
err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
|
||||
memberId := mustParseMemberIDFromBytes(lg, k)
|
||||
m := &Member{ID: memberId}
|
||||
if err := json.Unmarshal(v, &m); err != nil {
|
||||
return err
|
||||
}
|
||||
members[memberId] = m
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("couldn't read members from backend: %w", err)
|
||||
}
|
||||
|
||||
err = tx.UnsafeForEach(buckets.MembersRemoved, func(k, v []byte) error {
|
||||
memberId := mustParseMemberIDFromBytes(lg, k)
|
||||
removed[memberId] = true
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("couldn't read members_removed from backend: %w", err)
|
||||
}
|
||||
return members, removed, nil
|
||||
}
|
||||
|
||||
func mustReadMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool) {
|
||||
members, removed, err := readMembersFromBackend(lg, be)
|
||||
if err != nil {
|
||||
lg.Panic("couldn't read members from backend", zap.Error(err))
|
||||
}
|
||||
return members, removed
|
||||
}
|
||||
|
||||
// TrimMembershipFromBackend removes all information about members &
|
||||
// removed_members from the v3 backend.
|
||||
func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
|
||||
lg.Info("Trimming membership information from the backend...")
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
err := tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
|
||||
tx.UnsafeDelete(buckets.Members, k)
|
||||
lg.Debug("Removed member from the backend",
|
||||
zap.Stringer("member", mustParseMemberIDFromBytes(lg, k)))
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return tx.UnsafeForEach(buckets.MembersRemoved, func(k, v []byte) error {
|
||||
tx.UnsafeDelete(buckets.MembersRemoved, k)
|
||||
lg.Debug("Removed removed_member from the backend",
|
||||
zap.Stringer("member", mustParseMemberIDFromBytes(lg, k)))
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// TrimMembershipFromV2Store removes all information about members &
|
||||
// removed_members from the v2 store.
|
||||
func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error {
|
||||
members, removed := membersFromStore(lg, s)
|
||||
|
||||
for mID := range members {
|
||||
_, err := s.Delete(MemberStoreKey(mID), true, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for mID := range removed {
|
||||
_, err := s.Delete(RemovedMemberStoreKey(mID), true, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// The field is populated since etcd v3.5.
|
||||
func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
|
||||
ckey := buckets.ClusterClusterVersionKeyName
|
||||
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
tx.UnsafePut(buckets.Cluster, ckey, []byte(ver.String()))
|
||||
}
|
||||
|
||||
// The field is populated since etcd v3.5.
|
||||
func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *DowngradeInfo) {
|
||||
dkey := buckets.ClusterDowngradeKeyName
|
||||
dvalue, err := json.Marshal(downgrade)
|
||||
if err != nil {
|
||||
lg.Panic("failed to marshal downgrade information", zap.Error(err))
|
||||
}
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
tx.UnsafePut(buckets.Cluster, dkey, dvalue)
|
||||
}
|
||||
|
||||
func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) {
|
||||
b, err := json.Marshal(m.RaftAttributes)
|
||||
if err != nil {
|
||||
lg.Panic("failed to marshal raftAttributes", zap.Error(err))
|
||||
}
|
||||
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
|
||||
if _, err := s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
|
||||
lg.Panic(
|
||||
"failed to save member to store",
|
||||
zap.String("path", p),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func mustDeleteMemberFromStore(lg *zap.Logger, s v2store.Store, id types.ID) {
|
||||
if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil {
|
||||
lg.Panic(
|
||||
"failed to delete member from store",
|
||||
zap.String("path", MemberStoreKey(id)),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
|
||||
lg.Panic(
|
||||
"failed to create removedMember",
|
||||
zap.String("path", RemovedMemberStoreKey(id)),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) {
|
||||
b, err := json.Marshal(m.RaftAttributes)
|
||||
if err != nil {
|
||||
lg.Panic("failed to marshal raftAttributes", zap.Error(err))
|
||||
}
|
||||
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
|
||||
if _, err := s.Update(p, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
|
||||
lg.Panic(
|
||||
"failed to update raftAttributes",
|
||||
zap.String("path", p),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func mustUpdateMemberAttrInStore(lg *zap.Logger, s v2store.Store, m *Member) {
|
||||
b, err := json.Marshal(m.Attributes)
|
||||
if err != nil {
|
||||
lg.Panic("failed to marshal attributes", zap.Error(err))
|
||||
}
|
||||
p := path.Join(MemberStoreKey(m.ID), attributesSuffix)
|
||||
if _, err := s.Set(p, false, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
|
||||
lg.Panic(
|
||||
"failed to update attributes",
|
||||
zap.String("path", p),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func mustSaveClusterVersionToStore(lg *zap.Logger, s v2store.Store, ver *semver.Version) {
|
||||
if _, err := s.Set(StoreClusterVersionKey(), false, ver.String(), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
|
||||
lg.Panic(
|
||||
"failed to save cluster version to store",
|
||||
zap.String("path", StoreClusterVersionKey()),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// nodeToMember builds member from a key value node.
|
||||
// the child nodes of the given node MUST be sorted by key.
|
||||
func nodeToMember(lg *zap.Logger, n *v2store.NodeExtern) (*Member, error) {
|
||||
m := &Member{ID: MustParseMemberIDFromKey(lg, n.Key)}
|
||||
attrs := make(map[string][]byte)
|
||||
raftAttrKey := path.Join(n.Key, raftAttributesSuffix)
|
||||
attrKey := path.Join(n.Key, attributesSuffix)
|
||||
for _, nn := range n.Nodes {
|
||||
if nn.Key != raftAttrKey && nn.Key != attrKey {
|
||||
return nil, fmt.Errorf("unknown key %q", nn.Key)
|
||||
}
|
||||
attrs[nn.Key] = []byte(*nn.Value)
|
||||
}
|
||||
if data := attrs[raftAttrKey]; data != nil {
|
||||
if err := json.Unmarshal(data, &m.RaftAttributes); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal raftAttributes error: %v", err)
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("raftAttributes key doesn't exist")
|
||||
}
|
||||
if data := attrs[attrKey]; data != nil {
|
||||
if err := json.Unmarshal(data, &m.Attributes); err != nil {
|
||||
return m, fmt.Errorf("unmarshal attributes error: %v", err)
|
||||
}
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func mustCreateBackendBuckets(be backend.Backend) {
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
tx.UnsafeCreateBucket(buckets.Members)
|
||||
tx.UnsafeCreateBucket(buckets.MembersRemoved)
|
||||
tx.UnsafeCreateBucket(buckets.Cluster)
|
||||
}
|
||||
|
||||
func MemberStoreKey(id types.ID) string {
|
||||
return path.Join(StoreMembersPrefix, id.String())
|
||||
}
|
||||
|
||||
func StoreClusterVersionKey() string {
|
||||
return path.Join(storePrefix, "version")
|
||||
}
|
||||
|
||||
func MemberAttributesStorePath(id types.ID) string {
|
||||
return path.Join(MemberStoreKey(id), attributesSuffix)
|
||||
}
|
||||
|
||||
func mustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID {
|
||||
id, err := types.IDFromString(string(key))
|
||||
if err != nil {
|
||||
lg.Panic("failed to parse member id from key", zap.Error(err))
|
||||
}
|
||||
return id
|
||||
type DowngradeInfoBackend interface {
|
||||
MustSaveDowngradeToBackend(*DowngradeInfo)
|
||||
DowngradeInfoFromBackend() *DowngradeInfo
|
||||
}
|
||||
|
||||
func MustParseMemberIDFromKey(lg *zap.Logger, key string) types.ID {
|
||||
@ -317,7 +54,3 @@ func MustParseMemberIDFromKey(lg *zap.Logger, key string) types.ID {
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
||||
func RemovedMemberStoreKey(id types.ID) string {
|
||||
return path.Join(storeRemovedMembersPrefix, id.String())
|
||||
}
|
||||
|
@ -15,7 +15,28 @@
|
||||
package membership
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"go.etcd.io/etcd/client/pkg/v3/types"
|
||||
"path"
|
||||
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
// the prefix for storing membership related information in store provided by store pkg.
|
||||
storePrefix = "/0"
|
||||
|
||||
attributesSuffix = "attributes"
|
||||
raftAttributesSuffix = "raftAttributes"
|
||||
)
|
||||
|
||||
var (
|
||||
StoreMembersPrefix = path.Join(storePrefix, "members")
|
||||
storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
|
||||
)
|
||||
|
||||
// IsMetaStoreOnly verifies if the given `store` contains only
|
||||
@ -34,3 +55,155 @@ func IsMetaStoreOnly(store v2store.Store) (bool, error) {
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// TrimMembershipFromV2Store removes all information about members &
|
||||
// removed_members from the v2 store.
|
||||
func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error {
|
||||
members, removed := membersFromStore(lg, s)
|
||||
|
||||
for mID := range members {
|
||||
_, err := s.Delete(MemberStoreKey(mID), true, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for mID := range removed {
|
||||
_, err := s.Delete(RemovedMemberStoreKey(mID), true, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) {
|
||||
b, err := json.Marshal(m.RaftAttributes)
|
||||
if err != nil {
|
||||
lg.Panic("failed to marshal raftAttributes", zap.Error(err))
|
||||
}
|
||||
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
|
||||
if _, err := s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
|
||||
lg.Panic(
|
||||
"failed to save member to store",
|
||||
zap.String("path", p),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func mustDeleteMemberFromStore(lg *zap.Logger, s v2store.Store, id types.ID) {
|
||||
if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil {
|
||||
lg.Panic(
|
||||
"failed to delete member from store",
|
||||
zap.String("path", MemberStoreKey(id)),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
|
||||
lg.Panic(
|
||||
"failed to create removedMember",
|
||||
zap.String("path", RemovedMemberStoreKey(id)),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) {
|
||||
b, err := json.Marshal(m.RaftAttributes)
|
||||
if err != nil {
|
||||
lg.Panic("failed to marshal raftAttributes", zap.Error(err))
|
||||
}
|
||||
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
|
||||
if _, err := s.Update(p, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
|
||||
lg.Panic(
|
||||
"failed to update raftAttributes",
|
||||
zap.String("path", p),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func mustUpdateMemberAttrInStore(lg *zap.Logger, s v2store.Store, m *Member) {
|
||||
b, err := json.Marshal(m.Attributes)
|
||||
if err != nil {
|
||||
lg.Panic("failed to marshal attributes", zap.Error(err))
|
||||
}
|
||||
p := path.Join(MemberStoreKey(m.ID), attributesSuffix)
|
||||
if _, err := s.Set(p, false, string(b), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
|
||||
lg.Panic(
|
||||
"failed to update attributes",
|
||||
zap.String("path", p),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func mustSaveClusterVersionToStore(lg *zap.Logger, s v2store.Store, ver *semver.Version) {
|
||||
if _, err := s.Set(StoreClusterVersionKey(), false, ver.String(), v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
|
||||
lg.Panic(
|
||||
"failed to save cluster version to store",
|
||||
zap.String("path", StoreClusterVersionKey()),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// nodeToMember builds member from a key value node.
|
||||
// the child nodes of the given node MUST be sorted by key.
|
||||
func nodeToMember(lg *zap.Logger, n *v2store.NodeExtern) (*Member, error) {
|
||||
m := &Member{ID: MustParseMemberIDFromKey(lg, n.Key)}
|
||||
attrs := make(map[string][]byte)
|
||||
raftAttrKey := path.Join(n.Key, raftAttributesSuffix)
|
||||
attrKey := path.Join(n.Key, attributesSuffix)
|
||||
for _, nn := range n.Nodes {
|
||||
if nn.Key != raftAttrKey && nn.Key != attrKey {
|
||||
return nil, fmt.Errorf("unknown key %q", nn.Key)
|
||||
}
|
||||
attrs[nn.Key] = []byte(*nn.Value)
|
||||
}
|
||||
if data := attrs[raftAttrKey]; data != nil {
|
||||
if err := json.Unmarshal(data, &m.RaftAttributes); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal raftAttributes error: %v", err)
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("raftAttributes key doesn't exist")
|
||||
}
|
||||
if data := attrs[attrKey]; data != nil {
|
||||
if err := json.Unmarshal(data, &m.Attributes); err != nil {
|
||||
return m, fmt.Errorf("unmarshal attributes error: %v", err)
|
||||
}
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func StoreClusterVersionKey() string {
|
||||
return path.Join(storePrefix, "version")
|
||||
}
|
||||
|
||||
func RemovedMemberStoreKey(id types.ID) string {
|
||||
return path.Join(storeRemovedMembersPrefix, id.String())
|
||||
}
|
||||
|
||||
func MemberStoreKey(id types.ID) string {
|
||||
return path.Join(StoreMembersPrefix, id.String())
|
||||
}
|
||||
|
||||
func MemberAttributesStorePath(id types.ID) string {
|
||||
return path.Join(MemberStoreKey(id), attributesSuffix)
|
||||
}
|
||||
|
||||
func clusterVersionFromStore(lg *zap.Logger, st v2store.Store) *semver.Version {
|
||||
e, err := st.Get(path.Join(storePrefix, "version"), false, false)
|
||||
if err != nil {
|
||||
if isKeyNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
lg.Panic(
|
||||
"failed to get cluster version from store",
|
||||
zap.String("path", path.Join(storePrefix, "version")),
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
return semver.Must(semver.NewVersion(*e.Node.Value))
|
||||
}
|
||||
|
@ -67,6 +67,7 @@ import (
|
||||
"go.etcd.io/etcd/server/v3/lease/leasehttp"
|
||||
"go.etcd.io/etcd/server/v3/mvcc"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
||||
"go.etcd.io/etcd/server/v3/wal"
|
||||
)
|
||||
|
||||
@ -316,7 +317,7 @@ func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) {
|
||||
bh.confStateLock.Lock()
|
||||
defer bh.confStateLock.Unlock()
|
||||
if bh.confStateDirty {
|
||||
membership.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState)
|
||||
buckets.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState)
|
||||
// save bh.confState
|
||||
bh.confStateDirty = false
|
||||
}
|
||||
@ -432,7 +433,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||
remotes = existingCluster.Members()
|
||||
cl.SetID(types.ID(0), existingCluster.ID())
|
||||
cl.SetStore(st)
|
||||
cl.SetBackend(be)
|
||||
cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
|
||||
id, n, s, w = startNode(cfg, cl, nil)
|
||||
cl.SetID(id, existingCluster.ID())
|
||||
|
||||
@ -467,7 +468,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||
}
|
||||
}
|
||||
cl.SetStore(st)
|
||||
cl.SetBackend(be)
|
||||
cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
|
||||
id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
|
||||
cl.SetID(id, cl.ID())
|
||||
|
||||
@ -537,7 +538,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
|
||||
}
|
||||
|
||||
cl.SetStore(st)
|
||||
cl.SetBackend(be)
|
||||
cl.SetBackend(buckets.NewMembershipStore(cfg.Logger, be))
|
||||
cl.Recover(api.UpdateCapability)
|
||||
if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
|
||||
os.RemoveAll(bepath)
|
||||
@ -1313,7 +1314,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
|
||||
|
||||
lg.Info("restored v2 store")
|
||||
|
||||
s.cluster.SetBackend(newbe)
|
||||
s.cluster.SetBackend(buckets.NewMembershipStore(lg, newbe))
|
||||
|
||||
lg.Info("restoring cluster configuration")
|
||||
|
||||
|
@ -53,6 +53,7 @@ import (
|
||||
"go.etcd.io/etcd/server/v3/mock/mockwait"
|
||||
"go.etcd.io/etcd/server/v3/mvcc"
|
||||
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
@ -695,7 +696,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) {
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
srv.beHooks.OnPreCommitUnsafe(tx)
|
||||
assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *membership.UnsafeConfStateFromBackend(lg, tx))
|
||||
assert.Equal(t, raftpb.ConfState{Voters: []uint64{2}}, *buckets.UnsafeConfStateFromBackend(lg, tx))
|
||||
})
|
||||
rindex, rterm := cindex.ReadConsistentIndex(be.BatchTx())
|
||||
assert.Equal(t, consistIndex, rindex)
|
||||
|
@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package membership
|
||||
package buckets
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
@ -20,7 +20,6 @@ import (
|
||||
|
||||
"go.etcd.io/etcd/raft/v3/raftpb"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/buckets"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@ -32,20 +31,20 @@ func MustUnsafeSaveConfStateToBackend(lg *zap.Logger, tx backend.BatchTx, confSt
|
||||
lg.Panic("Cannot marshal raftpb.ConfState", zap.Stringer("conf-state", confState), zap.Error(err))
|
||||
}
|
||||
|
||||
tx.UnsafePut(buckets.Meta, buckets.MetaConfStateName, confStateBytes)
|
||||
tx.UnsafePut(Meta, MetaConfStateName, confStateBytes)
|
||||
}
|
||||
|
||||
// UnsafeConfStateFromBackend retrieves ConfState from the backend.
|
||||
// Returns nil if confState in backend is not persisted (e.g. backend writen by <v3.5).
|
||||
func UnsafeConfStateFromBackend(lg *zap.Logger, tx backend.ReadTx) *raftpb.ConfState {
|
||||
keys, vals := tx.UnsafeRange(buckets.Meta, buckets.MetaConfStateName, nil, 0)
|
||||
keys, vals := tx.UnsafeRange(Meta, MetaConfStateName, nil, 0)
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(keys) != 1 {
|
||||
lg.Panic(
|
||||
"unexpected number of key: "+string(buckets.MetaConfStateName)+" when getting cluster version from backend",
|
||||
"unexpected number of key: "+string(MetaConfStateName)+" when getting cluster version from backend",
|
||||
zap.Int("number-of-key", len(keys)),
|
||||
)
|
||||
}
|
@ -12,15 +12,13 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package membership_test
|
||||
package buckets
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.etcd.io/etcd/raft/v3/raftpb"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
|
||||
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
@ -31,15 +29,15 @@ func TestConfStateFromBackendInOneTx(t *testing.T) {
|
||||
defer betesting.Close(t, be)
|
||||
|
||||
tx := be.BatchTx()
|
||||
cindex.CreateMetaBucket(tx)
|
||||
tx.UnsafeCreateBucket(Meta)
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
assert.Nil(t, membership.UnsafeConfStateFromBackend(lg, tx))
|
||||
assert.Nil(t, UnsafeConfStateFromBackend(lg, tx))
|
||||
|
||||
confState := raftpb.ConfState{Learners: []uint64{1, 2}, Voters: []uint64{3}, AutoLeave: false}
|
||||
membership.MustUnsafeSaveConfStateToBackend(lg, tx, &confState)
|
||||
MustUnsafeSaveConfStateToBackend(lg, tx, &confState)
|
||||
|
||||
assert.Equal(t, confState, *membership.UnsafeConfStateFromBackend(lg, tx))
|
||||
assert.Equal(t, confState, *UnsafeConfStateFromBackend(lg, tx))
|
||||
}
|
||||
|
||||
func TestMustUnsafeSaveConfStateToBackend(t *testing.T) {
|
||||
@ -49,7 +47,7 @@ func TestMustUnsafeSaveConfStateToBackend(t *testing.T) {
|
||||
|
||||
{
|
||||
tx := be.BatchTx()
|
||||
cindex.CreateMetaBucket(tx)
|
||||
tx.UnsafeCreateBucket(Meta)
|
||||
tx.Commit()
|
||||
}
|
||||
|
||||
@ -57,7 +55,7 @@ func TestMustUnsafeSaveConfStateToBackend(t *testing.T) {
|
||||
tx := be.ReadTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
assert.Nil(t, membership.UnsafeConfStateFromBackend(lg, tx))
|
||||
assert.Nil(t, UnsafeConfStateFromBackend(lg, tx))
|
||||
})
|
||||
|
||||
confState := raftpb.ConfState{Learners: []uint64{1, 2}, Voters: []uint64{3}, AutoLeave: false}
|
||||
@ -65,7 +63,7 @@ func TestMustUnsafeSaveConfStateToBackend(t *testing.T) {
|
||||
t.Run("save", func(t *testing.T) {
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
membership.MustUnsafeSaveConfStateToBackend(lg, tx, &confState)
|
||||
MustUnsafeSaveConfStateToBackend(lg, tx, &confState)
|
||||
tx.Unlock()
|
||||
tx.Commit()
|
||||
})
|
||||
@ -74,6 +72,6 @@ func TestMustUnsafeSaveConfStateToBackend(t *testing.T) {
|
||||
tx := be.ReadTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
assert.Equal(t, confState, *membership.UnsafeConfStateFromBackend(lg, tx))
|
||||
assert.Equal(t, confState, *UnsafeConfStateFromBackend(lg, tx))
|
||||
})
|
||||
}
|
237
server/mvcc/buckets/membership.go
Normal file
237
server/mvcc/buckets/membership.go
Normal file
@ -0,0 +1,237 @@
|
||||
// Copyright 2016 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package buckets
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/types"
|
||||
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
|
||||
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||
|
||||
"github.com/coreos/go-semver/semver"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
MemberAttributesSuffix = "attributes"
|
||||
MemberRaftAttributesSuffix = "raftAttributes"
|
||||
)
|
||||
|
||||
type membershipStore struct {
|
||||
lg *zap.Logger
|
||||
be backend.Backend
|
||||
}
|
||||
|
||||
func NewMembershipStore(lg *zap.Logger, be backend.Backend) *membershipStore {
|
||||
return &membershipStore{
|
||||
lg: lg,
|
||||
be: be,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *membershipStore) MustSaveMemberToBackend(m *membership.Member) {
|
||||
mkey := BackendMemberKey(m.ID)
|
||||
mvalue, err := json.Marshal(m)
|
||||
if err != nil {
|
||||
s.lg.Panic("failed to marshal member", zap.Error(err))
|
||||
}
|
||||
|
||||
tx := s.be.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
tx.UnsafePut(Members, mkey, mvalue)
|
||||
}
|
||||
|
||||
// TrimClusterFromBackend removes all information about cluster (versions)
|
||||
// from the v3 backend.
|
||||
func (s *membershipStore) TrimClusterFromBackend() error {
|
||||
tx := s.be.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
tx.UnsafeDeleteBucket(Cluster)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *membershipStore) MustDeleteMemberFromBackend(id types.ID) {
|
||||
mkey := BackendMemberKey(id)
|
||||
|
||||
tx := s.be.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
tx.UnsafeDelete(Members, mkey)
|
||||
tx.UnsafePut(MembersRemoved, mkey, []byte("removed"))
|
||||
}
|
||||
|
||||
func (s *membershipStore) MustReadMembersFromBackend() (map[types.ID]*membership.Member, map[types.ID]bool) {
|
||||
members, removed, err := s.readMembersFromBackend()
|
||||
if err != nil {
|
||||
s.lg.Panic("couldn't read members from backend", zap.Error(err))
|
||||
}
|
||||
return members, removed
|
||||
}
|
||||
|
||||
func (s *membershipStore) readMembersFromBackend() (map[types.ID]*membership.Member, map[types.ID]bool, error) {
|
||||
members := make(map[types.ID]*membership.Member)
|
||||
removed := make(map[types.ID]bool)
|
||||
|
||||
tx := s.be.ReadTx()
|
||||
tx.RLock()
|
||||
defer tx.RUnlock()
|
||||
err := tx.UnsafeForEach(Members, func(k, v []byte) error {
|
||||
memberId := mustParseMemberIDFromBytes(s.lg, k)
|
||||
m := &membership.Member{ID: memberId}
|
||||
if err := json.Unmarshal(v, &m); err != nil {
|
||||
return err
|
||||
}
|
||||
members[memberId] = m
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("couldn't read members from backend: %w", err)
|
||||
}
|
||||
|
||||
err = tx.UnsafeForEach(MembersRemoved, func(k, v []byte) error {
|
||||
memberId := mustParseMemberIDFromBytes(s.lg, k)
|
||||
removed[memberId] = true
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("couldn't read members_removed from backend: %w", err)
|
||||
}
|
||||
return members, removed, nil
|
||||
}
|
||||
|
||||
// TrimMembershipFromBackend removes all information about members &
|
||||
// removed_members from the v3 backend.
|
||||
func (s *membershipStore) TrimMembershipFromBackend() error {
|
||||
s.lg.Info("Trimming membership information from the backend...")
|
||||
tx := s.be.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
err := tx.UnsafeForEach(Members, func(k, v []byte) error {
|
||||
tx.UnsafeDelete(Members, k)
|
||||
s.lg.Debug("Removed member from the backend",
|
||||
zap.Stringer("member", mustParseMemberIDFromBytes(s.lg, k)))
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return tx.UnsafeForEach(MembersRemoved, func(k, v []byte) error {
|
||||
tx.UnsafeDelete(MembersRemoved, k)
|
||||
s.lg.Debug("Removed removed_member from the backend",
|
||||
zap.Stringer("member", mustParseMemberIDFromBytes(s.lg, k)))
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// MustSaveClusterVersionToBackend saves cluster version to backend.
|
||||
// The field is populated since etcd v3.5.
|
||||
func (s *membershipStore) MustSaveClusterVersionToBackend(ver *semver.Version) {
|
||||
ckey := ClusterClusterVersionKeyName
|
||||
|
||||
tx := s.be.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
tx.UnsafePut(Cluster, ckey, []byte(ver.String()))
|
||||
}
|
||||
|
||||
// MustSaveDowngradeToBackend saves downgrade info to backend.
|
||||
// The field is populated since etcd v3.5.
|
||||
func (s *membershipStore) MustSaveDowngradeToBackend(downgrade *membership.DowngradeInfo) {
|
||||
dkey := ClusterDowngradeKeyName
|
||||
dvalue, err := json.Marshal(downgrade)
|
||||
if err != nil {
|
||||
s.lg.Panic("failed to marshal downgrade information", zap.Error(err))
|
||||
}
|
||||
tx := s.be.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
tx.UnsafePut(Cluster, dkey, dvalue)
|
||||
}
|
||||
|
||||
func (s *membershipStore) MustCreateBackendBuckets() {
|
||||
tx := s.be.BatchTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
tx.UnsafeCreateBucket(Members)
|
||||
tx.UnsafeCreateBucket(MembersRemoved)
|
||||
tx.UnsafeCreateBucket(Cluster)
|
||||
}
|
||||
|
||||
func mustParseMemberIDFromBytes(lg *zap.Logger, key []byte) types.ID {
|
||||
id, err := types.IDFromString(string(key))
|
||||
if err != nil {
|
||||
lg.Panic("failed to parse member id from key", zap.Error(err))
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
||||
// ClusterVersionFromBackend reads cluster version from backend.
|
||||
// The field is populated since etcd v3.5.
|
||||
func (s *membershipStore) ClusterVersionFromBackend() *semver.Version {
|
||||
ckey := ClusterClusterVersionKeyName
|
||||
tx := s.be.ReadTx()
|
||||
tx.RLock()
|
||||
defer tx.RUnlock()
|
||||
keys, vals := tx.UnsafeRange(Cluster, ckey, nil, 0)
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
if len(keys) != 1 {
|
||||
s.lg.Panic(
|
||||
"unexpected number of keys when getting cluster version from backend",
|
||||
zap.Int("number-of-key", len(keys)),
|
||||
)
|
||||
}
|
||||
return semver.Must(semver.NewVersion(string(vals[0])))
|
||||
}
|
||||
|
||||
// DowngradeInfoFromBackend reads downgrade info from backend.
|
||||
// The field is populated since etcd v3.5.
|
||||
func (s *membershipStore) DowngradeInfoFromBackend() *membership.DowngradeInfo {
|
||||
dkey := ClusterDowngradeKeyName
|
||||
tx := s.be.ReadTx()
|
||||
tx.Lock()
|
||||
defer tx.Unlock()
|
||||
keys, vals := tx.UnsafeRange(Cluster, dkey, nil, 0)
|
||||
if len(keys) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(keys) != 1 {
|
||||
s.lg.Panic(
|
||||
"unexpected number of keys when getting cluster version from backend",
|
||||
zap.Int("number-of-key", len(keys)),
|
||||
)
|
||||
}
|
||||
var d membership.DowngradeInfo
|
||||
if err := json.Unmarshal(vals[0], &d); err != nil {
|
||||
s.lg.Panic("failed to unmarshal downgrade information", zap.Error(err))
|
||||
}
|
||||
|
||||
// verify the downgrade info from backend
|
||||
if d.Enabled {
|
||||
if _, err := semver.NewVersion(d.TargetVersion); err != nil {
|
||||
s.lg.Panic(
|
||||
"unexpected version format of the downgrade target version from backend",
|
||||
zap.String("target-version", d.TargetVersion),
|
||||
)
|
||||
}
|
||||
}
|
||||
return &d
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user