diff --git a/etcdserver/apply.go b/etcdserver/apply.go index 22e0e8543..006df9bc7 100644 --- a/etcdserver/apply.go +++ b/etcdserver/apply.go @@ -24,6 +24,7 @@ import ( "github.com/coreos/go-semver/semver" "go.etcd.io/etcd/auth" "go.etcd.io/etcd/etcdserver/api" + "go.etcd.io/etcd/etcdserver/api/membership" "go.etcd.io/etcd/etcdserver/api/membership/membershippb" pb "go.etcd.io/etcd/etcdserver/etcdserverpb" "go.etcd.io/etcd/lease" @@ -53,6 +54,7 @@ type applyResult struct { // applierV3Internal is the interface for processing internal V3 raft request type applierV3Internal interface { ClusterVersionSet(r *membershippb.ClusterVersionSetRequest) + ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest) } // applierV3 is the interface for processing V3 raft messages @@ -185,6 +187,8 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest) *applyResult { ar.resp, ar.err = a.s.applyV3.RoleList(r.AuthRoleList) case r.ClusterVersionSet != nil: a.s.applyV3Internal.ClusterVersionSet(r.ClusterVersionSet) + case r.ClusterMemberAttrSet != nil: + a.s.applyV3Internal.ClusterMemberAttrSet(r.ClusterMemberAttrSet) default: panic("not implemented") } @@ -852,6 +856,16 @@ func (a *applierV3backend) ClusterVersionSet(r *membershippb.ClusterVersionSetRe a.s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Ver)), api.UpdateCapability) } +func (a *applierV3backend) ClusterMemberAttrSet(r *membershippb.ClusterMemberAttrSetRequest) { + a.s.cluster.UpdateAttributes( + types.ID(r.Member_ID), + membership.Attributes{ + Name: r.MemberAttributes.Name, + ClientURLs: r.MemberAttributes.ClientUrls, + }, + ) +} + type quotaApplierV3 struct { applierV3 q Quota diff --git a/etcdserver/server.go b/etcdserver/server.go index dbcf058ee..b434ea2d5 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -732,7 +732,7 @@ func (s *EtcdServer) adjustTicks() { func (s *EtcdServer) Start() { s.start() s.goAttach(func() { s.adjustTicks() }) - s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) }) + s.goAttach(func() { s.publishV3(s.Cfg.ReqTimeout()) }) s.goAttach(s.purgeFile) s.goAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) }) s.goAttach(s.monitorVersions) @@ -1992,6 +1992,67 @@ func (s *EtcdServer) sync(timeout time.Duration) { }) } +// publishV3 registers server information into the cluster using v3 request. The +// information is the JSON representation of this server's member struct, updated +// with the static clientURLs of the server. +// The function keeps attempting to register until it succeeds, +// or its server is stopped. +func (s *EtcdServer) publishV3(timeout time.Duration) { + req := &membershippb.ClusterMemberAttrSetRequest{ + Member_ID: uint64(s.id), + MemberAttributes: &membershippb.Attributes{ + Name: s.attributes.Name, + ClientUrls: s.attributes.ClientURLs, + }, + } + + for { + select { + case <-s.stopping: + if lg := s.getLogger(); lg != nil { + lg.Warn( + "stopped publish because server is stopping", + zap.String("local-member-id", s.ID().String()), + zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), + zap.Duration("publish-timeout", timeout), + ) + } + return + + default: + } + + ctx, cancel := context.WithTimeout(s.ctx, timeout) + _, err := s.raftRequest(ctx, pb.InternalRaftRequest{ClusterMemberAttrSet: req}) + cancel() + switch err { + case nil: + close(s.readych) + if lg := s.getLogger(); lg != nil { + lg.Info( + "published local member to cluster through raft", + zap.String("local-member-id", s.ID().String()), + zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), + zap.String("cluster-id", s.cluster.ID().String()), + zap.Duration("publish-timeout", timeout), + ) + } + return + + default: + if lg := s.getLogger(); lg != nil { + lg.Warn( + "failed to publish local member to cluster through raft", + zap.String("local-member-id", s.ID().String()), + zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), + zap.Duration("publish-timeout", timeout), + zap.Error(err), + ) + } + } + } +} + // publish registers server information into the cluster. The information // is the JSON representation of this server's member struct, updated with the // static clientURLs of the server.