etcdserver: use v3 to publish member attr

This commit is contained in:
Jingyi Hu 2019-11-26 17:44:34 -08:00 committed by yoyinzyc
parent 0c3401fa76
commit 5cd2502ab1
2 changed files with 76 additions and 1 deletions

View File

@ -24,6 +24,7 @@ import (
"github.com/coreos/go-semver/semver"
"go.etcd.io/etcd/auth"
"go.etcd.io/etcd/etcdserver/api"
"go.etcd.io/etcd/etcdserver/api/membership"
"go.etcd.io/etcd/etcdserver/api/membership/membershippb"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/lease"
@ -53,6 +54,7 @@ type applyResult struct {
// applierV3Internal is the interface for processing internal V3 raft request
type applierV3Internal interface {
ClusterVersionSet(r *membershippb.ClusterVersionSetRequest)
ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest)
}
// applierV3 is the interface for processing V3 raft messages
@ -185,6 +187,8 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult {
ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList)
case r.ClusterVersionSet != nil:
a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet)
case r.ClusterMemberAttrSet != nil:
a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet)
default:
panic("not implemented")
}
@ -852,6 +856,16 @@ func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRe
a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability)
}
func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest) {
a.s.cluster.UpdateAttributes(
types.ID(r.Member_ID),
membership.Attributes{
Name: r.MemberAttributes.Name,
ClientURLs: r.MemberAttributes.ClientUrls,
},
)
}
type quotaApplierV3 struct {
applierV3
q Quota

View File

@ -732,7 +732,7 @@ func (s *EtcdServer) adjustTicks() {
func (s *EtcdServer) Start() {
s.start()
s.goAttach(func() { s.adjustTicks() })
s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
s.goAttach(func() { s.publishV3(s.Cfg.ReqTimeout()) })
s.goAttach(s.purgeFile)
s.goAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) })
s.goAttach(s.monitorVersions)
@ -1992,6 +1992,67 @@ func (s *EtcdServer) sync(timeout time.Duration) {
})
}
// publishV3 registers server information into the cluster using v3 request. The
// information is the JSON representation of this server's member struct, updated
// with the static clientURLs of the server.
// The function keeps attempting to register until it succeeds,
// or its server is stopped.
func (s *EtcdServer) publishV3(timeout time.Duration) {
req := &membershippb.ClusterMemberAttrSetRequest{
Member_ID: uint64(s.id),
MemberAttributes: &membershippb.Attributes{
Name: s.attributes.Name,
ClientUrls: s.attributes.ClientURLs,
},
}
for {
select {
case <-s.stopping:
if lg := s.getLogger(); lg != nil {
lg.Warn(
"stopped publish because server is stopping",
zap.String("local-member-id", s.ID().String()),
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
zap.Duration("publish-timeout", timeout),
)
}
return
default:
}
ctx, cancel := context.WithTimeout(s.ctx, timeout)
_, err := s.raftRequest(ctx, pb.InternalRaftRequest{ClusterMemberAttrSet: req})
cancel()
switch err {
case nil:
close(s.readych)
if lg := s.getLogger(); lg != nil {
lg.Info(
"published local member to cluster through raft",
zap.String("local-member-id", s.ID().String()),
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
zap.String("cluster-id", s.cluster.ID().String()),
zap.Duration("publish-timeout", timeout),
)
}
return
default:
if lg := s.getLogger(); lg != nil {
lg.Warn(
"failed to publish local member to cluster through raft",
zap.String("local-member-id", s.ID().String()),
zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
zap.Duration("publish-timeout", timeout),
zap.Error(err),
)
}
}
}
}
// publish registers server information into the cluster. The information
// is the JSON representation of this server's member struct, updated with the
// static clientURLs of the server.