From 29623cccb2d41062ed7e4ad9d4532239423d85e2 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 7 Apr 2016 16:30:34 -0700 Subject: [PATCH] membership: save/update/delete member when backend is provided --- etcdserver/membership/cluster.go | 45 ++++------ etcdserver/membership/member.go | 64 ------------- etcdserver/membership/store.go | 149 +++++++++++++++++++++++++++++++ 3 files changed, 168 insertions(+), 90 deletions(-) create mode 100644 etcdserver/membership/store.go diff --git a/etcdserver/membership/cluster.go b/etcdserver/membership/cluster.go index 7a6108454..0771e874f 100644 --- a/etcdserver/membership/cluster.go +++ b/etcdserver/membership/cluster.go @@ -29,6 +29,7 @@ import ( "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/storage/backend" "github.com/coreos/etcd/store" "github.com/coreos/etcd/version" "github.com/coreos/go-semver/semver" @@ -38,7 +39,9 @@ import ( type RaftCluster struct { id types.ID token string + store store.Store + be backend.Backend sync.Mutex // guards the fields below version *semver.Version @@ -279,15 +282,12 @@ func (c *RaftCluster) AddMember(m *Member) { c.Lock() defer c.Unlock() if c.store != nil { - b, err := json.Marshal(m.RaftAttributes) - if err != nil { - plog.Panicf("marshal raftAttributes should never fail: %v", err) - } - p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix) - if _, err := c.store.Create(p, false, string(b), false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil { - plog.Panicf("create raftAttributes should never fail: %v", err) - } + mustSaveMemberToStore(c.store, m) } + if c.be != nil { + mustSaveMemberToBackend(c.be, m) + } + c.members[m.ID] = m } @@ -297,16 +297,13 @@ func (c *RaftCluster) RemoveMember(id types.ID) { c.Lock() defer c.Unlock() if c.store != nil { - if _, err := c.store.Delete(MemberStoreKey(id), true, true); err != nil { - plog.Panicf("delete member should never fail: %v", err) - } + mustDeleteMemberFromStore(c.store, id) } + if c.be != nil { + mustDeleteMemberFromBackend(c.be, id) + } + delete(c.members, id) - if c.store != nil { - if _, err := c.store.Create(RemovedMemberStoreKey(id), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil { - plog.Panicf("create removedMember should never fail: %v", err) - } - } c.removed[id] = true } @@ -331,17 +328,13 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) c.Lock() defer c.Unlock() - if c.store != nil { - b, err := json.Marshal(raftAttr) - if err != nil { - plog.Panicf("marshal raftAttributes should never fail: %v", err) - } - p := path.Join(MemberStoreKey(id), raftAttributesSuffix) - if _, err := c.store.Update(p, string(b), store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil { - plog.Panicf("update raftAttributes should never fail: %v", err) - } - } c.members[id].RaftAttributes = raftAttr + if c.store != nil { + mustUpdateMemberInStore(c.store, c.members[id]) + } + if c.be != nil { + mustSaveMemberToBackend(c.be, c.members[id]) + } } func (c *RaftCluster) Version() *semver.Version { diff --git a/etcdserver/membership/member.go b/etcdserver/membership/member.go index 7fdb39986..fbf2c0702 100644 --- a/etcdserver/membership/member.go +++ b/etcdserver/membership/member.go @@ -17,33 +17,17 @@ package membership import ( "crypto/sha1" "encoding/binary" - "encoding/json" "fmt" "math/rand" - "path" "sort" "time" "github.com/coreos/etcd/pkg/types" - "github.com/coreos/etcd/store" "github.com/coreos/pkg/capnslog" ) var ( plog = capnslog.NewPackageLogger("github.com/coreos/etcd/etcdserver", "membership") - - StoreMembersPrefix = path.Join(storePrefix, "members") - storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members") -) - -const ( - // TODO: make this private after moving all membership storage logic - // from etcdserver pkg - AttributesSuffix = "attributes" - raftAttributesSuffix = "raftAttributes" - - // the prefix for stroing membership related information in store provided by store pkg. - storePrefix = "/0" ) // RaftAttributes represents the raft related attributes of an etcd member. @@ -123,54 +107,6 @@ func (m *Member) IsStarted() bool { return len(m.Name) != 0 } -func MemberStoreKey(id types.ID) string { - return path.Join(StoreMembersPrefix, id.String()) -} - -func MemberAttributesStorePath(id types.ID) string { - return path.Join(MemberStoreKey(id), AttributesSuffix) -} - -func MustParseMemberIDFromKey(key string) types.ID { - id, err := types.IDFromString(path.Base(key)) - if err != nil { - plog.Panicf("unexpected parse member id error: %v", err) - } - return id -} - -func RemovedMemberStoreKey(id types.ID) string { - return path.Join(storeRemovedMembersPrefix, id.String()) -} - -// NodeToMember builds member from a key value node. -// the child nodes of the given node MUST be sorted by key. -func nodeToMember(n *store.NodeExtern) (*Member, error) { - m := &Member{ID: MustParseMemberIDFromKey(n.Key)} - attrs := make(map[string][]byte) - raftAttrKey := path.Join(n.Key, raftAttributesSuffix) - attrKey := path.Join(n.Key, AttributesSuffix) - for _, nn := range n.Nodes { - if nn.Key != raftAttrKey && nn.Key != attrKey { - return nil, fmt.Errorf("unknown key %q", nn.Key) - } - attrs[nn.Key] = []byte(*nn.Value) - } - if data := attrs[raftAttrKey]; data != nil { - if err := json.Unmarshal(data, &m.RaftAttributes); err != nil { - return nil, fmt.Errorf("unmarshal raftAttributes error: %v", err) - } - } else { - return nil, fmt.Errorf("raftAttributes key doesn't exist") - } - if data := attrs[attrKey]; data != nil { - if err := json.Unmarshal(data, &m.Attributes); err != nil { - return m, fmt.Errorf("unmarshal attributes error: %v", err) - } - } - return m, nil -} - // MembersByID implements sort by ID interface type MembersByID []*Member diff --git a/etcdserver/membership/store.go b/etcdserver/membership/store.go new file mode 100644 index 000000000..cdd16371f --- /dev/null +++ b/etcdserver/membership/store.go @@ -0,0 +1,149 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package membership + +import ( + "encoding/json" + "fmt" + "path" + + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/storage/backend" + "github.com/coreos/etcd/store" +) + +const ( + // TODO: make this private after moving all membership storage logic + // from etcdserver pkg + AttributesSuffix = "attributes" + raftAttributesSuffix = "raftAttributes" + + // the prefix for stroing membership related information in store provided by store pkg. + storePrefix = "/0" +) + +var ( + membersBucketName = []byte("members") + membersRemovedBuckedName = []byte("members_removed") + + StoreMembersPrefix = path.Join(storePrefix, "members") + storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members") +) + +func mustSaveMemberToBackend(be backend.Backend, m *Member) { + mkey := backendMemberKey(m.ID) + mvalue, err := json.Marshal(m.RaftAttributes) + if err != nil { + plog.Panicf("marshal raftAttributes should never fail: %v", err) + } + + tx := be.BatchTx() + tx.Lock() + tx.UnsafePut(membersBucketName, mkey, mvalue) + tx.Unlock() +} + +func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) { + mkey := backendMemberKey(id) + + tx := be.BatchTx() + tx.Lock() + tx.UnsafeDelete(membersBucketName, mkey) + tx.UnsafePut(membersRemovedBuckedName, mkey, []byte("removed")) + tx.Unlock() +} + +func mustSaveMemberToStore(s store.Store, m *Member) { + b, err := json.Marshal(m.RaftAttributes) + if err != nil { + plog.Panicf("marshal raftAttributes should never fail: %v", err) + } + p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix) + if _, err := s.Create(p, false, string(b), false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil { + plog.Panicf("create raftAttributes should never fail: %v", err) + } +} + +func mustDeleteMemberFromStore(s store.Store, id types.ID) { + if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil { + plog.Panicf("delete member should never fail: %v", err) + } + if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil { + plog.Panicf("create removedMember should never fail: %v", err) + } +} + +func mustUpdateMemberInStore(s store.Store, m *Member) { + b, err := json.Marshal(m.RaftAttributes) + if err != nil { + plog.Panicf("marshal raftAttributes should never fail: %v", err) + } + p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix) + if _, err := s.Update(p, string(b), store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil { + plog.Panicf("update raftAttributes should never fail: %v", err) + } +} + +// nodeToMember builds member from a key value node. +// the child nodes of the given node MUST be sorted by key. +func nodeToMember(n *store.NodeExtern) (*Member, error) { + m := &Member{ID: MustParseMemberIDFromKey(n.Key)} + attrs := make(map[string][]byte) + raftAttrKey := path.Join(n.Key, raftAttributesSuffix) + attrKey := path.Join(n.Key, AttributesSuffix) + for _, nn := range n.Nodes { + if nn.Key != raftAttrKey && nn.Key != attrKey { + return nil, fmt.Errorf("unknown key %q", nn.Key) + } + attrs[nn.Key] = []byte(*nn.Value) + } + if data := attrs[raftAttrKey]; data != nil { + if err := json.Unmarshal(data, &m.RaftAttributes); err != nil { + return nil, fmt.Errorf("unmarshal raftAttributes error: %v", err) + } + } else { + return nil, fmt.Errorf("raftAttributes key doesn't exist") + } + if data := attrs[attrKey]; data != nil { + if err := json.Unmarshal(data, &m.Attributes); err != nil { + return m, fmt.Errorf("unmarshal attributes error: %v", err) + } + } + return m, nil +} + +func backendMemberKey(id types.ID) []byte { + return []byte(path.Join(id.String(), raftAttributesSuffix)) +} + +func MemberStoreKey(id types.ID) string { + return path.Join(StoreMembersPrefix, id.String()) +} + +func MemberAttributesStorePath(id types.ID) string { + return path.Join(MemberStoreKey(id), AttributesSuffix) +} + +func MustParseMemberIDFromKey(key string) types.ID { + id, err := types.IDFromString(path.Base(key)) + if err != nil { + plog.Panicf("unexpected parse member id error: %v", err) + } + return id +} + +func RemovedMemberStoreKey(id types.ID) string { + return path.Join(storeRemovedMembersPrefix, id.String()) +}