mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #5000 from xiang90/clu_storage
membership: save/update/delete member when backend is provided
This commit is contained in:
commit
1c12b66e35
@ -29,6 +29,7 @@ import (
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/raft"
|
||||
"github.com/coreos/etcd/raft/raftpb"
|
||||
"github.com/coreos/etcd/storage/backend"
|
||||
"github.com/coreos/etcd/store"
|
||||
"github.com/coreos/etcd/version"
|
||||
"github.com/coreos/go-semver/semver"
|
||||
@ -38,7 +39,9 @@ import (
|
||||
type RaftCluster struct {
|
||||
id types.ID
|
||||
token string
|
||||
|
||||
store store.Store
|
||||
be backend.Backend
|
||||
|
||||
sync.Mutex // guards the fields below
|
||||
version *semver.Version
|
||||
@ -279,15 +282,12 @@ func (c *RaftCluster) AddMember(m *Member) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
if c.store != nil {
|
||||
b, err := json.Marshal(m.RaftAttributes)
|
||||
if err != nil {
|
||||
plog.Panicf("marshal raftAttributes should never fail: %v", err)
|
||||
}
|
||||
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
|
||||
if _, err := c.store.Create(p, false, string(b), false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
|
||||
plog.Panicf("create raftAttributes should never fail: %v", err)
|
||||
}
|
||||
mustSaveMemberToStore(c.store, m)
|
||||
}
|
||||
if c.be != nil {
|
||||
mustSaveMemberToBackend(c.be, m)
|
||||
}
|
||||
|
||||
c.members[m.ID] = m
|
||||
}
|
||||
|
||||
@ -297,16 +297,13 @@ func (c *RaftCluster) RemoveMember(id types.ID) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
if c.store != nil {
|
||||
if _, err := c.store.Delete(MemberStoreKey(id), true, true); err != nil {
|
||||
plog.Panicf("delete member should never fail: %v", err)
|
||||
}
|
||||
mustDeleteMemberFromStore(c.store, id)
|
||||
}
|
||||
if c.be != nil {
|
||||
mustDeleteMemberFromBackend(c.be, id)
|
||||
}
|
||||
|
||||
delete(c.members, id)
|
||||
if c.store != nil {
|
||||
if _, err := c.store.Create(RemovedMemberStoreKey(id), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
|
||||
plog.Panicf("create removedMember should never fail: %v", err)
|
||||
}
|
||||
}
|
||||
c.removed[id] = true
|
||||
}
|
||||
|
||||
@ -331,17 +328,13 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes)
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
if c.store != nil {
|
||||
b, err := json.Marshal(raftAttr)
|
||||
if err != nil {
|
||||
plog.Panicf("marshal raftAttributes should never fail: %v", err)
|
||||
}
|
||||
p := path.Join(MemberStoreKey(id), raftAttributesSuffix)
|
||||
if _, err := c.store.Update(p, string(b), store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
|
||||
plog.Panicf("update raftAttributes should never fail: %v", err)
|
||||
}
|
||||
}
|
||||
c.members[id].RaftAttributes = raftAttr
|
||||
if c.store != nil {
|
||||
mustUpdateMemberInStore(c.store, c.members[id])
|
||||
}
|
||||
if c.be != nil {
|
||||
mustSaveMemberToBackend(c.be, c.members[id])
|
||||
}
|
||||
}
|
||||
|
||||
func (c *RaftCluster) Version() *semver.Version {
|
||||
|
@ -17,33 +17,17 @@ package membership
|
||||
import (
|
||||
"crypto/sha1"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"path"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/store"
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
)
|
||||
|
||||
var (
|
||||
plog = capnslog.NewPackageLogger("github.com/coreos/etcd/etcdserver", "membership")
|
||||
|
||||
StoreMembersPrefix = path.Join(storePrefix, "members")
|
||||
storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
|
||||
)
|
||||
|
||||
const (
|
||||
// TODO: make this private after moving all membership storage logic
|
||||
// from etcdserver pkg
|
||||
AttributesSuffix = "attributes"
|
||||
raftAttributesSuffix = "raftAttributes"
|
||||
|
||||
// the prefix for stroing membership related information in store provided by store pkg.
|
||||
storePrefix = "/0"
|
||||
)
|
||||
|
||||
// RaftAttributes represents the raft related attributes of an etcd member.
|
||||
@ -123,54 +107,6 @@ func (m *Member) IsStarted() bool {
|
||||
return len(m.Name) != 0
|
||||
}
|
||||
|
||||
func MemberStoreKey(id types.ID) string {
|
||||
return path.Join(StoreMembersPrefix, id.String())
|
||||
}
|
||||
|
||||
func MemberAttributesStorePath(id types.ID) string {
|
||||
return path.Join(MemberStoreKey(id), AttributesSuffix)
|
||||
}
|
||||
|
||||
func MustParseMemberIDFromKey(key string) types.ID {
|
||||
id, err := types.IDFromString(path.Base(key))
|
||||
if err != nil {
|
||||
plog.Panicf("unexpected parse member id error: %v", err)
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
||||
func RemovedMemberStoreKey(id types.ID) string {
|
||||
return path.Join(storeRemovedMembersPrefix, id.String())
|
||||
}
|
||||
|
||||
// NodeToMember builds member from a key value node.
|
||||
// the child nodes of the given node MUST be sorted by key.
|
||||
func nodeToMember(n *store.NodeExtern) (*Member, error) {
|
||||
m := &Member{ID: MustParseMemberIDFromKey(n.Key)}
|
||||
attrs := make(map[string][]byte)
|
||||
raftAttrKey := path.Join(n.Key, raftAttributesSuffix)
|
||||
attrKey := path.Join(n.Key, AttributesSuffix)
|
||||
for _, nn := range n.Nodes {
|
||||
if nn.Key != raftAttrKey && nn.Key != attrKey {
|
||||
return nil, fmt.Errorf("unknown key %q", nn.Key)
|
||||
}
|
||||
attrs[nn.Key] = []byte(*nn.Value)
|
||||
}
|
||||
if data := attrs[raftAttrKey]; data != nil {
|
||||
if err := json.Unmarshal(data, &m.RaftAttributes); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal raftAttributes error: %v", err)
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("raftAttributes key doesn't exist")
|
||||
}
|
||||
if data := attrs[attrKey]; data != nil {
|
||||
if err := json.Unmarshal(data, &m.Attributes); err != nil {
|
||||
return m, fmt.Errorf("unmarshal attributes error: %v", err)
|
||||
}
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// MembersByID implements sort by ID interface
|
||||
type MembersByID []*Member
|
||||
|
||||
|
149
etcdserver/membership/store.go
Normal file
149
etcdserver/membership/store.go
Normal file
@ -0,0 +1,149 @@
|
||||
// Copyright 2016 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package membership
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path"
|
||||
|
||||
"github.com/coreos/etcd/pkg/types"
|
||||
"github.com/coreos/etcd/storage/backend"
|
||||
"github.com/coreos/etcd/store"
|
||||
)
|
||||
|
||||
const (
|
||||
// TODO: make this private after moving all membership storage logic
|
||||
// from etcdserver pkg
|
||||
AttributesSuffix = "attributes"
|
||||
raftAttributesSuffix = "raftAttributes"
|
||||
|
||||
// the prefix for stroing membership related information in store provided by store pkg.
|
||||
storePrefix = "/0"
|
||||
)
|
||||
|
||||
var (
|
||||
membersBucketName = []byte("members")
|
||||
membersRemovedBuckedName = []byte("members_removed")
|
||||
|
||||
StoreMembersPrefix = path.Join(storePrefix, "members")
|
||||
storeRemovedMembersPrefix = path.Join(storePrefix, "removed_members")
|
||||
)
|
||||
|
||||
func mustSaveMemberToBackend(be backend.Backend, m *Member) {
|
||||
mkey := backendMemberKey(m.ID)
|
||||
mvalue, err := json.Marshal(m.RaftAttributes)
|
||||
if err != nil {
|
||||
plog.Panicf("marshal raftAttributes should never fail: %v", err)
|
||||
}
|
||||
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.UnsafePut(membersBucketName, mkey, mvalue)
|
||||
tx.Unlock()
|
||||
}
|
||||
|
||||
func mustDeleteMemberFromBackend(be backend.Backend, id types.ID) {
|
||||
mkey := backendMemberKey(id)
|
||||
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
tx.UnsafeDelete(membersBucketName, mkey)
|
||||
tx.UnsafePut(membersRemovedBuckedName, mkey, []byte("removed"))
|
||||
tx.Unlock()
|
||||
}
|
||||
|
||||
func mustSaveMemberToStore(s store.Store, m *Member) {
|
||||
b, err := json.Marshal(m.RaftAttributes)
|
||||
if err != nil {
|
||||
plog.Panicf("marshal raftAttributes should never fail: %v", err)
|
||||
}
|
||||
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
|
||||
if _, err := s.Create(p, false, string(b), false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
|
||||
plog.Panicf("create raftAttributes should never fail: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func mustDeleteMemberFromStore(s store.Store, id types.ID) {
|
||||
if _, err := s.Delete(MemberStoreKey(id), true, true); err != nil {
|
||||
plog.Panicf("delete member should never fail: %v", err)
|
||||
}
|
||||
if _, err := s.Create(RemovedMemberStoreKey(id), false, "", false, store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
|
||||
plog.Panicf("create removedMember should never fail: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func mustUpdateMemberInStore(s store.Store, m *Member) {
|
||||
b, err := json.Marshal(m.RaftAttributes)
|
||||
if err != nil {
|
||||
plog.Panicf("marshal raftAttributes should never fail: %v", err)
|
||||
}
|
||||
p := path.Join(MemberStoreKey(m.ID), raftAttributesSuffix)
|
||||
if _, err := s.Update(p, string(b), store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
|
||||
plog.Panicf("update raftAttributes should never fail: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// nodeToMember builds member from a key value node.
|
||||
// the child nodes of the given node MUST be sorted by key.
|
||||
func nodeToMember(n *store.NodeExtern) (*Member, error) {
|
||||
m := &Member{ID: MustParseMemberIDFromKey(n.Key)}
|
||||
attrs := make(map[string][]byte)
|
||||
raftAttrKey := path.Join(n.Key, raftAttributesSuffix)
|
||||
attrKey := path.Join(n.Key, AttributesSuffix)
|
||||
for _, nn := range n.Nodes {
|
||||
if nn.Key != raftAttrKey && nn.Key != attrKey {
|
||||
return nil, fmt.Errorf("unknown key %q", nn.Key)
|
||||
}
|
||||
attrs[nn.Key] = []byte(*nn.Value)
|
||||
}
|
||||
if data := attrs[raftAttrKey]; data != nil {
|
||||
if err := json.Unmarshal(data, &m.RaftAttributes); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal raftAttributes error: %v", err)
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("raftAttributes key doesn't exist")
|
||||
}
|
||||
if data := attrs[attrKey]; data != nil {
|
||||
if err := json.Unmarshal(data, &m.Attributes); err != nil {
|
||||
return m, fmt.Errorf("unmarshal attributes error: %v", err)
|
||||
}
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func backendMemberKey(id types.ID) []byte {
|
||||
return []byte(path.Join(id.String(), raftAttributesSuffix))
|
||||
}
|
||||
|
||||
func MemberStoreKey(id types.ID) string {
|
||||
return path.Join(StoreMembersPrefix, id.String())
|
||||
}
|
||||
|
||||
func MemberAttributesStorePath(id types.ID) string {
|
||||
return path.Join(MemberStoreKey(id), AttributesSuffix)
|
||||
}
|
||||
|
||||
func MustParseMemberIDFromKey(key string) types.ID {
|
||||
id, err := types.IDFromString(path.Base(key))
|
||||
if err != nil {
|
||||
plog.Panicf("unexpected parse member id error: %v", err)
|
||||
}
|
||||
return id
|
||||
}
|
||||
|
||||
func RemovedMemberStoreKey(id types.ID) string {
|
||||
return path.Join(storeRemovedMembersPrefix, id.String())
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user