From 651de5a057c00e204aa74407f93e8f2b5e9ebc00 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Sun, 3 Apr 2022 20:08:30 +0200 Subject: [PATCH] Rename EtcdServer.Id with EtcdServer.MemberId. It was misleading and error prone vs. ClusterId. --- server/embed/etcd.go | 2 +- server/etcdserver/adapters.go | 2 +- server/etcdserver/api/v3rpc/header.go | 2 +- server/etcdserver/api/v3rpc/interceptor.go | 4 +- server/etcdserver/api/v3rpc/maintenance.go | 2 +- server/etcdserver/api/v3rpc/member.go | 2 +- server/etcdserver/api/v3rpc/quota.go | 4 +- server/etcdserver/api/v3rpc/watch.go | 2 +- server/etcdserver/apply.go | 2 +- server/etcdserver/corrupt.go | 22 +++--- server/etcdserver/server.go | 78 ++++++++++----------- server/etcdserver/server_test.go | 22 +++--- tests/framework/integration/cluster.go | 22 +++--- tests/integration/cluster_test.go | 18 ++--- tests/integration/network_partition_test.go | 2 +- tests/integration/v3_leadership_test.go | 6 +- tests/integration/v3_watch_restore_test.go | 2 +- 17 files changed, 97 insertions(+), 97 deletions(-) diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 663e082d3..ffd239c79 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -275,7 +275,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { e.cfg.logger.Info( "now serving peer/client/metrics", - zap.String("local-member-id", e.Server.ID().String()), + zap.String("local-member-id", e.Server.MemberId().String()), zap.Strings("initial-advertise-peer-urls", e.cfg.getAPURLs()), zap.Strings("listen-peer-urls", e.cfg.getLPURLs()), zap.Strings("advertise-client-urls", e.cfg.getACURLs()), diff --git a/server/etcdserver/adapters.go b/server/etcdserver/adapters.go index f864507bf..8a95b9488 100644 --- a/server/etcdserver/adapters.go +++ b/server/etcdserver/adapters.go @@ -70,7 +70,7 @@ func (s *serverVersionAdapter) GetDowngradeInfo() *serverversion.DowngradeInfo { } func (s *serverVersionAdapter) GetMembersVersions() map[string]*version.Versions { - return getMembersVersions(s.lg, s.cluster, s.id, s.peerRt, s.Cfg.ReqTimeout()) + return getMembersVersions(s.lg, s.cluster, s.MemberId(), s.peerRt, s.Cfg.ReqTimeout()) } func (s *serverVersionAdapter) GetStorageVersion() *semver.Version { diff --git a/server/etcdserver/api/v3rpc/header.go b/server/etcdserver/api/v3rpc/header.go index 112cc922e..488862292 100644 --- a/server/etcdserver/api/v3rpc/header.go +++ b/server/etcdserver/api/v3rpc/header.go @@ -29,7 +29,7 @@ type header struct { func newHeader(s *etcdserver.EtcdServer) header { return header{ clusterID: int64(s.Cluster().ID()), - memberID: int64(s.ID()), + memberID: int64(s.MemberId()), sg: s, rev: func() int64 { return s.KV().Rev() }, } diff --git a/server/etcdserver/api/v3rpc/interceptor.go b/server/etcdserver/api/v3rpc/interceptor.go index 47f75654e..805781255 100644 --- a/server/etcdserver/api/v3rpc/interceptor.go +++ b/server/etcdserver/api/v3rpc/interceptor.go @@ -49,7 +49,7 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { return nil, rpctypes.ErrGRPCNotCapable } - if s.IsMemberExist(s.ID()) && s.IsLearner() && !isRPCSupportedForLearner(req) { + if s.IsMemberExist(s.MemberId()) && s.IsLearner() && !isRPCSupportedForLearner(req) { return nil, rpctypes.ErrGRPCNotSupportedForLearner } @@ -218,7 +218,7 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor return rpctypes.ErrGRPCNotCapable } - if s.IsMemberExist(s.ID()) && s.IsLearner() && info.FullMethod != snapshotMethod { // learner does not support stream RPC except Snapshot + if s.IsMemberExist(s.MemberId()) && s.IsLearner() && info.FullMethod != snapshotMethod { // learner does not support stream RPC except Snapshot return rpctypes.ErrGRPCNotSupportedForLearner } diff --git a/server/etcdserver/api/v3rpc/maintenance.go b/server/etcdserver/api/v3rpc/maintenance.go index 6eea479f0..10f03d19f 100644 --- a/server/etcdserver/api/v3rpc/maintenance.go +++ b/server/etcdserver/api/v3rpc/maintenance.go @@ -250,7 +250,7 @@ func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) ( } func (ms *maintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) { - if ms.rg.ID() != ms.rg.Leader() { + if ms.rg.MemberId() != ms.rg.Leader() { return nil, rpctypes.ErrGRPCNotLeader } diff --git a/server/etcdserver/api/v3rpc/member.go b/server/etcdserver/api/v3rpc/member.go index 54fcc2484..001eba9d4 100644 --- a/server/etcdserver/api/v3rpc/member.go +++ b/server/etcdserver/api/v3rpc/member.go @@ -106,7 +106,7 @@ func (cs *ClusterServer) MemberPromote(ctx context.Context, r *pb.MemberPromoteR } func (cs *ClusterServer) header() *pb.ResponseHeader { - return &pb.ResponseHeader{ClusterId: uint64(cs.cluster.ID()), MemberId: uint64(cs.server.ID()), RaftTerm: cs.server.Term()} + return &pb.ResponseHeader{ClusterId: uint64(cs.cluster.ID()), MemberId: uint64(cs.server.MemberId()), RaftTerm: cs.server.Term()} } func membersToProtoMembers(membs []*membership.Member) []*pb.Member { diff --git a/server/etcdserver/api/v3rpc/quota.go b/server/etcdserver/api/v3rpc/quota.go index fd41bc133..9af5fdae7 100644 --- a/server/etcdserver/api/v3rpc/quota.go +++ b/server/etcdserver/api/v3rpc/quota.go @@ -53,7 +53,7 @@ func (qa *quotaAlarmer) check(ctx context.Context, r interface{}) error { func NewQuotaKVServer(s *etcdserver.EtcdServer) pb.KVServer { return "aKVServer{ NewKVServer(s), - quotaAlarmer{storage.NewBackendQuota(s.Cfg, s.Backend(), "kv"), s, s.ID()}, + quotaAlarmer{storage.NewBackendQuota(s.Cfg, s.Backend(), "kv"), s, s.MemberId()}, } } @@ -86,6 +86,6 @@ func (s *quotaLeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequ func NewQuotaLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer { return "aLeaseServer{ NewLeaseServer(s), - quotaAlarmer{storage.NewBackendQuota(s.Cfg, s.Backend(), "lease"), s, s.ID()}, + quotaAlarmer{storage.NewBackendQuota(s.Cfg, s.Backend(), "lease"), s, s.MemberId()}, } } diff --git a/server/etcdserver/api/v3rpc/watch.go b/server/etcdserver/api/v3rpc/watch.go index b8466354b..4da07274e 100644 --- a/server/etcdserver/api/v3rpc/watch.go +++ b/server/etcdserver/api/v3rpc/watch.go @@ -52,7 +52,7 @@ func NewWatchServer(s *etcdserver.EtcdServer) pb.WatchServer { lg: s.Cfg.Logger, clusterID: int64(s.Cluster().ID()), - memberID: int64(s.ID()), + memberID: int64(s.MemberId()), maxRequestBytes: int(s.Cfg.MaxRequestBytes + grpcOverheadBytes), diff --git a/server/etcdserver/apply.go b/server/etcdserver/apply.go index 1436bc338..7057c57ae 100644 --- a/server/etcdserver/apply.go +++ b/server/etcdserver/apply.go @@ -442,7 +442,7 @@ func removeNeedlessRangeReqs(txn *pb.TxnRequest) { func newHeader(s *EtcdServer) *pb.ResponseHeader { return &pb.ResponseHeader{ ClusterId: uint64(s.Cluster().ID()), - MemberId: uint64(s.ID()), + MemberId: uint64(s.MemberId()), Revision: s.KV().Rev(), RaftTerm: s.Term(), } diff --git a/server/etcdserver/corrupt.go b/server/etcdserver/corrupt.go index 81288d5cb..a1f06796c 100644 --- a/server/etcdserver/corrupt.go +++ b/server/etcdserver/corrupt.go @@ -45,13 +45,13 @@ func (s *EtcdServer) CheckInitialHashKV() error { lg.Info( "starting initial corruption check", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.Duration("timeout", s.Cfg.ReqTimeout()), ) h, rev, crev, err := s.kv.HashByRev(0) if err != nil { - return fmt.Errorf("%s failed to fetch hash (%v)", s.ID(), err) + return fmt.Errorf("%s failed to fetch hash (%v)", s.MemberId(), err) } peers := s.getPeerHashKVs(rev) mismatch := 0 @@ -59,7 +59,7 @@ func (s *EtcdServer) CheckInitialHashKV() error { if p.resp != nil { peerID := types.ID(p.resp.Header.MemberId) fields := []zap.Field{ - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.Int64("local-member-revision", rev), zap.Int64("local-member-compact-revision", crev), zap.Uint32("local-member-hash", h), @@ -87,7 +87,7 @@ func (s *EtcdServer) CheckInitialHashKV() error { case rpctypes.ErrFutureRev: lg.Warn( "cannot fetch hash from slow remote peer", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.Int64("local-member-revision", rev), zap.Int64("local-member-compact-revision", crev), zap.Uint32("local-member-hash", h), @@ -98,7 +98,7 @@ func (s *EtcdServer) CheckInitialHashKV() error { case rpctypes.ErrCompacted: lg.Warn( "cannot fetch hash from remote peer; local member is behind", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.Int64("local-member-revision", rev), zap.Int64("local-member-compact-revision", crev), zap.Uint32("local-member-hash", h), @@ -110,12 +110,12 @@ func (s *EtcdServer) CheckInitialHashKV() error { } } if mismatch > 0 { - return fmt.Errorf("%s found data inconsistency with peers", s.ID()) + return fmt.Errorf("%s found data inconsistency with peers", s.MemberId()) } lg.Info( "initial corruption checking passed; no corruption", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), ) return nil } @@ -129,7 +129,7 @@ func (s *EtcdServer) monitorKVHash() { lg := s.Logger() lg.Info( "enabled corruption checking", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.Duration("interval", t), ) @@ -195,7 +195,7 @@ func (s *EtcdServer) checkHashKV() error { zap.Int64("compact-revision-2", crev2), zap.Uint32("hash-2", h2), ) - mismatch(uint64(s.ID())) + mismatch(uint64(s.MemberId())) } checkedCount := 0 @@ -262,7 +262,7 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) []*peerHashKVResp { members := s.cluster.Members() peers := make([]peerInfo, 0, len(members)) for _, m := range members { - if m.ID == s.ID() { + if m.ID == s.MemberId() { continue } peers = append(peers, peerInfo{id: m.ID, eps: m.PeerURLs}) @@ -288,7 +288,7 @@ func (s *EtcdServer) getPeerHashKVs(rev int64) []*peerHashKVResp { } lg.Warn( "failed hash kv request", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.Int64("requested-revision", rev), zap.String("remote-peer-endpoint", ep), zap.Error(lastErr), diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 0ac4d178d..0e76eabc1 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -240,7 +240,7 @@ type EtcdServer struct { leaderChanged *notify.Notifier errorc chan error - id types.ID + memberId types.ID attributes membership.Attributes cluster *membership.RaftCluster @@ -323,7 +323,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { v2store: b.storage.st, snapshotter: b.ss, r: *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl), - id: b.cluster.nodeID, + memberId: b.cluster.nodeID, attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, cluster: b.cluster.cl, stats: sstats, @@ -461,7 +461,7 @@ func (s *EtcdServer) adjustTicks() { ticks := s.Cfg.ElectionTicks - 1 lg.Info( "started as single-node; fast-forwarding election ticks", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.Int("forward-ticks", ticks), zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)), zap.Int("election-ticks", s.Cfg.ElectionTicks), @@ -500,7 +500,7 @@ func (s *EtcdServer) adjustTicks() { lg.Info( "initialized peer connections; fast-forwarding election ticks", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.Int("forward-ticks", ticks), zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)), zap.Int("election-ticks", s.Cfg.ElectionTicks), @@ -566,7 +566,7 @@ func (s *EtcdServer) start() { if s.ClusterVersion() != nil { lg.Info( "starting etcd server", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.String("local-server-version", version.Version), zap.String("cluster-id", s.Cluster().ID().String()), zap.String("cluster-version", version.Cluster(s.ClusterVersion().String())), @@ -575,7 +575,7 @@ func (s *EtcdServer) start() { } else { lg.Info( "starting etcd server", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.String("local-server-version", version.Version), zap.String("cluster-version", "to_be_decided"), ) @@ -695,7 +695,7 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { if s.cluster.IsIDRemoved(types.ID(m.From)) { lg.Warn( "rejected Raft message from removed member", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.String("removed-member-id", types.ID(m.From).String()), ) return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member") @@ -1057,7 +1057,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { lg.Info("adding peers from new cluster configuration") for _, m := range s.cluster.Members() { - if m.ID == s.ID() { + if m.ID == s.MemberId() { continue } s.r.transport.AddPeer(m.ID, m.PeerURLs) @@ -1116,7 +1116,7 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) { lg := s.Logger() lg.Info( "triggering snapshot", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.Uint64("local-member-applied-index", ep.appliedi), zap.Uint64("local-member-snapshot-index", ep.snapi), zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount), @@ -1137,7 +1137,7 @@ func (s *EtcdServer) hasMultipleVotingMembers() bool { } func (s *EtcdServer) isLeader() bool { - return uint64(s.ID()) == s.Lead() + return uint64(s.MemberId()) == s.Lead() } // MoveLeader transfers the leader to the given transferee. @@ -1152,7 +1152,7 @@ func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) er lg := s.Logger() lg.Info( "leadership transfer starting", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.String("current-leader-member-id", types.ID(lead).String()), zap.String("transferee-member-id", types.ID(transferee).String()), ) @@ -1169,7 +1169,7 @@ func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) er // TODO: drain all requests, or drop all messages to the old leader lg.Info( "leadership transfer finished", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.String("old-leader-member-id", types.ID(lead).String()), zap.String("new-leader-member-id", types.ID(transferee).String()), zap.Duration("took", time.Since(now)), @@ -1183,7 +1183,7 @@ func (s *EtcdServer) TransferLeadership() error { if !s.isLeader() { lg.Info( "skipped leadership transfer; local server is not leader", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.String("current-leader-member-id", types.ID(s.Lead()).String()), ) return nil @@ -1192,7 +1192,7 @@ func (s *EtcdServer) TransferLeadership() error { if !s.hasMultipleVotingMembers() { lg.Info( "skipped leadership transfer for single voting member cluster", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.String("current-leader-member-id", types.ID(s.Lead()).String()), ) return nil @@ -1229,7 +1229,7 @@ func (s *EtcdServer) HardStop() { func (s *EtcdServer) Stop() { lg := s.Logger() if err := s.TransferLeadership(); err != nil { - lg.Warn("leadership transfer failed", zap.String("local-member-id", s.ID().String()), zap.Error(err)) + lg.Warn("leadership transfer failed", zap.String("local-member-id", s.MemberId().String()), zap.Error(err)) } s.HardStop() } @@ -1317,17 +1317,17 @@ func (s *EtcdServer) mayAddMember(memb membership.Member) error { if !memb.IsLearner && !s.cluster.IsReadyToAddVotingMember() { lg.Warn( "rejecting member add request; not enough healthy members", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.String("requested-member-add", fmt.Sprintf("%+v", memb)), zap.Error(ErrNotEnoughStartedMembers), ) return ErrNotEnoughStartedMembers } - if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), s.cluster.VotingMembers()) { + if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.MemberId(), s.cluster.VotingMembers()) { lg.Warn( "rejecting member add request; local member has not been connected to all peers, reconfigure breaks active quorum", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.String("requested-member-add", fmt.Sprintf("%+v", memb)), zap.Error(ErrUnhealthy), ) @@ -1446,7 +1446,7 @@ func (s *EtcdServer) mayPromoteMember(id types.ID) error { if !s.cluster.IsReadyToPromoteMember(uint64(id)) { lg.Warn( "rejecting member promote request; not enough healthy members", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.String("requested-member-remove-id", id.String()), zap.Error(ErrNotEnoughStartedMembers), ) @@ -1505,7 +1505,7 @@ func (s *EtcdServer) mayRemoveMember(id types.ID) error { if !s.cluster.IsReadyToRemoveVotingMember(uint64(id)) { lg.Warn( "rejecting member remove request; not enough healthy members", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.String("requested-member-remove-id", id.String()), zap.Error(ErrNotEnoughStartedMembers), ) @@ -1513,17 +1513,17 @@ func (s *EtcdServer) mayRemoveMember(id types.ID) error { } // downed member is safe to remove since it's not part of the active quorum - if t := s.r.transport.ActiveSince(id); id != s.ID() && t.IsZero() { + if t := s.r.transport.ActiveSince(id); id != s.MemberId() && t.IsZero() { return nil } // protect quorum if some members are down m := s.cluster.VotingMembers() - active := numConnectedSince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), m) + active := numConnectedSince(s.r.transport, time.Now().Add(-HealthInterval), s.MemberId(), m) if (active - 1) < 1+((len(m)-1)/2) { lg.Warn( "rejecting member remove request; local member has not been connected to all peers, reconfigure breaks active quorum", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.String("requested-member-remove", id.String()), zap.Int("active-peers", active), zap.Error(ErrUnhealthy), @@ -1597,14 +1597,14 @@ func (s *EtcdServer) FirstCommitInTermNotify() <-chan struct{} { // RaftStatusGetter represents etcd server and Raft progress. type RaftStatusGetter interface { - ID() types.ID + MemberId() types.ID Leader() types.ID CommittedIndex() uint64 AppliedIndex() uint64 Term() uint64 } -func (s *EtcdServer) ID() types.ID { return s.id } +func (s *EtcdServer) MemberId() types.ID { return s.memberId } func (s *EtcdServer) Leader() types.ID { return types.ID(s.getLead()) } @@ -1643,7 +1643,7 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*me resp := x.(*confChangeResponse) lg.Info( "applied a configuration change through raft", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.String("raft-conf-change", cc.Type.String()), zap.String("raft-conf-change-node-id", types.ID(cc.NodeID).String()), ) @@ -1684,7 +1684,7 @@ func (s *EtcdServer) sync(timeout time.Duration) { // or its server is stopped. func (s *EtcdServer) publishV3(timeout time.Duration) { req := &membershippb.ClusterMemberAttrSetRequest{ - Member_ID: uint64(s.id), + Member_ID: uint64(s.MemberId()), MemberAttributes: &membershippb.Attributes{ Name: s.attributes.Name, ClientUrls: s.attributes.ClientURLs, @@ -1696,7 +1696,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) { case <-s.stopping: lg.Warn( "stopped publish because server is stopping", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), zap.Duration("publish-timeout", timeout), ) @@ -1713,7 +1713,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) { close(s.readych) lg.Info( "published local member to cluster through raft", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), zap.String("cluster-id", s.cluster.ID().String()), zap.Duration("publish-timeout", timeout), @@ -1723,7 +1723,7 @@ func (s *EtcdServer) publishV3(timeout time.Duration) { default: lg.Warn( "failed to publish local member to cluster through raft", - zap.String("local-member-id", s.ID().String()), + zap.String("local-member-id", s.MemberId().String()), zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)), zap.Duration("publish-timeout", timeout), zap.Error(err), @@ -1737,7 +1737,7 @@ func (s *EtcdServer) sendMergedSnap(merged snap.Message) { lg := s.Logger() fields := []zap.Field{ - zap.String("from", s.ID().String()), + zap.String("from", s.MemberId().String()), zap.String("to", types.ID(merged.To).String()), zap.Int64("bytes", merged.TotalSize), zap.String("size", humanize.Bytes(uint64(merged.TotalSize))), @@ -1917,7 +1917,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { s.GoAttach(func() { a := &pb.AlarmRequest{ - MemberID: uint64(s.ID()), + MemberID: uint64(s.MemberId()), Action: pb.AlarmRequest_ACTIVATE, Alarm: pb.AlarmType_NOSPACE, } @@ -1963,13 +1963,13 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con } else { s.cluster.AddMember(&confChangeContext.Member, shouldApplyV3) - if confChangeContext.Member.ID != s.id { + if confChangeContext.Member.ID != s.MemberId() { s.r.transport.AddPeer(confChangeContext.Member.ID, confChangeContext.PeerURLs) } } // update the isLearner metric when this server id is equal to the id in raft member confChange - if confChangeContext.Member.ID == s.id { + if confChangeContext.Member.ID == s.MemberId() { if cc.Type == raftpb.ConfChangeAddLearnerNode { isLearner.Set(1) } else { @@ -1980,7 +1980,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con case raftpb.ConfChangeRemoveNode: id := types.ID(cc.NodeID) s.cluster.RemoveMember(id, shouldApplyV3) - if id == s.id { + if id == s.MemberId() { return true, nil } s.r.transport.RemovePeer(id) @@ -1998,7 +1998,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con ) } s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes, shouldApplyV3) - if m.ID != s.id { + if m.ID != s.MemberId() { s.r.transport.UpdatePeer(m.ID, m.PeerURLs) } } @@ -2133,7 +2133,7 @@ func (s *EtcdServer) monitorClusterVersions() { return } - if s.Leader() != s.ID() { + if s.Leader() != s.MemberId() { continue } monitor.UpdateClusterVersionIfNeeded() @@ -2268,8 +2268,8 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error { switch lead { case types.ID(raft.None): // TODO: return error to specify it happens because the cluster does not have leader now - case s.ID(): - if !isConnectedToQuorumSince(s.r.transport, start, s.ID(), s.cluster.Members()) { + case s.MemberId(): + if !isConnectedToQuorumSince(s.r.transport, start, s.MemberId(), s.cluster.Members()) { return ErrTimeoutDueToConnectionLost } default: diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 3294b1365..dcad5c23c 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -610,12 +610,12 @@ func TestApplyConfChangeShouldStop(t *testing.T) { }) lg := zaptest.NewLogger(t) srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: lg, - id: 1, - r: *r, - cluster: cl, - beHooks: serverstorage.NewBackendHooks(lg, nil), + lgMu: new(sync.RWMutex), + lg: lg, + memberId: 1, + r: *r, + cluster: cl, + beHooks: serverstorage.NewBackendHooks(lg, nil), } cc := raftpb.ConfChange{ Type: raftpb.ConfChangeRemoveNode, @@ -658,7 +658,7 @@ func TestApplyConfigChangeUpdatesConsistIndex(t *testing.T) { srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: lg, - id: 1, + memberId: 1, r: *realisticRaftNode(lg), cluster: cl, w: wait.New(), @@ -739,7 +739,7 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) { srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: lg, - id: 2, + memberId: 2, r: *r, cluster: cl, w: wait.New(), @@ -1487,7 +1487,7 @@ func TestPublishV3(t *testing.T) { lg: lg, readych: make(chan struct{}), Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000}, - id: 1, + memberId: 1, r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}), attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://a", "http://b"}}, cluster: &membership.RaftCluster{}, @@ -1557,7 +1557,7 @@ func TestPublishV3Retry(t *testing.T) { lg: lg, readych: make(chan struct{}), Cfg: config.ServerConfig{Logger: lg, TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries, MaxRequestBytes: 1000}, - id: 1, + memberId: 1, r: *newRaftNode(raftNodeConfig{lg: lg, Node: n}), w: mockwait.NewNop(), stopping: make(chan struct{}), @@ -1604,7 +1604,7 @@ func TestUpdateVersion(t *testing.T) { srv := &EtcdServer{ lgMu: new(sync.RWMutex), lg: zaptest.NewLogger(t), - id: 1, + memberId: 1, Cfg: config.ServerConfig{Logger: zaptest.NewLogger(t), TickMs: 1, SnapshotCatchUpEntries: DefaultSnapshotCatchUpEntries}, r: *newRaftNode(raftNodeConfig{lg: zaptest.NewLogger(t), Node: n}), attributes: membership.Attributes{Name: "node1", ClientURLs: []string{"http://node1.com"}}, diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index 125e228bc..a59499701 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -352,7 +352,7 @@ func (c *Cluster) RemoveMember(t testutil.TB, cc *clientv3.Client, id uint64) er } newMembers := make([]*Member, 0) for _, m := range c.Members { - if uint64(m.Server.ID()) != id { + if uint64(m.Server.MemberId()) != id { newMembers = append(newMembers, m) } else { m.Client.Close() @@ -363,7 +363,7 @@ func (c *Cluster) RemoveMember(t testutil.TB, cc *clientv3.Client, id uint64) er // TODO: remove connection write timeout by selecting on http response closeNotifier // blocking on https://github.com/golang/go/issues/9524 case <-time.After(time.Second + time.Duration(ElectionTicks)*TickDuration + time.Second + rafthttp.ConnWriteTimeout): - t.Fatalf("failed to remove member %s in time", m.Server.ID()) + t.Fatalf("failed to remove member %s in time", m.Server.MemberId()) } } } @@ -436,7 +436,7 @@ func (c *Cluster) waitMembersForLeader(ctx context.Context, t testutil.TB, membs possibleLead := make(map[uint64]bool) var lead uint64 for _, m := range membs { - possibleLead[uint64(m.Server.ID())] = true + possibleLead[uint64(m.Server.MemberId())] = true } cc, err := c.ClusterClient() if err != nil { @@ -470,7 +470,7 @@ func (c *Cluster) waitMembersForLeader(ctx context.Context, t testutil.TB, membs } for i, m := range membs { - if uint64(m.Server.ID()) == lead { + if uint64(m.Server.MemberId()) == lead { t.Logf("waitMembersForLeader found leader. Member: %v lead: %x", i, lead) return i } @@ -841,7 +841,7 @@ func (m *Member) ElectionTimeout() time.Duration { return time.Duration(m.Server.Cfg.ElectionTicks*int(m.Server.Cfg.TickMs)) * time.Millisecond } -func (m *Member) ID() types.ID { return m.Server.ID() } +func (m *Member) ID() types.ID { return m.Server.MemberId() } // NewClientV3 creates a new grpc client connection to the member func NewClientV3(m *Member) (*clientv3.Client, error) { @@ -1307,18 +1307,18 @@ func (m *Member) Metric(metricName string, expectLabels ...string) (string, erro // InjectPartition drops connections from m to others, vice versa. func (m *Member) InjectPartition(t testutil.TB, others ...*Member) { for _, other := range others { - m.Server.CutPeer(other.Server.ID()) - other.Server.CutPeer(m.Server.ID()) - t.Logf("network partition injected between: %v <-> %v", m.Server.ID(), other.Server.ID()) + m.Server.CutPeer(other.Server.MemberId()) + other.Server.CutPeer(m.Server.MemberId()) + t.Logf("network partition injected between: %v <-> %v", m.Server.MemberId(), other.Server.MemberId()) } } // RecoverPartition recovers connections from m to others, vice versa. func (m *Member) RecoverPartition(t testutil.TB, others ...*Member) { for _, other := range others { - m.Server.MendPeer(other.Server.ID()) - other.Server.MendPeer(m.Server.ID()) - t.Logf("network partition between: %v <-> %v", m.Server.ID(), other.Server.ID()) + m.Server.MendPeer(other.Server.MemberId()) + other.Server.MendPeer(m.Server.MemberId()) + t.Logf("network partition between: %v <-> %v", m.Server.MemberId(), other.Server.MemberId()) } } diff --git a/tests/integration/cluster_test.go b/tests/integration/cluster_test.go index 5a7ff4592..50e52cc78 100644 --- a/tests/integration/cluster_test.go +++ b/tests/integration/cluster_test.go @@ -101,7 +101,7 @@ func testDecreaseClusterSize(t *testing.T, size int) { // TODO: remove the last but one member for i := 0; i < size-1; i++ { - id := c.Members[len(c.Members)-1].Server.ID() + id := c.Members[len(c.Members)-1].Server.MemberId() // may hit second leader election on slow machines if err := c.RemoveMember(t, c.Members[0].Client, uint64(id)); err != nil { if strings.Contains(err.Error(), "no leader") { @@ -179,7 +179,7 @@ func TestAddMemberAfterClusterFullRotation(t *testing.T) { // remove all the previous three members and add in three new members. for i := 0; i < 3; i++ { - if err := c.RemoveMember(t, c.Members[0].Client, uint64(c.Members[1].Server.ID())); err != nil { + if err := c.RemoveMember(t, c.Members[0].Client, uint64(c.Members[1].Server.MemberId())); err != nil { t.Fatal(err) } c.WaitMembersForLeader(t, c.Members) @@ -200,7 +200,7 @@ func TestIssue2681(t *testing.T) { c := integration.NewCluster(t, &integration.ClusterConfig{Size: 5}) defer c.Terminate(t) - if err := c.RemoveMember(t, c.Members[0].Client, uint64(c.Members[4].Server.ID())); err != nil { + if err := c.RemoveMember(t, c.Members[0].Client, uint64(c.Members[4].Server.MemberId())); err != nil { t.Fatal(err) } c.WaitMembersForLeader(t, c.Members) @@ -226,7 +226,7 @@ func testIssue2746(t *testing.T, members int) { clusterMustProgress(t, c.Members) } - if err := c.RemoveMember(t, c.Members[0].Client, uint64(c.Members[members-1].Server.ID())); err != nil { + if err := c.RemoveMember(t, c.Members[0].Client, uint64(c.Members[members-1].Server.MemberId())); err != nil { t.Fatal(err) } c.WaitMembersForLeader(t, c.Members) @@ -251,7 +251,7 @@ func TestIssue2904(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), integration.RequestTimeout) // the proposal is not committed because member 1 is stopped, but the // proposal is appended to leader'Server raft log. - c.Members[0].Client.MemberRemove(ctx, uint64(c.Members[2].Server.ID())) + c.Members[0].Client.MemberRemove(ctx, uint64(c.Members[2].Server.MemberId())) cancel() // restart member, and expect it to send UpdateAttributes request. @@ -381,7 +381,7 @@ func TestRejectUnhealthyRemove(t *testing.T) { leader := c.WaitLeader(t) // reject remove active member since (3,2)-(1,0) => (2,2) lacks quorum - err := c.RemoveMember(t, c.Members[leader].Client, uint64(c.Members[2].Server.ID())) + err := c.RemoveMember(t, c.Members[leader].Client, uint64(c.Members[2].Server.MemberId())) if err == nil { t.Fatalf("should reject quorum breaking remove: %s", err) } @@ -394,7 +394,7 @@ func TestRejectUnhealthyRemove(t *testing.T) { time.Sleep(time.Duration(integration.ElectionTicks * int(integration.TickDuration))) // permit remove dead member since (3,2) - (0,1) => (3,1) has quorum - if err = c.RemoveMember(t, c.Members[2].Client, uint64(c.Members[0].Server.ID())); err != nil { + if err = c.RemoveMember(t, c.Members[2].Client, uint64(c.Members[0].Server.MemberId())); err != nil { t.Fatalf("should accept removing down member: %s", err) } @@ -405,7 +405,7 @@ func TestRejectUnhealthyRemove(t *testing.T) { time.Sleep((3 * etcdserver.HealthInterval) / 2) // accept remove member since (4,1)-(1,0) => (3,1) has quorum - if err = c.RemoveMember(t, c.Members[1].Client, uint64(c.Members[0].Server.ID())); err != nil { + if err = c.RemoveMember(t, c.Members[1].Client, uint64(c.Members[0].Server.MemberId())); err != nil { t.Fatalf("expected to remove member, got error %v", err) } } @@ -429,7 +429,7 @@ func TestRestartRemoved(t *testing.T) { firstMember.KeepDataDirTerminate = true // 3. remove first member, shut down without deleting data - if err := c.RemoveMember(t, c.Members[1].Client, uint64(firstMember.Server.ID())); err != nil { + if err := c.RemoveMember(t, c.Members[1].Client, uint64(firstMember.Server.MemberId())); err != nil { t.Fatalf("expected to remove member, got error %v", err) } c.WaitLeader(t) diff --git a/tests/integration/network_partition_test.go b/tests/integration/network_partition_test.go index b5d73a30a..c3b08f23c 100644 --- a/tests/integration/network_partition_test.go +++ b/tests/integration/network_partition_test.go @@ -96,7 +96,7 @@ func testNetworkPartition5MembersLeaderInMajority(t *testing.T) error { // leader must be hold in majority leadIndex2 := clus.WaitMembersForLeader(t, majorityMembers) - leadID, leadID2 := clus.Members[leadIndex].Server.ID(), majorityMembers[leadIndex2].Server.ID() + leadID, leadID2 := clus.Members[leadIndex].Server.MemberId(), majorityMembers[leadIndex2].Server.MemberId() if leadID != leadID2 { return fmt.Errorf("unexpected leader change from %s, got %s", leadID, leadID2) } diff --git a/tests/integration/v3_leadership_test.go b/tests/integration/v3_leadership_test.go index 84bd97d32..519bb0a8f 100644 --- a/tests/integration/v3_leadership_test.go +++ b/tests/integration/v3_leadership_test.go @@ -37,7 +37,7 @@ func testMoveLeader(t *testing.T, auto bool) { defer clus.Terminate(t) oldLeadIdx := clus.WaitLeader(t) - oldLeadID := uint64(clus.Members[oldLeadIdx].Server.ID()) + oldLeadID := uint64(clus.Members[oldLeadIdx].Server.MemberId()) // ensure followers go through leader transition while leadership transfer idc := make(chan uint64) @@ -55,7 +55,7 @@ func testMoveLeader(t *testing.T, auto bool) { } } - target := uint64(clus.Members[(oldLeadIdx+1)%3].Server.ID()) + target := uint64(clus.Members[(oldLeadIdx+1)%3].Server.MemberId()) if auto { err := clus.Members[oldLeadIdx].Server.TransferLeadership() if err != nil { @@ -107,7 +107,7 @@ func TestMoveLeaderError(t *testing.T) { oldLeadIdx := clus.WaitLeader(t) followerIdx := (oldLeadIdx + 1) % 3 - target := uint64(clus.Members[(oldLeadIdx+2)%3].Server.ID()) + target := uint64(clus.Members[(oldLeadIdx+2)%3].Server.MemberId()) mvc := integration.ToGRPC(clus.Client(followerIdx)).Maintenance _, err := mvc.MoveLeader(context.TODO(), &pb.MoveLeaderRequest{TargetID: target}) diff --git a/tests/integration/v3_watch_restore_test.go b/tests/integration/v3_watch_restore_test.go index 2f3271eb9..39fd91879 100644 --- a/tests/integration/v3_watch_restore_test.go +++ b/tests/integration/v3_watch_restore_test.go @@ -81,7 +81,7 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { clus.Members[0].InjectPartition(t, clus.Members[1:]...) initialLead := clus.WaitMembersForLeader(t, clus.Members[1:]) - t.Logf("elected lead: %v", clus.Members[initialLead].Server.ID()) + t.Logf("elected lead: %v", clus.Members[initialLead].Server.MemberId()) t.Logf("sleeping for 2 seconds") time.Sleep(2 * time.Second) t.Logf("sleeping for 2 seconds DONE")