mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
membership: update attr in membership pkg
This commit is contained in:
parent
29623cccb2
commit
b13b77f362
@ -307,21 +307,21 @@ func (c *RaftCluster) RemoveMember(id types.ID) {
|
|||||||
c.removed[id] = true
|
c.removed[id] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) bool {
|
func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) {
|
||||||
c.Lock()
|
c.Lock()
|
||||||
defer c.Unlock()
|
defer c.Unlock()
|
||||||
if m, ok := c.members[id]; ok {
|
if m, ok := c.members[id]; ok {
|
||||||
m.Attributes = attr
|
m.Attributes = attr
|
||||||
return true
|
if c.store != nil {
|
||||||
|
mustUpdateMemberAttrInStore(c.store, m)
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
_, ok := c.removed[id]
|
_, ok := c.removed[id]
|
||||||
if ok {
|
if !ok {
|
||||||
plog.Warningf("skipped updating attributes of removed member %s", id)
|
|
||||||
} else {
|
|
||||||
plog.Panicf("error updating attributes of unknown member %s", id)
|
plog.Panicf("error updating attributes of unknown member %s", id)
|
||||||
}
|
}
|
||||||
// TODO: update store in this function
|
plog.Warningf("skipped updating attributes of removed member %s", id)
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
|
func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
|
||||||
|
@ -25,9 +25,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// TODO: make this private after moving all membership storage logic
|
attributesSuffix = "attributes"
|
||||||
// from etcdserver pkg
|
|
||||||
AttributesSuffix = "attributes"
|
|
||||||
raftAttributesSuffix = "raftAttributes"
|
raftAttributesSuffix = "raftAttributes"
|
||||||
|
|
||||||
// the prefix for stroing membership related information in store provided by store pkg.
|
// the prefix for stroing membership related information in store provided by store pkg.
|
||||||
@ -96,13 +94,24 @@ func mustUpdateMemberInStore(s store.Store, m *Member) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func mustUpdateMemberAttrInStore(s store.Store, m *Member) {
|
||||||
|
b, err := json.Marshal(m.Attributes)
|
||||||
|
if err != nil {
|
||||||
|
plog.Panicf("marshal raftAttributes should never fail: %v", err)
|
||||||
|
}
|
||||||
|
p := path.Join(MemberStoreKey(m.ID), attributesSuffix)
|
||||||
|
if _, err := s.Set(p, false, string(b), store.TTLOptionSet{ExpireTime: store.Permanent}); err != nil {
|
||||||
|
plog.Panicf("update raftAttributes should never fail: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// nodeToMember builds member from a key value node.
|
// nodeToMember builds member from a key value node.
|
||||||
// the child nodes of the given node MUST be sorted by key.
|
// the child nodes of the given node MUST be sorted by key.
|
||||||
func nodeToMember(n *store.NodeExtern) (*Member, error) {
|
func nodeToMember(n *store.NodeExtern) (*Member, error) {
|
||||||
m := &Member{ID: MustParseMemberIDFromKey(n.Key)}
|
m := &Member{ID: MustParseMemberIDFromKey(n.Key)}
|
||||||
attrs := make(map[string][]byte)
|
attrs := make(map[string][]byte)
|
||||||
raftAttrKey := path.Join(n.Key, raftAttributesSuffix)
|
raftAttrKey := path.Join(n.Key, raftAttributesSuffix)
|
||||||
attrKey := path.Join(n.Key, AttributesSuffix)
|
attrKey := path.Join(n.Key, attributesSuffix)
|
||||||
for _, nn := range n.Nodes {
|
for _, nn := range n.Nodes {
|
||||||
if nn.Key != raftAttrKey && nn.Key != attrKey {
|
if nn.Key != raftAttrKey && nn.Key != attrKey {
|
||||||
return nil, fmt.Errorf("unknown key %q", nn.Key)
|
return nil, fmt.Errorf("unknown key %q", nn.Key)
|
||||||
@ -133,7 +142,7 @@ func MemberStoreKey(id types.ID) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func MemberAttributesStorePath(id types.ID) string {
|
func MemberAttributesStorePath(id types.ID) string {
|
||||||
return path.Join(MemberStoreKey(id), AttributesSuffix)
|
return path.Join(MemberStoreKey(id), attributesSuffix)
|
||||||
}
|
}
|
||||||
|
|
||||||
func MustParseMemberIDFromKey(key string) types.ID {
|
func MustParseMemberIDFromKey(key string) types.ID {
|
||||||
|
@ -1091,18 +1091,15 @@ func (s *EtcdServer) applyRequest(r pb.Request) Response {
|
|||||||
case r.PrevIndex > 0 || r.PrevValue != "":
|
case r.PrevIndex > 0 || r.PrevValue != "":
|
||||||
return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
|
return f(s.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
|
||||||
default:
|
default:
|
||||||
// TODO (yicheng): cluster should be the owner of cluster prefix store
|
|
||||||
// we should not modify cluster store here.
|
|
||||||
if storeMemberAttributeRegexp.MatchString(r.Path) {
|
if storeMemberAttributeRegexp.MatchString(r.Path) {
|
||||||
id := membership.MustParseMemberIDFromKey(path.Dir(r.Path))
|
id := membership.MustParseMemberIDFromKey(path.Dir(r.Path))
|
||||||
var attr membership.Attributes
|
var attr membership.Attributes
|
||||||
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
|
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
|
||||||
plog.Panicf("unmarshal %s should never fail: %v", r.Val, err)
|
plog.Panicf("unmarshal %s should never fail: %v", r.Val, err)
|
||||||
}
|
}
|
||||||
ok := s.cluster.UpdateAttributes(id, attr)
|
s.cluster.UpdateAttributes(id, attr)
|
||||||
if !ok {
|
// return an empty response since there is no consumer.
|
||||||
return Response{}
|
return Response{}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if r.Path == path.Join(StoreClusterPrefix, "version") {
|
if r.Path == path.Join(StoreClusterPrefix, "version") {
|
||||||
s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
|
s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)))
|
||||||
|
@ -21,7 +21,6 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -466,7 +465,7 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
|
|||||||
req := pb.Request{
|
req := pb.Request{
|
||||||
Method: "PUT",
|
Method: "PUT",
|
||||||
ID: 1,
|
ID: 1,
|
||||||
Path: path.Join(membership.StoreMembersPrefix, strconv.FormatUint(1, 16), membership.AttributesSuffix),
|
Path: membership.MemberAttributesStorePath(1),
|
||||||
Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`,
|
Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`,
|
||||||
}
|
}
|
||||||
srv.applyRequest(req)
|
srv.applyRequest(req)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user