Merge pull request #13348 from serathius/sync

Fix for v3.5 Ensure that cluster members stored in v2store and backend are in sync
This commit is contained in:
Piotr Tabor 2021-09-25 17:33:55 +02:00 committed by GitHub
commit 4312298b73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 300 additions and 34 deletions

View File

@ -20,6 +20,7 @@ import (
"crypto/sha1"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"path"
"sort"
@ -32,6 +33,7 @@ import (
"go.etcd.io/etcd/pkg/v3/netutil"
"go.etcd.io/etcd/raft/v3"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
@ -254,12 +256,12 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.Lock()
defer c.Unlock()
if c.be != nil {
c.version = clusterVersionFromBackend(c.lg, c.be)
c.members, c.removed = membersFromBackend(c.lg, c.be)
} else {
if c.v2store != nil {
c.version = clusterVersionFromStore(c.lg, c.v2store)
c.members, c.removed = membersFromStore(c.lg, c.v2store)
} else {
c.version = clusterVersionFromBackend(c.lg, c.be)
c.members, c.removed = membersFromBackend(c.lg, c.be)
}
if c.be != nil {
@ -381,11 +383,37 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
c.Lock()
defer c.Unlock()
var v2Err, beErr error
if c.v2store != nil {
mustSaveMemberToStore(c.lg, c.v2store, m)
v2Err = unsafeSaveMemberToStore(c.lg, c.v2store, m)
if v2Err != nil {
if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeNodeExist {
c.lg.Panic(
"failed to save member to store",
zap.String("member-id", m.ID.String()),
zap.Error(v2Err),
)
}
}
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, m)
beErr = unsafeSaveMemberToBackend(c.lg, c.be, m)
if beErr != nil && !errors.Is(beErr, errMemberAlreadyExist) {
c.lg.Panic(
"failed to save member to backend",
zap.String("member-id", m.ID.String()),
zap.Error(beErr),
)
}
}
// Panic if both storeV2 and backend report member already exist.
if v2Err != nil && (beErr != nil || c.be == nil) {
c.lg.Panic(
"failed to save member to store",
zap.String("member-id", m.ID.String()),
zap.Error(v2Err),
)
}
c.members[m.ID] = m
@ -404,11 +432,36 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
c.Lock()
defer c.Unlock()
var v2Err, beErr error
if c.v2store != nil {
mustDeleteMemberFromStore(c.lg, c.v2store, id)
v2Err = unsafeDeleteMemberFromStore(c.v2store, id)
if v2Err != nil {
if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeKeyNotFound {
c.lg.Panic(
"failed to delete member from store",
zap.String("member-id", id.String()),
zap.Error(v2Err),
)
}
}
}
if c.be != nil && shouldApplyV3 {
mustDeleteMemberFromBackend(c.be, id)
beErr = unsafeDeleteMemberFromBackend(c.be, id)
if beErr != nil && !errors.Is(beErr, errMemberNotFound) {
c.lg.Panic(
"failed to delete member from backend",
zap.String("member-id", id.String()),
zap.Error(beErr),
)
}
}
// Panic if both storeV2 and backend report member not found.
if v2Err != nil && (beErr != nil || c.be == nil) {
c.lg.Panic(
"failed to delete member from store",
zap.String("member-id", id.String()),
zap.Error(v2Err),
)
}
m, ok := c.members[id]
@ -443,7 +496,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApply
mustUpdateMemberAttrInStore(c.lg, c.v2store, m)
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, m)
unsafeSaveMemberToBackend(c.lg, c.be, m)
}
return
}
@ -476,7 +529,7 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, c.members[id])
unsafeSaveMemberToBackend(c.lg, c.be, c.members[id])
}
c.lg.Info(
@ -495,7 +548,7 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes,
mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
}
if c.be != nil && shouldApplyV3 {
mustSaveMemberToBackend(c.lg, c.be, c.members[id])
unsafeSaveMemberToBackend(c.lg, c.be, c.members[id])
}
c.lg.Info(
@ -870,7 +923,7 @@ func (c *RaftCluster) PushMembershipToStorage() {
if c.be != nil {
TrimMembershipFromBackend(c.lg, c.be)
for _, m := range c.members {
mustSaveMemberToBackend(c.lg, c.be, m)
unsafeSaveMemberToBackend(c.lg, c.be, m)
}
}
if c.v2store != nil {

View File

@ -20,8 +20,12 @@ import (
"path"
"reflect"
"testing"
"time"
"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
"go.etcd.io/etcd/client/pkg/v3/testutil"
@ -29,8 +33,6 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/mock/mockstore"
"go.uber.org/zap"
)
func TestClusterMember(t *testing.T) {
@ -1019,3 +1021,193 @@ func TestIsVersionChangable(t *testing.T) {
})
}
}
func TestAddMemberSyncsBackendAndStoreV2(t *testing.T) {
now := time.Now()
alice := NewMember("", nil, "alice", &now)
tcs := []struct {
name string
storeV2Nil bool
backendNil bool
storeV2Members []*Member
backendMembers []*Member
expectPanics bool
expectMembers map[types.ID]*Member
}{
{
name: "Adding new member should succeed",
},
{
name: "Adding member should succeed if it was only in storeV2",
storeV2Members: []*Member{alice},
},
{
name: "Adding member should succeed if it was only in backend",
backendMembers: []*Member{alice},
},
{
name: "Adding member should fail if it exists in both",
storeV2Members: []*Member{alice},
backendMembers: []*Member{alice},
expectPanics: true,
},
{
name: "Adding member should fail if it exists in storeV2 and backend is nil",
storeV2Members: []*Member{alice},
backendNil: true,
expectPanics: true,
},
{
name: "Adding member should succeed if it exists in backend and storageV2 is nil",
storeV2Nil: true,
backendMembers: []*Member{alice},
},
{
name: "Adding new member should succeed if backend is nil",
storeV2Members: []*Member{},
backendNil: true,
},
{
name: "Adding new member should fail if storageV2 is nil",
storeV2Nil: true,
backendMembers: []*Member{},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
lg := zaptest.NewLogger(t)
be, _ := betesting.NewDefaultTmpBackend(t)
defer be.Close()
mustCreateBackendBuckets(be)
st := v2store.New()
for _, m := range tc.backendMembers {
unsafeSaveMemberToBackend(lg, be, m)
}
be.ForceCommit()
for _, m := range tc.storeV2Members {
mustSaveMemberToStore(lg, st, m)
}
cluster := NewCluster(lg)
if !tc.backendNil {
cluster.SetBackend(be)
}
if !tc.storeV2Nil {
cluster.SetStore(st)
}
if tc.expectPanics {
assert.Panics(t, func() {
cluster.AddMember(alice, ApplyBoth)
})
} else {
cluster.AddMember(alice, ApplyBoth)
}
if !tc.storeV2Nil {
storeV2Members, _ := membersFromStore(lg, st)
assert.Equal(t, map[types.ID]*Member{alice.ID: alice}, storeV2Members)
}
if !tc.backendNil {
be.ForceCommit()
beMembers, _ := mustReadMembersFromBackend(lg, be)
assert.Equal(t, map[types.ID]*Member{alice.ID: alice}, beMembers)
}
})
}
}
func TestRemoveMemberSyncsBackendAndStoreV2(t *testing.T) {
now := time.Now()
alice := NewMember("", nil, "alice", &now)
tcs := []struct {
name string
storeV2Nil bool
backendNil bool
storeV2Members []*Member
backendMembers []*Member
expectMembers []*Member
expectPanics bool
}{
{
name: "Removing new member should fail",
expectPanics: true,
},
{
name: "Removing member should succeed if it was only in storeV2",
storeV2Members: []*Member{alice},
},
{
name: "Removing member should succeed if it was only in backend",
backendMembers: []*Member{alice},
},
{
name: "Removing member should succeed if it exists in both",
storeV2Members: []*Member{alice},
backendMembers: []*Member{alice},
},
{
name: "Removing new member should fail if backend is nil",
storeV2Members: []*Member{},
backendNil: true,
expectPanics: true,
},
{
name: "Removing new member should succeed if storageV2 is nil",
storeV2Nil: true,
backendMembers: []*Member{},
},
{
name: "Removing member should succeed if it exists in v2storage and backend is nil",
storeV2Members: []*Member{alice},
backendNil: true,
},
{
name: "Removing member should succeed if it exists in backend and storageV2 is nil",
storeV2Nil: true,
backendMembers: []*Member{alice},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
lg := zaptest.NewLogger(t)
be, _ := betesting.NewDefaultTmpBackend(t)
defer be.Close()
mustCreateBackendBuckets(be)
st := v2store.New()
for _, m := range tc.backendMembers {
unsafeSaveMemberToBackend(lg, be, m)
}
be.ForceCommit()
for _, m := range tc.storeV2Members {
mustSaveMemberToStore(lg, st, m)
}
cluster := NewCluster(lg)
if !tc.backendNil {
cluster.SetBackend(be)
}
if !tc.storeV2Nil {
cluster.SetStore(st)
}
if tc.expectPanics {
assert.Panics(t, func() {
cluster.RemoveMember(alice.ID, ApplyBoth)
})
} else {
cluster.RemoveMember(alice.ID, ApplyBoth)
}
if !tc.storeV2Nil {
storeV2Members, _ := membersFromStore(lg, st)
assert.Equal(t, map[types.ID]*Member{}, storeV2Members)
}
if !tc.backendNil {
be.ForceCommit()
beMembers, _ := mustReadMembersFromBackend(lg, be)
assert.Equal(t, map[types.ID]*Member{}, beMembers)
}
})
}
}

View File

@ -15,6 +15,7 @@
package membership
import (
"bytes"
"encoding/json"
"fmt"
"path"
@ -39,9 +40,11 @@ const (
var (
StoreMembersPrefix = path.Join(storePrefix, "members")
storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
errMemberAlreadyExist = fmt.Errorf("member already exists")
errMemberNotFound = fmt.Errorf("member not found")
)
func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {
func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) error {
mkey := backendMemberKey(m.ID)
mvalue, err := json.Marshal(m)
if err != nil {
@ -51,7 +54,11 @@ func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) {
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
if unsafeMemberExists(tx, mkey) {
return errMemberAlreadyExist
}
tx.UnsafePut(buckets.Members, mkey, mvalue)
return nil
}
// TrimClusterFromBackend removes all information about cluster (versions)
@ -64,14 +71,29 @@ func TrimClusterFromBackend(be backend.Backend) error {
return nil
}
func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
func unsafeDeleteMemberFromBackend(be backend.Backend, id types.ID) error {
mkey := backendMemberKey(id)
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
tx.UnsafeDelete(buckets.Members, mkey)
tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed"))
if !unsafeMemberExists(tx, mkey) {
return errMemberNotFound
}
tx.UnsafeDelete(buckets.Members, mkey)
return nil
}
func unsafeMemberExists(tx backend.ReadTx, mkey []byte) bool {
var found bool
tx.UnsafeForEach(buckets.Members, func(k, v []byte) error {
if bytes.Equal(k, mkey) {
found = true
}
return nil
})
return found
}
func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool, error) {
@ -182,35 +204,34 @@ func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *D
}
func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) {
b, err := json.Marshal(m.RaftAttributes)
err := unsafeSaveMemberToStore(lg, s, m)
if err != nil {
lg.Panic("failed to marshal raftAttributes", zap.Error(err))
}
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
if _, err := s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
"failed to save member to store",
zap.String("path", p),
zap.String("member-id", m.ID.String()),
zap.Error(err),
)
}
}
func mustDeleteMemberFromStore(lg *zap.Logger, s v2store.Store, id types.ID) {
func unsafeSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) error {
b, err := json.Marshal(m.RaftAttributes)
if err != nil {
lg.Panic("failed to marshal raftAttributes", zap.Error(err))
}
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
_, err = s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent})
return err
}
func unsafeDeleteMemberFromStore(s v2store.Store, id types.ID) error {
if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil {
lg.Panic(
"failed to delete member from store",
zap.String("path", MemberStoreKey(id)),
zap.Error(err),
)
return err
}
if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil {
lg.Panic(
"failed to create removedMember",
zap.String("path", RemovedMemberStoreKey(id)),
zap.Error(err),
)
return err
}
return nil
}
func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) {