diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index 3df9588be..e177a6302 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -20,6 +20,7 @@ import ( "crypto/sha1" "encoding/binary" "encoding/json" + "errors" "fmt" "path" "sort" @@ -32,6 +33,7 @@ import ( "go.etcd.io/etcd/pkg/v3/netutil" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/raftpb" + "go.etcd.io/etcd/server/v3/etcdserver/api/v2error" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/mvcc/backend" "go.etcd.io/etcd/server/v3/mvcc/buckets" @@ -254,12 +256,12 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { c.Lock() defer c.Unlock() - if c.be != nil { - c.version = clusterVersionFromBackend(c.lg, c.be) - c.members, c.removed = membersFromBackend(c.lg, c.be) - } else { + if c.v2store != nil { c.version = clusterVersionFromStore(c.lg, c.v2store) c.members, c.removed = membersFromStore(c.lg, c.v2store) + } else { + c.version = clusterVersionFromBackend(c.lg, c.be) + c.members, c.removed = membersFromBackend(c.lg, c.be) } if c.be != nil { @@ -381,11 +383,37 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) { c.Lock() defer c.Unlock() + + var v2Err, beErr error if c.v2store != nil { - mustSaveMemberToStore(c.lg, c.v2store, m) + v2Err = unsafeSaveMemberToStore(c.lg, c.v2store, m) + if v2Err != nil { + if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeNodeExist { + c.lg.Panic( + "failed to save member to store", + zap.String("member-id", m.ID.String()), + zap.Error(v2Err), + ) + } + } } if c.be != nil && shouldApplyV3 { - mustSaveMemberToBackend(c.lg, c.be, m) + beErr = unsafeSaveMemberToBackend(c.lg, c.be, m) + if beErr != nil && !errors.Is(beErr, errMemberAlreadyExist) { + c.lg.Panic( + "failed to save member to backend", + zap.String("member-id", m.ID.String()), + zap.Error(beErr), + ) + } + } + // Panic if both storeV2 and backend report member already exist. + if v2Err != nil && (beErr != nil || c.be == nil) { + c.lg.Panic( + "failed to save member to store", + zap.String("member-id", m.ID.String()), + zap.Error(v2Err), + ) } c.members[m.ID] = m @@ -404,11 +432,36 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) { func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 ShouldApplyV3) { c.Lock() defer c.Unlock() + var v2Err, beErr error if c.v2store != nil { - mustDeleteMemberFromStore(c.lg, c.v2store, id) + v2Err = unsafeDeleteMemberFromStore(c.v2store, id) + if v2Err != nil { + if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeKeyNotFound { + c.lg.Panic( + "failed to delete member from store", + zap.String("member-id", id.String()), + zap.Error(v2Err), + ) + } + } } if c.be != nil && shouldApplyV3 { - mustDeleteMemberFromBackend(c.be, id) + beErr = unsafeDeleteMemberFromBackend(c.be, id) + if beErr != nil && !errors.Is(beErr, errMemberNotFound) { + c.lg.Panic( + "failed to delete member from backend", + zap.String("member-id", id.String()), + zap.Error(beErr), + ) + } + } + // Panic if both storeV2 and backend report member not found. + if v2Err != nil && (beErr != nil || c.be == nil) { + c.lg.Panic( + "failed to delete member from store", + zap.String("member-id", id.String()), + zap.Error(v2Err), + ) } m, ok := c.members[id] @@ -443,7 +496,7 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApply mustUpdateMemberAttrInStore(c.lg, c.v2store, m) } if c.be != nil && shouldApplyV3 { - mustSaveMemberToBackend(c.lg, c.be, m) + unsafeSaveMemberToBackend(c.lg, c.be, m) } return } @@ -476,7 +529,7 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) { mustUpdateMemberInStore(c.lg, c.v2store, c.members[id]) } if c.be != nil && shouldApplyV3 { - mustSaveMemberToBackend(c.lg, c.be, c.members[id]) + unsafeSaveMemberToBackend(c.lg, c.be, c.members[id]) } c.lg.Info( @@ -495,7 +548,7 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, mustUpdateMemberInStore(c.lg, c.v2store, c.members[id]) } if c.be != nil && shouldApplyV3 { - mustSaveMemberToBackend(c.lg, c.be, c.members[id]) + unsafeSaveMemberToBackend(c.lg, c.be, c.members[id]) } c.lg.Info( @@ -870,7 +923,7 @@ func (c *RaftCluster) PushMembershipToStorage() { if c.be != nil { TrimMembershipFromBackend(c.lg, c.be) for _, m := range c.members { - mustSaveMemberToBackend(c.lg, c.be, m) + unsafeSaveMemberToBackend(c.lg, c.be, m) } } if c.v2store != nil { diff --git a/server/etcdserver/api/membership/cluster_test.go b/server/etcdserver/api/membership/cluster_test.go index 23d81fec1..1793caa6a 100644 --- a/server/etcdserver/api/membership/cluster_test.go +++ b/server/etcdserver/api/membership/cluster_test.go @@ -20,8 +20,12 @@ import ( "path" "reflect" "testing" + "time" "github.com/coreos/go-semver/semver" + "github.com/stretchr/testify/assert" + betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" + "go.uber.org/zap" "go.uber.org/zap/zaptest" "go.etcd.io/etcd/client/pkg/v3/testutil" @@ -29,8 +33,6 @@ import ( "go.etcd.io/etcd/raft/v3/raftpb" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/mock/mockstore" - - "go.uber.org/zap" ) func TestClusterMember(t *testing.T) { @@ -1019,3 +1021,193 @@ func TestIsVersionChangable(t *testing.T) { }) } } + +func TestAddMemberSyncsBackendAndStoreV2(t *testing.T) { + now := time.Now() + alice := NewMember("", nil, "alice", &now) + + tcs := []struct { + name string + + storeV2Nil bool + backendNil bool + storeV2Members []*Member + backendMembers []*Member + + expectPanics bool + expectMembers map[types.ID]*Member + }{ + { + name: "Adding new member should succeed", + }, + { + name: "Adding member should succeed if it was only in storeV2", + storeV2Members: []*Member{alice}, + }, + { + name: "Adding member should succeed if it was only in backend", + backendMembers: []*Member{alice}, + }, + { + name: "Adding member should fail if it exists in both", + storeV2Members: []*Member{alice}, + backendMembers: []*Member{alice}, + expectPanics: true, + }, + { + name: "Adding member should fail if it exists in storeV2 and backend is nil", + storeV2Members: []*Member{alice}, + backendNil: true, + expectPanics: true, + }, + { + name: "Adding member should succeed if it exists in backend and storageV2 is nil", + storeV2Nil: true, + backendMembers: []*Member{alice}, + }, + { + name: "Adding new member should succeed if backend is nil", + storeV2Members: []*Member{}, + backendNil: true, + }, + { + name: "Adding new member should fail if storageV2 is nil", + storeV2Nil: true, + backendMembers: []*Member{}, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + lg := zaptest.NewLogger(t) + be, _ := betesting.NewDefaultTmpBackend(t) + defer be.Close() + mustCreateBackendBuckets(be) + st := v2store.New() + for _, m := range tc.backendMembers { + unsafeSaveMemberToBackend(lg, be, m) + } + be.ForceCommit() + for _, m := range tc.storeV2Members { + mustSaveMemberToStore(lg, st, m) + } + cluster := NewCluster(lg) + if !tc.backendNil { + cluster.SetBackend(be) + } + if !tc.storeV2Nil { + cluster.SetStore(st) + } + if tc.expectPanics { + assert.Panics(t, func() { + cluster.AddMember(alice, ApplyBoth) + }) + } else { + cluster.AddMember(alice, ApplyBoth) + } + if !tc.storeV2Nil { + storeV2Members, _ := membersFromStore(lg, st) + assert.Equal(t, map[types.ID]*Member{alice.ID: alice}, storeV2Members) + } + if !tc.backendNil { + be.ForceCommit() + beMembers, _ := mustReadMembersFromBackend(lg, be) + assert.Equal(t, map[types.ID]*Member{alice.ID: alice}, beMembers) + } + }) + } +} + +func TestRemoveMemberSyncsBackendAndStoreV2(t *testing.T) { + now := time.Now() + alice := NewMember("", nil, "alice", &now) + + tcs := []struct { + name string + + storeV2Nil bool + backendNil bool + storeV2Members []*Member + backendMembers []*Member + + expectMembers []*Member + expectPanics bool + }{ + { + name: "Removing new member should fail", + expectPanics: true, + }, + { + name: "Removing member should succeed if it was only in storeV2", + storeV2Members: []*Member{alice}, + }, + { + name: "Removing member should succeed if it was only in backend", + backendMembers: []*Member{alice}, + }, + { + name: "Removing member should succeed if it exists in both", + storeV2Members: []*Member{alice}, + backendMembers: []*Member{alice}, + }, + { + name: "Removing new member should fail if backend is nil", + storeV2Members: []*Member{}, + backendNil: true, + expectPanics: true, + }, + { + name: "Removing new member should succeed if storageV2 is nil", + storeV2Nil: true, + backendMembers: []*Member{}, + }, + { + name: "Removing member should succeed if it exists in v2storage and backend is nil", + storeV2Members: []*Member{alice}, + backendNil: true, + }, + { + name: "Removing member should succeed if it exists in backend and storageV2 is nil", + storeV2Nil: true, + backendMembers: []*Member{alice}, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + lg := zaptest.NewLogger(t) + be, _ := betesting.NewDefaultTmpBackend(t) + defer be.Close() + mustCreateBackendBuckets(be) + st := v2store.New() + for _, m := range tc.backendMembers { + unsafeSaveMemberToBackend(lg, be, m) + } + be.ForceCommit() + for _, m := range tc.storeV2Members { + mustSaveMemberToStore(lg, st, m) + } + cluster := NewCluster(lg) + if !tc.backendNil { + cluster.SetBackend(be) + } + if !tc.storeV2Nil { + cluster.SetStore(st) + } + if tc.expectPanics { + assert.Panics(t, func() { + cluster.RemoveMember(alice.ID, ApplyBoth) + }) + } else { + cluster.RemoveMember(alice.ID, ApplyBoth) + } + if !tc.storeV2Nil { + storeV2Members, _ := membersFromStore(lg, st) + assert.Equal(t, map[types.ID]*Member{}, storeV2Members) + } + if !tc.backendNil { + be.ForceCommit() + beMembers, _ := mustReadMembersFromBackend(lg, be) + assert.Equal(t, map[types.ID]*Member{}, beMembers) + } + }) + } +} diff --git a/server/etcdserver/api/membership/store.go b/server/etcdserver/api/membership/store.go index 0bab3e42e..a0cdf370a 100644 --- a/server/etcdserver/api/membership/store.go +++ b/server/etcdserver/api/membership/store.go @@ -15,6 +15,7 @@ package membership import ( + "bytes" "encoding/json" "fmt" "path" @@ -39,9 +40,11 @@ const ( var ( StoreMembersPrefix = path.Join(storePrefix, "members") storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members") + errMemberAlreadyExist = fmt.Errorf("member already exists") + errMemberNotFound = fmt.Errorf("member not found") ) -func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) { +func unsafeSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) error { mkey := backendMemberKey(m.ID) mvalue, err := json.Marshal(m) if err != nil { @@ -51,7 +54,11 @@ func mustSaveMemberToBackend(lg *zap.Logger, be backend.Backend, m *Member) { tx := be.BatchTx() tx.Lock() defer tx.Unlock() + if unsafeMemberExists(tx, mkey) { + return errMemberAlreadyExist + } tx.UnsafePut(buckets.Members, mkey, mvalue) + return nil } // TrimClusterFromBackend removes all information about cluster (versions) @@ -64,14 +71,29 @@ func TrimClusterFromBackend(be backend.Backend) error { return nil } -func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) { +func unsafeDeleteMemberFromBackend(be backend.Backend, id types.ID) error { mkey := backendMemberKey(id) tx := be.BatchTx() tx.Lock() defer tx.Unlock() - tx.UnsafeDelete(buckets.Members, mkey) tx.UnsafePut(buckets.MembersRemoved, mkey, []byte("removed")) + if !unsafeMemberExists(tx, mkey) { + return errMemberNotFound + } + tx.UnsafeDelete(buckets.Members, mkey) + return nil +} + +func unsafeMemberExists(tx backend.ReadTx, mkey []byte) bool { + var found bool + tx.UnsafeForEach(buckets.Members, func(k, v []byte) error { + if bytes.Equal(k, mkey) { + found = true + } + return nil + }) + return found } func readMembersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool, error) { @@ -182,35 +204,34 @@ func mustSaveDowngradeToBackend(lg *zap.Logger, be backend.Backend, downgrade *D } func mustSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) { - b, err := json.Marshal(m.RaftAttributes) + err := unsafeSaveMemberToStore(lg, s, m) if err != nil { - lg.Panic("failed to marshal raftAttributes", zap.Error(err)) - } - p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix) - if _, err := s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { lg.Panic( "failed to save member to store", - zap.String("path", p), + zap.String("member-id", m.ID.String()), zap.Error(err), ) } } -func mustDeleteMemberFromStore(lg *zap.Logger, s v2store.Store, id types.ID) { +func unsafeSaveMemberToStore(lg *zap.Logger, s v2store.Store, m *Member) error { + b, err := json.Marshal(m.RaftAttributes) + if err != nil { + lg.Panic("failed to marshal raftAttributes", zap.Error(err)) + } + p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix) + _, err = s.Create(p, false, string(b), false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}) + return err +} + +func unsafeDeleteMemberFromStore(s v2store.Store, id types.ID) error { if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil { - lg.Panic( - "failed to delete member from store", - zap.String("path", MemberStoreKey(id)), - zap.Error(err), - ) + return err } if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, v2store.TTLOptionSet{ExpireTime: v2store.Permanent}); err != nil { - lg.Panic( - "failed to create removedMember", - zap.String("path", RemovedMemberStoreKey(id)), - zap.Error(err), - ) + return err } + return nil } func mustUpdateMemberInStore(lg *zap.Logger, s v2store.Store, m *Member) {