From 355d0ab2a6a160b6592acc063b5bb5ed0e4341ce Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Sat, 23 Mar 2019 20:13:58 -0700 Subject: [PATCH 01/15] *: add learner field in endpoint status Added learner field to endpoint status API. --- etcdctl/ctlv3/command/ep_command.go | 2 +- etcdctl/ctlv3/command/printer.go | 4 +++- etcdctl/ctlv3/command/printer_fields.go | 1 + etcdserver/api/membership/cluster.go | 19 +++++++++++++++++++ etcdserver/api/v3rpc/maintenance.go | 8 +++++++- etcdserver/server.go | 5 +++++ 6 files changed, 36 insertions(+), 3 deletions(-) diff --git a/etcdctl/ctlv3/command/ep_command.go b/etcdctl/ctlv3/command/ep_command.go index d540c4558..99ae1a756 100644 --- a/etcdctl/ctlv3/command/ep_command.go +++ b/etcdctl/ctlv3/command/ep_command.go @@ -60,7 +60,7 @@ func newEpStatusCommand() *cobra.Command { Use: "status", Short: "Prints out the status of endpoints specified in `--endpoints` flag", Long: `When --write-out is set to simple, this command prints out comma-separated status lists for each endpoint. -The items in the lists are endpoint, ID, version, db size, is leader, raft term, raft index. +The items in the lists are endpoint, ID, version, db size, is leader, is learner, raft term, raft index, raft applied index, errors. `, Run: epStatusCommandFunc, } diff --git a/etcdctl/ctlv3/command/printer.go b/etcdctl/ctlv3/command/printer.go index 55586a413..942668d9e 100644 --- a/etcdctl/ctlv3/command/printer.go +++ b/etcdctl/ctlv3/command/printer.go @@ -194,7 +194,8 @@ func makeEndpointHealthTable(healthList []epHealth) (hdr []string, rows [][]stri } func makeEndpointStatusTable(statusList []epStatus) (hdr []string, rows [][]string) { - hdr = []string{"endpoint", "ID", "version", "db size", "is leader", "raft term", "raft index", "raft applied index", "errors"} + hdr = []string{"endpoint", "ID", "version", "db size", "is leader", "is learner", "raft term", + "raft index", "raft applied index", "errors"} for _, status := range statusList { rows = append(rows, []string{ status.Ep, @@ -202,6 +203,7 @@ func makeEndpointStatusTable(statusList []epStatus) (hdr []string, rows [][]stri status.Resp.Version, humanize.Bytes(uint64(status.Resp.DbSize)), fmt.Sprint(status.Resp.Leader == status.Resp.Header.MemberId), + fmt.Sprint(status.Resp.IsLearner), fmt.Sprint(status.Resp.RaftTerm), fmt.Sprint(status.Resp.RaftIndex), fmt.Sprint(status.Resp.RaftAppliedIndex), diff --git a/etcdctl/ctlv3/command/printer_fields.go b/etcdctl/ctlv3/command/printer_fields.go index 220eb491f..38f5c7d93 100644 --- a/etcdctl/ctlv3/command/printer_fields.go +++ b/etcdctl/ctlv3/command/printer_fields.go @@ -158,6 +158,7 @@ func (p *fieldsPrinter) EndpointStatus(eps []epStatus) { fmt.Printf("\"Version\" : %q\n", ep.Resp.Version) fmt.Println(`"DBSize" :`, ep.Resp.DbSize) fmt.Println(`"Leader" :`, ep.Resp.Leader) + fmt.Println(`"IsLearner" :`, ep.Resp.IsLearner) fmt.Println(`"RaftIndex" :`, ep.Resp.RaftIndex) fmt.Println(`"RaftTerm" :`, ep.Resp.RaftTerm) fmt.Println(`"RaftAppliedIndex" :`, ep.Resp.RaftAppliedIndex) diff --git a/etcdserver/api/membership/cluster.go b/etcdserver/api/membership/cluster.go index cc8e171bd..bfe250cb5 100644 --- a/etcdserver/api/membership/cluster.go +++ b/etcdserver/api/membership/cluster.go @@ -693,3 +693,22 @@ func mustDetectDowngrade(lg *zap.Logger, cv *semver.Version) { } } } + +// IsLearner returns if the local member is raft learner +func (c *RaftCluster) IsLearner() bool { + c.Lock() + defer c.Unlock() + localMember, ok := c.members[c.localID] + if !ok { + if c.lg != nil { + c.lg.Panic( + "failed to find local ID in cluster members", + zap.String("cluster-id", c.cid.String()), + zap.String("local-member-id", c.localID.String()), + ) + } else { + plog.Panicf("failed to find local ID %s in cluster %s", c.localID.String(), c.cid.String()) + } + } + return localMember.IsLearner +} diff --git a/etcdserver/api/v3rpc/maintenance.go b/etcdserver/api/v3rpc/maintenance.go index be777629c..bc5453f70 100644 --- a/etcdserver/api/v3rpc/maintenance.go +++ b/etcdserver/api/v3rpc/maintenance.go @@ -55,6 +55,10 @@ type AuthGetter interface { AuthStore() auth.AuthStore } +type ClusterStatusGetter interface { + IsLearner() bool +} + type maintenanceServer struct { lg *zap.Logger rg etcdserver.RaftStatusGetter @@ -63,10 +67,11 @@ type maintenanceServer struct { a Alarmer lt LeaderTransferrer hdr header + cs ClusterStatusGetter } func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer { - srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, kg: s, bg: s, a: s, lt: s, hdr: newHeader(s)} + srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s} return &authMaintenanceServer{srv, s} } @@ -179,6 +184,7 @@ func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) ( RaftTerm: ms.rg.Term(), DbSize: ms.bg.Backend().Size(), DbSizeInUse: ms.bg.Backend().SizeInUse(), + IsLearner: ms.cs.IsLearner(), } if resp.Leader == raft.None { resp.Errors = append(resp.Errors, etcdserver.ErrNoLeader.Error()) diff --git a/etcdserver/server.go b/etcdserver/server.go index ed0941fd2..423a6e96c 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -2440,3 +2440,8 @@ func (s *EtcdServer) Alarms() []*pb.AlarmMember { func (s *EtcdServer) Logger() *zap.Logger { return s.lg } + +// IsLearner returns if the local member is raft learner +func (s *EtcdServer) IsLearner() bool { + return s.cluster.IsLearner() +} From 43ed94f7693cca7826cfa7595450e8104a6d770d Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Mon, 25 Mar 2019 17:05:14 -0700 Subject: [PATCH 02/15] etcdserver: filter rpc request to learner Hardcoded allowed rpc for learner node. Added filtering in grpc interceptor to check if rpc is allowed for learner node. --- etcdserver/api/v3rpc/interceptor.go | 9 +++++++++ etcdserver/api/v3rpc/rpctypes/error.go | 1 + etcdserver/api/v3rpc/util.go | 13 +++++++++++++ 3 files changed, 23 insertions(+) diff --git a/etcdserver/api/v3rpc/interceptor.go b/etcdserver/api/v3rpc/interceptor.go index 53e6fb191..319d349b6 100644 --- a/etcdserver/api/v3rpc/interceptor.go +++ b/etcdserver/api/v3rpc/interceptor.go @@ -48,6 +48,11 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { return nil, rpctypes.ErrGRPCNotCapable } + // TODO: add test in clientv3/integration to verify behavior + if s.IsLearner() && !isRPCEnabledForLearner(req) { + return nil, rpctypes.ErrGPRCNotSupportedForLearner + } + md, ok := metadata.FromIncomingContext(ctx) if ok { if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader { @@ -190,6 +195,10 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor return rpctypes.ErrGRPCNotCapable } + if s.IsLearner() { // learner does not support Watch and LeaseKeepAlive RPC + return rpctypes.ErrGPRCNotSupportedForLearner + } + md, ok := metadata.FromIncomingContext(ss.Context()) if ok { if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader { diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index 9e45cea5b..3d1ee11b0 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -69,6 +69,7 @@ var ( ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err() ErrGRPCUnhealthy = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err() ErrGRPCCorrupt = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err() + ErrGPRCNotSupportedForLearner = status.New(codes.FailedPrecondition, "etcdserver: rpc not supported for learner").Err() errStringToError = map[string]error{ ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey, diff --git a/etcdserver/api/v3rpc/util.go b/etcdserver/api/v3rpc/util.go index 1784b97bd..37443406e 100644 --- a/etcdserver/api/v3rpc/util.go +++ b/etcdserver/api/v3rpc/util.go @@ -22,6 +22,7 @@ import ( "go.etcd.io/etcd/v3/etcdserver" "go.etcd.io/etcd/v3/etcdserver/api/membership" "go.etcd.io/etcd/v3/etcdserver/api/v3rpc/rpctypes" + pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb" "go.etcd.io/etcd/v3/lease" "go.etcd.io/etcd/v3/mvcc" @@ -116,3 +117,15 @@ func isClientCtxErr(ctxErr error, err error) bool { } return false } + +// in v3.4, learner is allowed to serve serializable read and endpoint status +func isRPCEnabledForLearner(req interface{}) bool { + switch r := req.(type) { + case *pb.StatusRequest: + return true + case *pb.RangeRequest: + return r.Serializable + default: + return false + } +} From 57a11eb1e11a976978924a2b174a05f868a31a1e Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Tue, 2 Apr 2019 19:10:59 -0700 Subject: [PATCH 03/15] integration: add TestKVForLearner Adding TestKVForLearner. Also adding test utility functions for clientv3 integration tests. --- clientv3/integration/cluster_test.go | 25 +----- clientv3/integration/kv_test.go | 76 +++++++++++++++++ integration/cluster.go | 120 +++++++++++++++++++++++++++ 3 files changed, 200 insertions(+), 21 deletions(-) diff --git a/clientv3/integration/cluster_test.go b/clientv3/integration/cluster_test.go index fbc35e51f..d9a03cbff 100644 --- a/clientv3/integration/cluster_test.go +++ b/clientv3/integration/cluster_test.go @@ -16,7 +16,6 @@ package integration import ( "context" - "fmt" "reflect" "strings" "testing" @@ -204,27 +203,11 @@ func TestMemberAddForLearner(t *testing.T) { t.Errorf("Added a member as learner, got resp.Member.IsLearner = %v", resp.Member.IsLearner) } - numOfLearners, err := getNumberOfLearners(clus) + learners, err := clus.GetLearnerMembers() if err != nil { - t.Fatalf("failed to get the number of learners in cluster: %v", err) + t.Fatalf("failed to get the learner members in cluster: %v", err) } - if numOfLearners != 1 { - t.Errorf("Added 1 learner node to cluster, got %d", numOfLearners) + if len(learners) != 1 { + t.Errorf("Added 1 learner node to cluster, got %d", len(learners)) } } - -// getNumberOfLearners return the number of learner nodes in cluster using MemberList API -func getNumberOfLearners(clus *integration.ClusterV3) (int, error) { - cli := clus.RandClient() - resp, err := cli.MemberList(context.Background()) - if err != nil { - return 0, fmt.Errorf("failed to list member %v", err) - } - numberOfLearners := 0 - for _, m := range resp.Members { - if m.IsLearner { - numberOfLearners++ - } - } - return numberOfLearners, nil -} diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index 13a3a626c..432405ddf 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -971,3 +971,79 @@ func TestKVLargeRequests(t *testing.T) { clus.Terminate(t) } } + +// TestKVForLearner ensures learner member only accepts serializable read request. +func TestKVForLearner(t *testing.T) { + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + // we have to add and launch learner member after initial cluster was created, because + // bootstrapping a cluster with learner member is not supported. + clus.AddAndLaunchLearnerMember(t) + + learners, err := clus.GetLearnerMembers() + if err != nil { + t.Fatalf("failed to get the learner members in cluster: %v", err) + } + if len(learners) != 1 { + t.Fatalf("added 1 learner to cluster, got %d", len(learners)) + } + + if len(clus.Members) != 4 { + t.Fatalf("expecting 4 members in cluster after adding the learner member, got %d", len(clus.Members)) + } + // note: + // 1. clus.Members[3] is the newly added learner member, which was appended to clus.Members + // 2. we are using member's grpcAddr instead of clientURLs as the endpoint for clientv3.Config, + // because the implementation of integration test has diverged from embed/etcd.go. + learnerEp := clus.Members[3].GRPCAddr() + cfg := clientv3.Config{ + Endpoints: []string{learnerEp}, + DialTimeout: 5 * time.Second, + DialOptions: []grpc.DialOption{grpc.WithBlock()}, + } + // this cli only has endpoint of the learner member + cli, err := clientv3.New(cfg) + if err != nil { + t.Fatalf("failed to create clientv3: %v", err) + } + defer cli.Close() + + tests := []struct { + op clientv3.Op + wErr bool + }{ + { + op: clientv3.OpGet("foo", clientv3.WithSerializable()), + wErr: false, + }, + { + op: clientv3.OpGet("foo"), + wErr: true, + }, + { + op: clientv3.OpPut("foo", "bar"), + wErr: true, + }, + { + op: clientv3.OpDelete("foo"), + wErr: true, + }, + { + op: clientv3.OpTxn([]clientv3.Cmp{clientv3.Compare(clientv3.CreateRevision("foo"), "=", 0)}, nil, nil), + wErr: true, + }, + } + + for idx, test := range tests { + _, err := cli.Do(context.TODO(), test.op) + if err != nil && !test.wErr { + t.Errorf("%d: expect no error, got %v", idx, err) + } + if err == nil && test.wErr { + t.Errorf("%d: expect error, got nil", idx) + } + } +} diff --git a/integration/cluster.go b/integration/cluster.go index 9a5b7b83b..5973903eb 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -559,6 +559,8 @@ type member struct { clientMaxCallSendMsgSize int clientMaxCallRecvMsgSize int useIP bool + + isLearner bool } func (m *member) GRPCAddr() string { return m.grpcAddr } @@ -1272,3 +1274,121 @@ type grpcAPI struct { // Election is the election API for the client's connection. Election epb.ElectionClient } + +// GetLearnerMembers returns the list of learner members in cluster using MemberList API. +func (c *ClusterV3) GetLearnerMembers() ([]*pb.Member, error) { + cli := c.Client(0) + resp, err := cli.MemberList(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to list member %v", err) + } + var learners []*pb.Member + for _, m := range resp.Members { + if m.IsLearner { + learners = append(learners, m) + } + } + return learners, nil +} + +// AddAndLaunchLearnerMember creates a leaner member, adds it to cluster +// via v3 MemberAdd API, and then launches the new member. +func (c *ClusterV3) AddAndLaunchLearnerMember(t testing.TB) { + m := c.mustNewMember(t) + m.isLearner = true + + scheme := schemeFromTLSInfo(c.cfg.PeerTLS) + peerURLs := []string{scheme + "://" + m.PeerListeners[0].Addr().String()} + + cli := c.Client(0) + _, err := cli.MemberAddAsLearner(context.Background(), peerURLs) + if err != nil { + t.Fatalf("failed to add learner member %v", err) + } + + m.InitialPeerURLsMap = types.URLsMap{} + for _, mm := range c.Members { + m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs + } + m.InitialPeerURLsMap[m.Name] = m.PeerURLs + m.NewCluster = false + + if err := m.Launch(); err != nil { + t.Fatal(err) + } + + c.Members = append(c.Members, m) + + c.waitMembersMatch(t) +} + +// getMembers returns a list of members in cluster, in format of etcdserverpb.Member +func (c *ClusterV3) getMembers() []*pb.Member { + var mems []*pb.Member + for _, m := range c.Members { + mem := &pb.Member{ + Name: m.Name, + PeerURLs: m.PeerURLs.StringSlice(), + ClientURLs: m.ClientURLs.StringSlice(), + IsLearner: m.isLearner, + } + mems = append(mems, mem) + } + return mems +} + +// waitMembersMatch waits until v3rpc MemberList returns the 'same' members info as the +// local 'c.Members', which is the local recording of members in the testing cluster. With +// the exception that the local recording c.Members does not have info on Member.ID, which +// is generated when the member is been added to cluster. +// +// Note: +// A successful match means the Member.clientURLs are matched. This means member has already +// finished publishing its server attributes to cluster. Publishing attributes is a cluster-wide +// write request (in v2 server). Therefore, at this point, any raft log entries prior to this +// would have already been applied. +// +// If a new member was added to an existing cluster, at this point, it has finished publishing +// its own server attributes to the cluster. And therefore by the same argument, it has already +// applied the raft log entries (especially those of type raftpb.ConfChangeType). At this point, +// the new member has the correct view of the cluster configuration. +// +// Special note on learner member: +// Learner member is only added to a cluster via v3rpc MemberAdd API (as of v3.4). When starting +// the learner member, its initial view of the cluster created by peerURLs map does not have info +// on whether or not the new member itself is learner. But at this point, a successful match does +// indicate that the new learner member has applied the raftpb.ConfChangeAddLearnerNode entry +// which was used to add the learner itself to the cluster, and therefore it has the correct info +// on learner. +func (c *ClusterV3) waitMembersMatch(t testing.TB) { + wMembers := c.getMembers() + sort.Sort(SortableProtoMemberSliceByPeerURLs(wMembers)) + cli := c.Client(0) + for { + resp, err := cli.MemberList(context.Background()) + if err != nil { + t.Fatalf("failed to list member %v", err) + } + + if len(resp.Members) != len(wMembers) { + continue + } + sort.Sort(SortableProtoMemberSliceByPeerURLs(resp.Members)) + for _, m := range resp.Members { + m.ID = 0 + } + if reflect.DeepEqual(resp.Members, wMembers) { + return + } + + time.Sleep(tickDuration) + } +} + +type SortableProtoMemberSliceByPeerURLs []*pb.Member + +func (p SortableProtoMemberSliceByPeerURLs) Len() int { return len(p) } +func (p SortableProtoMemberSliceByPeerURLs) Less(i, j int) bool { + return p[i].PeerURLs[0] < p[j].PeerURLs[0] +} +func (p SortableProtoMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] } From ba9fd620e89254f60c18b8fd3383fafb65ee183d Mon Sep 17 00:00:00 2001 From: WizardCXY Date: Thu, 21 Mar 2019 18:06:26 +0800 Subject: [PATCH 04/15] etcdserver: support MemberPromote for learner --- etcdserver/api/membership/cluster.go | 30 ++++++++++--- etcdserver/api/membership/errors.go | 9 ++-- etcdserver/api/v2v3/server.go | 8 ++++ etcdserver/api/v3rpc/member.go | 8 ++-- etcdserver/api/v3rpc/rpctypes/error.go | 1 + etcdserver/api/v3rpc/util.go | 1 + etcdserver/server.go | 61 +++++++++++++++++++++++++- etcdserver/server_test.go | 48 ++++++++++++++++++++ 8 files changed, 151 insertions(+), 15 deletions(-) diff --git a/etcdserver/api/membership/cluster.go b/etcdserver/api/membership/cluster.go index bfe250cb5..a8e55aacc 100644 --- a/etcdserver/api/membership/cluster.go +++ b/etcdserver/api/membership/cluster.go @@ -252,6 +252,16 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { } } +// IsPromoteChange checks if m is a promoteChange +func (c *RaftCluster) IsPromoteChange(m *Member) bool { + members, _ := membersFromStore(c.lg, c.v2store) + + if members[m.ID] != nil && members[m.ID].IsLearner && !m.IsLearner { + return true + } + return false +} + // ValidateConfigurationChange takes a proposed ConfChange and // ensures that it is still valid. func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { @@ -262,9 +272,6 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { } switch cc.Type { case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode: - if members[id] != nil { - return ErrIDExists - } urls := make(map[string]bool) for _, m := range members { for _, u := range m.PeerURLs { @@ -279,12 +286,21 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { plog.Panicf("unmarshal member should never fail: %v", err) } } - for _, u := range m.PeerURLs { - if urls[u] { - return ErrPeerURLexists + + if members[id] != nil && members[id].IsLearner && cc.Type == raftpb.ConfChangeAddNode { + // TODO promote a learner node case check + } else { + // add a member leanrner or a follower case + if members[id] != nil { + return ErrIDExists + } + + for _, u := range m.PeerURLs { + if urls[u] { + return ErrPeerURLexists + } } } - case raftpb.ConfChangeRemoveNode: if members[id] == nil { return ErrIDNotFound diff --git a/etcdserver/api/membership/errors.go b/etcdserver/api/membership/errors.go index a92f9582b..f34e8849a 100644 --- a/etcdserver/api/membership/errors.go +++ b/etcdserver/api/membership/errors.go @@ -21,10 +21,11 @@ import ( ) var ( - ErrIDRemoved = errors.New("membership: ID removed") - ErrIDExists = errors.New("membership: ID exists") - ErrIDNotFound = errors.New("membership: ID not found") - ErrPeerURLexists = errors.New("membership: peerURL exists") + ErrIDRemoved = errors.New("membership: ID removed") + ErrIDExists = errors.New("membership: ID exists") + ErrIDNotFound = errors.New("membership: ID not found") + ErrPeerURLexists = errors.New("membership: peerURL exists") + ErrPromotionFailed = errors.New("membership: promotion failed") ) func isKeyNotFound(err error) bool { diff --git a/etcdserver/api/v2v3/server.go b/etcdserver/api/v2v3/server.go index fa9d66d75..0093f6e0a 100644 --- a/etcdserver/api/v2v3/server.go +++ b/etcdserver/api/v2v3/server.go @@ -79,6 +79,14 @@ func (s *v2v3Server) RemoveMember(ctx context.Context, id uint64) ([]*membership return v3MembersToMembership(resp.Members), nil } +func (s *v2v3Server) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) { + resp, err := s.c.MemberPromote(ctx, id) + if err != nil { + return nil, err + } + return v3MembersToMembership(resp.Members), nil +} + func (s *v2v3Server) UpdateMember(ctx context.Context, m membership.Member) ([]*membership.Member, error) { resp, err := s.c.MemberUpdate(ctx, uint64(m.ID), m.PeerURLs) if err != nil { diff --git a/etcdserver/api/v3rpc/member.go b/etcdserver/api/v3rpc/member.go index 16813836d..d749f47db 100644 --- a/etcdserver/api/v3rpc/member.go +++ b/etcdserver/api/v3rpc/member.go @@ -16,7 +16,6 @@ package v3rpc import ( "context" - "errors" "time" "go.etcd.io/etcd/v3/etcdserver" @@ -94,8 +93,11 @@ func (cs *ClusterServer) MemberList(ctx context.Context, r *pb.MemberListRequest } func (cs *ClusterServer) MemberPromote(ctx context.Context, r *pb.MemberPromoteRequest) (*pb.MemberPromoteResponse, error) { - // TODO: implement - return nil, errors.New("not implemented") + membs, err := cs.server.PromoteMember(ctx, r.ID) + if err != nil { + return nil, togRPCError(err) + } + return &pb.MemberPromoteResponse{Header: cs.header(), Members: membersToProtoMembers(membs)}, nil } func (cs *ClusterServer) header() *pb.ResponseHeader { diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index 3d1ee11b0..e2431318c 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -40,6 +40,7 @@ var ( ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err() ErrGRPCMemberBadURLs = status.New(codes.InvalidArgument, "etcdserver: given member URLs are invalid").Err() ErrGRPCMemberNotFound = status.New(codes.NotFound, "etcdserver: member not found").Err() + ErrGRPCMemberPromtotionFailed = status.New(codes.FailedPrecondition, "etcdserver: learner member promotion failed").Err() ErrGRPCRequestTooLarge = status.New(codes.InvalidArgument, "etcdserver: request is too large").Err() ErrGRPCRequestTooManyRequests = status.New(codes.ResourceExhausted, "etcdserver: too many requests").Err() diff --git a/etcdserver/api/v3rpc/util.go b/etcdserver/api/v3rpc/util.go index 37443406e..6bbed1c45 100644 --- a/etcdserver/api/v3rpc/util.go +++ b/etcdserver/api/v3rpc/util.go @@ -35,6 +35,7 @@ var toGRPCErrorMap = map[error]error{ membership.ErrIDNotFound: rpctypes.ErrGRPCMemberNotFound, membership.ErrIDExists: rpctypes.ErrGRPCMemberExist, membership.ErrPeerURLexists: rpctypes.ErrGRPCPeerURLExist, + membership.ErrPromotionFailed: rpctypes.ErrGRPCMemberPromtotionFailed, etcdserver.ErrNotEnoughStartedMembers: rpctypes.ErrMemberNotEnoughStarted, mvcc.ErrCompacted: rpctypes.ErrGRPCCompacted, diff --git a/etcdserver/server.go b/etcdserver/server.go index 423a6e96c..3b4e5f886 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -156,6 +156,10 @@ type Server interface { // UpdateMember attempts to update an existing member in the cluster. It will // return ErrIDNotFound if the member ID does not exist. UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error) + // PromoteMember attempts to promote a non-voting node to a voting node. It will + // return ErrIDNotFound if the member ID does not exist. + // return ErrPromotionFailed if the member can't be promoted. + PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) // ClusterVersion is the cluster-wide minimum major.minor version. // Cluster version is set to the min version that an etcd member is @@ -1611,6 +1615,56 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) ([]*membership return s.configure(ctx, cc) } +// PromoteMember promotes a learner node to a voting node. +func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) { + if err := s.checkMembershipOperationPermission(ctx); err != nil { + return nil, err + } + + if err := s.mayPromoteMember(types.ID(id)); err != nil { + return nil, err + } + + var memb membership.Member + members := s.cluster.Members() + isExist := false + for _, member := range members { + if uint64(member.ID) == id { + memb = *member + isExist = true + break + } + } + + if !isExist { + return nil, membership.ErrIDNotFound + } + memb.IsLearner = false + + b, err := json.Marshal(memb) + if err != nil { + return nil, err + } + + cc := raftpb.ConfChange{ + Type: raftpb.ConfChangeAddNode, + NodeID: id, + Context: b, + } + + return s.configure(ctx, cc) +} + +func (s *EtcdServer) mayPromoteMember(id types.ID) error { + if !s.Cfg.StrictReconfigCheck { + return nil + } + // TODO add more checks whether the member can be promoted. + // like learner progress check or if cluster is ready to promote a learner + + return nil +} + func (s *EtcdServer) mayRemoveMember(id types.ID) error { if !s.Cfg.StrictReconfigCheck { return nil @@ -2080,7 +2134,12 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con plog.Panicf("nodeID should always be equal to member ID") } } - s.cluster.AddMember(m) + if s.cluster.IsPromoteChange(m) { + s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes) + } else { + s.cluster.AddMember(m) + } + if m.ID != s.id { s.r.transport.AddPeer(m.ID, m.PeerURLs) } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index a8dbbb333..dbdeb1dbf 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1318,6 +1318,54 @@ func TestRemoveMember(t *testing.T) { } } +// TestPromoteMember tests PromoteMember can propose and perform learner node promotion. +func TestPromoteMember(t *testing.T) { + n := newNodeConfChangeCommitterRecorder() + n.readyc <- raft.Ready{ + SoftState: &raft.SoftState{RaftState: raft.StateLeader}, + } + cl := newTestCluster(nil) + st := v2store.New() + cl.SetStore(v2store.New()) + cl.AddMember(&membership.Member{ + ID: 1234, + RaftAttributes: membership.RaftAttributes{ + IsLearner: true, + }, + }) + r := newRaftNode(raftNodeConfig{ + lg: zap.NewExample(), + Node: n, + raftStorage: raft.NewMemoryStorage(), + storage: mockstorage.NewStorageRecorder(""), + transport: newNopTransporter(), + }) + s := &EtcdServer{ + lgMu: new(sync.RWMutex), + lg: zap.NewExample(), + r: *r, + v2store: st, + cluster: cl, + reqIDGen: idutil.NewGenerator(0, time.Time{}), + SyncTicker: &time.Ticker{}, + } + s.start() + _, err := s.PromoteMember(context.TODO(), 1234) + gaction := n.Action() + s.Stop() + + if err != nil { + t.Fatalf("PromoteMember error: %v", err) + } + wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeAddNode"}, {Name: "ApplyConfChange:ConfChangeAddNode"}} + if !reflect.DeepEqual(gaction, wactions) { + t.Errorf("action = %v, want %v", gaction, wactions) + } + if cl.Member(1234).IsLearner == true { + t.Errorf("member with id 1234 is not promoted") + } +} + // TestUpdateMember tests RemoveMember can propose and perform node update. func TestUpdateMember(t *testing.T) { n := newNodeConfChangeCommitterRecorder() From 7f9479acc17345ac27b1d536b75f7f9a54c78bdf Mon Sep 17 00:00:00 2001 From: WizardCXY Date: Fri, 22 Mar 2019 17:53:00 +0800 Subject: [PATCH 05/15] clientv3: add member promote --- clientv3/cluster.go | 9 ++-- etcdserver/api/membership/cluster.go | 64 ++++++++++++++++------- etcdserver/api/membership/cluster_test.go | 37 ++++++++++++- etcdserver/api/membership/errors.go | 11 ++-- etcdserver/api/v2http/client_test.go | 8 +++ etcdserver/api/v2http/http_test.go | 3 ++ etcdserver/api/v3rpc/rpctypes/error.go | 3 +- etcdserver/api/v3rpc/util.go | 3 +- etcdserver/server.go | 48 ++++++++--------- etcdserver/server_test.go | 40 ++++++++++---- 10 files changed, 160 insertions(+), 66 deletions(-) diff --git a/clientv3/cluster.go b/clientv3/cluster.go index 43ceb79c3..855465b7e 100644 --- a/clientv3/cluster.go +++ b/clientv3/cluster.go @@ -16,7 +16,6 @@ package clientv3 import ( "context" - "errors" pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb" "go.etcd.io/etcd/v3/pkg/types" @@ -133,6 +132,10 @@ func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) { } func (c *cluster) MemberPromote(ctx context.Context, id uint64) (*MemberPromoteResponse, error) { - // TODO: implement - return nil, errors.New("not implemented") + r := &pb.MemberPromoteRequest{ID: id} + resp, err := c.remote.MemberPromote(ctx, r, c.callOpts...) + if err != nil { + return nil, toErr(ctx, err) + } + return (*MemberPromoteResponse)(resp), nil } diff --git a/etcdserver/api/membership/cluster.go b/etcdserver/api/membership/cluster.go index a8e55aacc..2bbebf017 100644 --- a/etcdserver/api/membership/cluster.go +++ b/etcdserver/api/membership/cluster.go @@ -59,6 +59,12 @@ type RaftCluster struct { removed map[types.ID]bool } +// ConfigChangeContext represents a context for confChange. +type ConfigChangeContext struct { + Member + IsPromote bool `json:"isPromote"` +} + // NewClusterFromURLsMap creates a new raft cluster using provided urls map. Currently, it does not support creating // cluster with raft learner member. func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) (*RaftCluster, error) { @@ -252,16 +258,6 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { } } -// IsPromoteChange checks if m is a promoteChange -func (c *RaftCluster) IsPromoteChange(m *Member) bool { - members, _ := membersFromStore(c.lg, c.v2store) - - if members[m.ID] != nil && members[m.ID].IsLearner && !m.IsLearner { - return true - } - return false -} - // ValidateConfigurationChange takes a proposed ConfChange and // ensures that it is still valid. func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { @@ -278,24 +274,30 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { urls[u] = true } } - m := new(Member) - if err := json.Unmarshal(cc.Context, m); err != nil { + + confChangeContext := new(ConfigChangeContext) + if err := json.Unmarshal(cc.Context, confChangeContext); err != nil { if c.lg != nil { - c.lg.Panic("failed to unmarshal member", zap.Error(err)) + c.lg.Panic("failed to unmarshal confChangeContext", zap.Error(err)) } else { - plog.Panicf("unmarshal member should never fail: %v", err) + plog.Panicf("unmarshal confChangeContext should never fail: %v", err) } } - - if members[id] != nil && members[id].IsLearner && cc.Type == raftpb.ConfChangeAddNode { - // TODO promote a learner node case check + // A ConfChangeAddNode to a existing learner node promotes it to a voting member. + if confChangeContext.IsPromote { + if members[id] == nil { + return ErrIDNotFound + } + if !members[id].IsLearner { + return ErrMemberNotLearner + } } else { - // add a member leanrner or a follower case + // add a learner or a follower case if members[id] != nil { return ErrIDExists } - for _, u := range m.PeerURLs { + for _, u := range confChangeContext.PeerURLs { if urls[u] { return ErrPeerURLexists } @@ -450,6 +452,30 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) { } } +// PromoteMember marks the member's IsLearner RaftAttributes to false. +func (c *RaftCluster) PromoteMember(id types.ID) { + c.Lock() + defer c.Unlock() + + c.members[id].RaftAttributes.IsLearner = false + if c.v2store != nil { + mustUpdateMemberInStore(c.v2store, c.members[id]) + } + if c.be != nil { + mustSaveMemberToBackend(c.be, c.members[id]) + } + + if c.lg != nil { + c.lg.Info( + "promote member", + zap.String("cluster-id", c.cid.String()), + zap.String("local-member-id", c.localID.String()), + ) + } else { + plog.Noticef("promote member %s in cluster %s", id, c.cid) + } +} + func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) { c.Lock() defer c.Unlock() diff --git a/etcdserver/api/membership/cluster_test.go b/etcdserver/api/membership/cluster_test.go index e6a23e35c..d501d40e5 100644 --- a/etcdserver/api/membership/cluster_test.go +++ b/etcdserver/api/membership/cluster_test.go @@ -290,6 +290,12 @@ func TestClusterValidateConfigurationChange(t *testing.T) { t.Fatal(err) } + attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}} + ctx1, err := json.Marshal(&Member{ID: types.ID(1), RaftAttributes: attr}) + if err != nil { + t.Fatal(err) + } + attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}} ctx5, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr}) if err != nil { @@ -308,6 +314,16 @@ func TestClusterValidateConfigurationChange(t *testing.T) { t.Fatal(err) } + ctx3, err := json.Marshal(&ConfigChangeContext{Member: Member{ID: types.ID(3), RaftAttributes: attr}, IsPromote: true}) + if err != nil { + t.Fatal(err) + } + + ctx6, err := json.Marshal(&ConfigChangeContext{Member: Member{ID: types.ID(6), RaftAttributes: attr}, IsPromote: true}) + if err != nil { + t.Fatal(err) + } + tests := []struct { cc raftpb.ConfChange werr error @@ -335,8 +351,9 @@ func TestClusterValidateConfigurationChange(t *testing.T) { }, { raftpb.ConfChange{ - Type: raftpb.ConfChangeAddNode, - NodeID: 1, + Type: raftpb.ConfChangeAddNode, + NodeID: 1, + Context: ctx1, }, ErrIDExists, }, @@ -388,6 +405,22 @@ func TestClusterValidateConfigurationChange(t *testing.T) { }, nil, }, + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeAddNode, + NodeID: 3, + Context: ctx3, + }, + ErrMemberNotLearner, + }, + { + raftpb.ConfChange{ + Type: raftpb.ConfChangeAddNode, + NodeID: 6, + Context: ctx6, + }, + ErrIDNotFound, + }, } for i, tt := range tests { err := cl.ValidateConfigurationChange(tt.cc) diff --git a/etcdserver/api/membership/errors.go b/etcdserver/api/membership/errors.go index f34e8849a..dc905aef4 100644 --- a/etcdserver/api/membership/errors.go +++ b/etcdserver/api/membership/errors.go @@ -21,11 +21,12 @@ import ( ) var ( - ErrIDRemoved = errors.New("membership: ID removed") - ErrIDExists = errors.New("membership: ID exists") - ErrIDNotFound = errors.New("membership: ID not found") - ErrPeerURLexists = errors.New("membership: peerURL exists") - ErrPromotionFailed = errors.New("membership: promotion failed") + ErrIDRemoved = errors.New("membership: ID removed") + ErrIDExists = errors.New("membership: ID exists") + ErrIDNotFound = errors.New("membership: ID not found") + ErrPeerURLexists = errors.New("membership: peerURL exists") + ErrMemberNotLearner = errors.New("membership: can only promote a learner member") + ErrLearnerNotReady = errors.New("membership: can only promote a learner member which catches up with leader") ) func isKeyNotFound(err error) bool { diff --git a/etcdserver/api/v2http/client_test.go b/etcdserver/api/v2http/client_test.go index 08e13de29..89407097c 100644 --- a/etcdserver/api/v2http/client_test.go +++ b/etcdserver/api/v2http/client_test.go @@ -132,6 +132,11 @@ func (s *serverRecorder) UpdateMember(_ context.Context, m membership.Member) ([ return nil, nil } +func (s *serverRecorder) PromoteMember(_ context.Context, id uint64) ([]*membership.Member, error) { + s.actions = append(s.actions, action{name: "PromoteMember", params: []interface{}{id}}) + return nil, nil +} + type action struct { name string params []interface{} @@ -168,6 +173,9 @@ func (rs *resServer) RemoveMember(_ context.Context, _ uint64) ([]*membership.Me func (rs *resServer) UpdateMember(_ context.Context, _ membership.Member) ([]*membership.Member, error) { return nil, nil } +func (rs *resServer) PromoteMember(_ context.Context, _ uint64) ([]*membership.Member, error) { + return nil, nil +} func boolp(b bool) *bool { return &b } diff --git a/etcdserver/api/v2http/http_test.go b/etcdserver/api/v2http/http_test.go index e23918b2f..a5cdbcad4 100644 --- a/etcdserver/api/v2http/http_test.go +++ b/etcdserver/api/v2http/http_test.go @@ -74,6 +74,9 @@ func (fs *errServer) RemoveMember(ctx context.Context, id uint64) ([]*membership func (fs *errServer) UpdateMember(ctx context.Context, m membership.Member) ([]*membership.Member, error) { return nil, fs.err } +func (fs *errServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) { + return nil, fs.err +} func TestWriteError(t *testing.T) { // nil error should not panic diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index e2431318c..d7d51e98b 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -40,7 +40,8 @@ var ( ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err() ErrGRPCMemberBadURLs = status.New(codes.InvalidArgument, "etcdserver: given member URLs are invalid").Err() ErrGRPCMemberNotFound = status.New(codes.NotFound, "etcdserver: member not found").Err() - ErrGRPCMemberPromtotionFailed = status.New(codes.FailedPrecondition, "etcdserver: learner member promotion failed").Err() + ErrGRPCMemberNotLearner = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member which catches up with peers").Err() + ErrGRPCLearnerNotReady = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member").Err() ErrGRPCRequestTooLarge = status.New(codes.InvalidArgument, "etcdserver: request is too large").Err() ErrGRPCRequestTooManyRequests = status.New(codes.ResourceExhausted, "etcdserver: too many requests").Err() diff --git a/etcdserver/api/v3rpc/util.go b/etcdserver/api/v3rpc/util.go index 6bbed1c45..1c8a20c3b 100644 --- a/etcdserver/api/v3rpc/util.go +++ b/etcdserver/api/v3rpc/util.go @@ -35,7 +35,8 @@ var toGRPCErrorMap = map[error]error{ membership.ErrIDNotFound: rpctypes.ErrGRPCMemberNotFound, membership.ErrIDExists: rpctypes.ErrGRPCMemberExist, membership.ErrPeerURLexists: rpctypes.ErrGRPCPeerURLExist, - membership.ErrPromotionFailed: rpctypes.ErrGRPCMemberPromtotionFailed, + membership.ErrMemberNotLearner: rpctypes.ErrGRPCMemberNotLearner, + membership.ErrLearnerNotReady: rpctypes.ErrGRPCLearnerNotReady, etcdserver.ErrNotEnoughStartedMembers: rpctypes.ErrMemberNotEnoughStarted, mvcc.ErrCompacted: rpctypes.ErrGRPCCompacted, diff --git a/etcdserver/server.go b/etcdserver/server.go index 3b4e5f886..4a2b62f77 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -158,7 +158,8 @@ type Server interface { UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error) // PromoteMember attempts to promote a non-voting node to a voting node. It will // return ErrIDNotFound if the member ID does not exist. - // return ErrPromotionFailed if the member can't be promoted. + // return ErrLearnerNotReady if the member are not ready. + // return ErrMemberNotLearner if the member is not a learner. PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) // ClusterVersion is the cluster-wide minimum major.minor version. @@ -1621,27 +1622,20 @@ func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membershi return nil, err } + // check if we can promote this learner if err := s.mayPromoteMember(types.ID(id)); err != nil { return nil, err } - var memb membership.Member - members := s.cluster.Members() - isExist := false - for _, member := range members { - if uint64(member.ID) == id { - memb = *member - isExist = true - break - } + // build the context for the promote confChange. mark IsLearner to false and IsPromote to true. + promoteChangeContext := membership.ConfigChangeContext{ + Member: membership.Member{ + ID: types.ID(id), + }, + IsPromote: true, } - if !isExist { - return nil, membership.ErrIDNotFound - } - memb.IsLearner = false - - b, err := json.Marshal(memb) + b, err := json.Marshal(promoteChangeContext) if err != nil { return nil, err } @@ -1661,6 +1655,8 @@ func (s *EtcdServer) mayPromoteMember(id types.ID) error { } // TODO add more checks whether the member can be promoted. // like learner progress check or if cluster is ready to promote a learner + // this is an example to get progress + fmt.Printf("raftStatus, %#v\n", raftStatus()) return nil } @@ -2115,33 +2111,33 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con *confState = *s.r.ApplyConfChange(cc) switch cc.Type { case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode: - m := new(membership.Member) - if err := json.Unmarshal(cc.Context, m); err != nil { + confChangeContext := new(membership.ConfigChangeContext) + if err := json.Unmarshal(cc.Context, confChangeContext); err != nil { if lg != nil { lg.Panic("failed to unmarshal member", zap.Error(err)) } else { plog.Panicf("unmarshal member should never fail: %v", err) } } - if cc.NodeID != uint64(m.ID) { + if cc.NodeID != uint64(confChangeContext.Member.ID) { if lg != nil { lg.Panic( "got different member ID", zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()), - zap.String("member-id-from-message", m.ID.String()), + zap.String("member-id-from-message", confChangeContext.Member.ID.String()), ) } else { plog.Panicf("nodeID should always be equal to member ID") } } - if s.cluster.IsPromoteChange(m) { - s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes) + if confChangeContext.IsPromote { + s.cluster.PromoteMember(confChangeContext.Member.ID) } else { - s.cluster.AddMember(m) - } + s.cluster.AddMember(&confChangeContext.Member) - if m.ID != s.id { - s.r.transport.AddPeer(m.ID, m.PeerURLs) + if confChangeContext.Member.ID != s.id { + s.r.transport.AddPeer(confChangeContext.Member.ID, confChangeContext.PeerURLs) + } } case raftpb.ConfChangeRemoveNode: diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index dbdeb1dbf..a2f51ab8f 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -508,35 +508,57 @@ func TestApplyConfChangeError(t *testing.T) { } cl.RemoveMember(4) + attr := membership.RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}} + ctx, err := json.Marshal(&membership.Member{ID: types.ID(1), RaftAttributes: attr}) + if err != nil { + t.Fatal(err) + } + + attr = membership.RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 4)}} + ctx4, err := json.Marshal(&membership.Member{ID: types.ID(1), RaftAttributes: attr}) + if err != nil { + t.Fatal(err) + } + + attr = membership.RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}} + ctx5, err := json.Marshal(&membership.Member{ID: types.ID(1), RaftAttributes: attr}) + if err != nil { + t.Fatal(err) + } + tests := []struct { cc raftpb.ConfChange werr error }{ { raftpb.ConfChange{ - Type: raftpb.ConfChangeAddNode, - NodeID: 4, + Type: raftpb.ConfChangeAddNode, + NodeID: 4, + Context: ctx4, }, membership.ErrIDRemoved, }, { raftpb.ConfChange{ - Type: raftpb.ConfChangeUpdateNode, - NodeID: 4, + Type: raftpb.ConfChangeUpdateNode, + NodeID: 4, + Context: ctx4, }, membership.ErrIDRemoved, }, { raftpb.ConfChange{ - Type: raftpb.ConfChangeAddNode, - NodeID: 1, + Type: raftpb.ConfChangeAddNode, + NodeID: 1, + Context: ctx, }, membership.ErrIDExists, }, { raftpb.ConfChange{ - Type: raftpb.ConfChangeRemoveNode, - NodeID: 5, + Type: raftpb.ConfChangeRemoveNode, + NodeID: 5, + Context: ctx5, }, membership.ErrIDNotFound, }, @@ -553,7 +575,7 @@ func TestApplyConfChangeError(t *testing.T) { if err != tt.werr { t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr) } - cc := raftpb.ConfChange{Type: tt.cc.Type, NodeID: raft.None} + cc := raftpb.ConfChange{Type: tt.cc.Type, NodeID: raft.None, Context: tt.cc.Context} w := []testutil.Action{ { Name: "ApplyConfChange", From 44d935e90aa208f1a5d054eadb2ab9c738521899 Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Thu, 11 Apr 2019 19:11:06 -0700 Subject: [PATCH 06/15] etcdserver: exclude learner from leader transfer 1. Maintenance API MoveLeader() returns ErrBadLeaderTransferee if transferee does not exist or is raft learner. 2. etcdserver TransferLeadership() only choose voting member as transferee. --- etcdserver/api/membership/cluster.go | 22 ++++++++++++++++++++++ etcdserver/api/v3rpc/rpctypes/error.go | 3 +++ etcdserver/api/v3rpc/util.go | 1 + etcdserver/errors.go | 1 + etcdserver/server.go | 16 ++++++++-------- 5 files changed, 35 insertions(+), 8 deletions(-) diff --git a/etcdserver/api/membership/cluster.go b/etcdserver/api/membership/cluster.go index 2bbebf017..946052d4f 100644 --- a/etcdserver/api/membership/cluster.go +++ b/etcdserver/api/membership/cluster.go @@ -754,3 +754,25 @@ func (c *RaftCluster) IsLearner() bool { } return localMember.IsLearner } + +// IsMemberExist returns if the member with the given id exists in cluster. +func (c *RaftCluster) IsMemberExist(id types.ID) bool { + c.Lock() + defer c.Unlock() + _, ok := c.members[id] + return ok +} + +// VotingMemberIDs returns the ID of voting members in cluster. +func (c *RaftCluster) VotingMemberIDs() []types.ID { + c.Lock() + defer c.Unlock() + var ids []types.ID + for _, m := range c.members { + if !m.IsLearner { + ids = append(ids, m.ID) + } + } + sort.Sort(types.IDSlice(ids)) + return ids +} diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index d7d51e98b..84683ed3c 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -72,6 +72,7 @@ var ( ErrGRPCUnhealthy = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err() ErrGRPCCorrupt = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err() ErrGPRCNotSupportedForLearner = status.New(codes.FailedPrecondition, "etcdserver: rpc not supported for learner").Err() + ErrGRPCBadLeaderTransferee = status.New(codes.FailedPrecondition, "etcdserver: bad leader transferee").Err() errStringToError = map[string]error{ ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey, @@ -123,6 +124,7 @@ var ( ErrorDesc(ErrGRPCTimeoutDueToConnectionLost): ErrGRPCTimeoutDueToConnectionLost, ErrorDesc(ErrGRPCUnhealthy): ErrGRPCUnhealthy, ErrorDesc(ErrGRPCCorrupt): ErrGRPCCorrupt, + ErrorDesc(ErrGRPCBadLeaderTransferee): ErrGRPCBadLeaderTransferee, } ) @@ -176,6 +178,7 @@ var ( ErrTimeoutDueToConnectionLost = Error(ErrGRPCTimeoutDueToConnectionLost) ErrUnhealthy = Error(ErrGRPCUnhealthy) ErrCorrupt = Error(ErrGRPCCorrupt) + ErrBadLeaderTransferee = Error(ErrGRPCBadLeaderTransferee) ) // EtcdError defines gRPC server errors. diff --git a/etcdserver/api/v3rpc/util.go b/etcdserver/api/v3rpc/util.go index 1c8a20c3b..be93c934b 100644 --- a/etcdserver/api/v3rpc/util.go +++ b/etcdserver/api/v3rpc/util.go @@ -55,6 +55,7 @@ var toGRPCErrorMap = map[error]error{ etcdserver.ErrUnhealthy: rpctypes.ErrGRPCUnhealthy, etcdserver.ErrKeyNotFound: rpctypes.ErrGRPCKeyNotFound, etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt, + etcdserver.ErrBadLeaderTransferee: rpctypes.ErrGRPCBadLeaderTransferee, lease.ErrLeaseNotFound: rpctypes.ErrGRPCLeaseNotFound, lease.ErrLeaseExists: rpctypes.ErrGRPCLeaseExist, diff --git a/etcdserver/errors.go b/etcdserver/errors.go index 8cec52a17..90f714b3c 100644 --- a/etcdserver/errors.go +++ b/etcdserver/errors.go @@ -37,6 +37,7 @@ var ( ErrUnhealthy = errors.New("etcdserver: unhealthy cluster") ErrKeyNotFound = errors.New("etcdserver: key not found") ErrCorrupt = errors.New("etcdserver: corrupt cluster") + ErrBadLeaderTransferee = errors.New("etcdserver: bad leader transferee") ) type DiscoveryError struct { diff --git a/etcdserver/server.go b/etcdserver/server.go index 4a2b62f77..b2bdf8516 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -1377,16 +1377,16 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) { ep.snapi = ep.appliedi } -func (s *EtcdServer) isMultiNode() bool { - return s.cluster != nil && len(s.cluster.MemberIDs()) > 1 -} - func (s *EtcdServer) isLeader() bool { return uint64(s.ID()) == s.Lead() } // MoveLeader transfers the leader to the given transferee. func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) error { + if !s.cluster.IsMemberExist(types.ID(transferee)) || s.cluster.Member(types.ID(transferee)).IsLearner { + return ErrBadLeaderTransferee + } + now := time.Now() interval := time.Duration(s.Cfg.TickMs) * time.Millisecond @@ -1440,20 +1440,20 @@ func (s *EtcdServer) TransferLeadership() error { return nil } - if !s.isMultiNode() { + if s.cluster == nil || len(s.cluster.VotingMemberIDs()) <= 1 { if lg := s.getLogger(); lg != nil { lg.Info( - "skipped leadership transfer; it's a single-node cluster", + "skipped leadership transfer for single voting member cluster", zap.String("local-member-id", s.ID().String()), zap.String("current-leader-member-id", types.ID(s.Lead()).String()), ) } else { - plog.Printf("skipped leadership transfer for single member cluster") + plog.Printf("skipped leadership transfer for single voting member cluster") } return nil } - transferee, ok := longestConnected(s.r.transport, s.cluster.MemberIDs()) + transferee, ok := longestConnected(s.r.transport, s.cluster.VotingMemberIDs()) if !ok { return ErrUnhealthy } From e8dc4c5c25a5795c5b07c702952f42e752c48303 Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Fri, 12 Apr 2019 17:48:08 -0700 Subject: [PATCH 07/15] integration: add TestMoveLeaderToLearnerError Adding integration test TestMoveLeaderToLearnerError, which ensures that leader transfer to learner member will fail. --- integration/v3_leadership_test.go | 33 +++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/integration/v3_leadership_test.go b/integration/v3_leadership_test.go index f25eb1519..0cb83539c 100644 --- a/integration/v3_leadership_test.go +++ b/integration/v3_leadership_test.go @@ -16,6 +16,7 @@ package integration import ( "context" + "strings" "testing" "time" @@ -106,3 +107,35 @@ func TestMoveLeaderError(t *testing.T) { t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCNotLeader) } } + +// TestMoveLeaderToLearnerError ensures that leader transfer to learner member will fail. +func TestMoveLeaderToLearnerError(t *testing.T) { + defer testutil.AfterTest(t) + + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + // we have to add and launch learner member after initial cluster was created, because + // bootstrapping a cluster with learner member is not supported. + clus.AddAndLaunchLearnerMember(t) + + learners, err := clus.GetLearnerMembers() + if err != nil { + t.Fatalf("failed to get the learner members in cluster: %v", err) + } + if len(learners) != 1 { + t.Fatalf("added 1 learner to cluster, got %d", len(learners)) + } + + learnerID := learners[0].ID + leaderIdx := clus.WaitLeader(t) + cli := clus.Client(leaderIdx) + _, err = cli.MoveLeader(context.Background(), uint64(learnerID)) + if err == nil { + t.Fatalf("expecting leader transfer to learner to fail, got no error") + } + expectedErrKeywords := "bad leader transferee" + if !strings.Contains(err.Error(), expectedErrKeywords) { + t.Errorf("expecting error to contain %s, got %s", expectedErrKeywords, err.Error()) + } +} From bd7f42855b9157d97380a0eeddb8c71d22c341ef Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Mon, 15 Apr 2019 15:39:35 -0700 Subject: [PATCH 08/15] integration: add TestTransferLeadershipWithLearner Adding integration test TestTransferLeadershipWithLearner, which ensures that TransferLeadership does not timeout due to learner is automatically picked by leader as transferee. --- integration/v3_leadership_test.go | 35 +++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/integration/v3_leadership_test.go b/integration/v3_leadership_test.go index 0cb83539c..4053d419e 100644 --- a/integration/v3_leadership_test.go +++ b/integration/v3_leadership_test.go @@ -139,3 +139,38 @@ func TestMoveLeaderToLearnerError(t *testing.T) { t.Errorf("expecting error to contain %s, got %s", expectedErrKeywords, err.Error()) } } + +// TestTransferLeadershipWithLearner ensures TransferLeadership does not timeout due to learner is +// automatically picked by leader as transferee. +func TestTransferLeadershipWithLearner(t *testing.T) { + defer testutil.AfterTest(t) + + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + clus.AddAndLaunchLearnerMember(t) + + learners, err := clus.GetLearnerMembers() + if err != nil { + t.Fatalf("failed to get the learner members in cluster: %v", err) + } + if len(learners) != 1 { + t.Fatalf("added 1 learner to cluster, got %d", len(learners)) + } + + leaderIdx := clus.WaitLeader(t) + errCh := make(chan error, 1) + go func() { + // note that this cluster has 1 leader and 1 learner. TransferLeadership should return nil. + // Leadership transfer is skipped in cluster with 1 voting member. + errCh <- clus.Members[leaderIdx].s.TransferLeadership() + }() + select { + case err := <-errCh: + if err != nil { + t.Errorf("got error during leadership transfer: %v", err) + } + case <-time.After(5 * time.Second): + t.Error("timed out waiting for leader transition") + } +} From a039f2efb8ee6349c5d0aec4ed616f26e738605b Mon Sep 17 00:00:00 2001 From: WizardCXY Date: Tue, 2 Apr 2019 12:31:51 +0800 Subject: [PATCH 09/15] clientv3, etcdctl: MemberPromote for learner --- clientv3/integration/cluster_test.go | 35 +++++++++++++++++++++++++ etcdctl/ctlv3/command/member_command.go | 35 +++++++++++++++++++++++++ etcdctl/ctlv3/command/printer.go | 1 + etcdctl/ctlv3/command/printer_simple.go | 4 +++ etcdserver/api/v3rpc/rpctypes/error.go | 8 ++++-- 5 files changed, 81 insertions(+), 2 deletions(-) diff --git a/clientv3/integration/cluster_test.go b/clientv3/integration/cluster_test.go index d9a03cbff..a1fb9d0a0 100644 --- a/clientv3/integration/cluster_test.go +++ b/clientv3/integration/cluster_test.go @@ -202,6 +202,27 @@ func TestMemberAddForLearner(t *testing.T) { if !resp.Member.IsLearner { t.Errorf("Added a member as learner, got resp.Member.IsLearner = %v", resp.Member.IsLearner) } +} + +func TestMemberPromoteForLearner(t *testing.T) { + // TODO test not ready learner promotion. + defer testutil.AfterTest(t) + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + // TODO change the random client to client that talk to leader directly. + capi := clus.RandClient() + + urls := []string{"http://127.0.0.1:1234"} + isLearner := true + resp, err := capi.MemberAddAsLearner(context.Background(), urls) + if err != nil { + t.Fatalf("failed to add member %v", err) + } + + if !resp.Member.IsLearner { + t.Errorf("Added a member as learner, got resp.Member.IsLearner = %v", resp.Member.IsLearner) + } learners, err := clus.GetLearnerMembers() if err != nil { @@ -210,4 +231,18 @@ func TestMemberAddForLearner(t *testing.T) { if len(learners) != 1 { t.Errorf("Added 1 learner node to cluster, got %d", len(learners)) } + _, err = capi.MemberPromote(context.Background(), resp.Member.ID) + + if err != nil { + t.Fatalf("failed to promote member error: %v", err) + } + + learners, err = clus.GetLearnerMembers() + if err != nil { + t.Fatalf("failed to get the number of learners in cluster: %v", err) + } + if len(learners) != 0 { + t.Errorf("learner promoted, expect 0 learner, got %d", len(learners)) + } + } diff --git a/etcdctl/ctlv3/command/member_command.go b/etcdctl/ctlv3/command/member_command.go index 5c4ca2a72..8a2c05278 100644 --- a/etcdctl/ctlv3/command/member_command.go +++ b/etcdctl/ctlv3/command/member_command.go @@ -40,6 +40,7 @@ func NewMemberCommand() *cobra.Command { mc.AddCommand(NewMemberRemoveCommand()) mc.AddCommand(NewMemberUpdateCommand()) mc.AddCommand(NewMemberListCommand()) + mc.AddCommand(NewMemberPromoteCommand()) return mc } @@ -100,6 +101,20 @@ The items in the lists are ID, Status, Name, Peer Addrs, Client Addrs, Is Learne return cc } +// NewMemberPromoteCommand returns the cobra command for "member promote". +func NewMemberPromoteCommand() *cobra.Command { + cc := &cobra.Command{ + Use: "promote ", + Short: "Promotes a non-voting member in the cluster", + Long: `Promotes a non-voting learner member to a voting one in the cluster. +`, + + Run: memberPromoteCommandFunc, + } + + return cc +} + // memberAddCommandFunc executes the "member add" command. func memberAddCommandFunc(cmd *cobra.Command, args []string) { if len(args) < 1 { @@ -238,3 +253,23 @@ func memberListCommandFunc(cmd *cobra.Command, args []string) { display.MemberList(*resp) } + +// memberPromoteCommandFunc executes the "member promote" command. +func memberPromoteCommandFunc(cmd *cobra.Command, args []string) { + if len(args) != 1 { + ExitWithError(ExitBadArgs, fmt.Errorf("member ID is not provided")) + } + + id, err := strconv.ParseUint(args[0], 16, 64) + if err != nil { + ExitWithError(ExitBadArgs, fmt.Errorf("bad member ID arg (%v), expecting ID in Hex", err)) + } + + ctx, cancel := commandCtx(cmd) + resp, err := mustClientFromCmd(cmd).MemberPromote(ctx, id) + cancel() + if err != nil { + ExitWithError(ExitError, err) + } + display.MemberPromote(id, *resp) +} diff --git a/etcdctl/ctlv3/command/printer.go b/etcdctl/ctlv3/command/printer.go index 942668d9e..5e844f150 100644 --- a/etcdctl/ctlv3/command/printer.go +++ b/etcdctl/ctlv3/command/printer.go @@ -42,6 +42,7 @@ type printer interface { MemberAdd(v3.MemberAddResponse) MemberRemove(id uint64, r v3.MemberRemoveResponse) MemberUpdate(id uint64, r v3.MemberUpdateResponse) + MemberPromote(id uint64, r v3.MemberPromoteResponse) MemberList(v3.MemberListResponse) EndpointHealth([]epHealth) diff --git a/etcdctl/ctlv3/command/printer_simple.go b/etcdctl/ctlv3/command/printer_simple.go index 3881ed4b2..8223e87da 100644 --- a/etcdctl/ctlv3/command/printer_simple.go +++ b/etcdctl/ctlv3/command/printer_simple.go @@ -136,6 +136,10 @@ func (s *simplePrinter) MemberUpdate(id uint64, r v3.MemberUpdateResponse) { fmt.Printf("Member %16x updated in cluster %16x\n", id, r.Header.ClusterId) } +func (s *simplePrinter) MemberPromote(id uint64, r v3.MemberPromoteResponse) { + fmt.Printf("Member %16x promoted in cluster %16x\n", id, r.Header.ClusterId) +} + func (s *simplePrinter) MemberList(resp v3.MemberListResponse) { _, rows := makeMemberListTable(resp) for _, row := range rows { diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index 84683ed3c..ebf8504b7 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -40,8 +40,8 @@ var ( ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err() ErrGRPCMemberBadURLs = status.New(codes.InvalidArgument, "etcdserver: given member URLs are invalid").Err() ErrGRPCMemberNotFound = status.New(codes.NotFound, "etcdserver: member not found").Err() - ErrGRPCMemberNotLearner = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member which catches up with peers").Err() - ErrGRPCLearnerNotReady = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member").Err() + ErrGRPCMemberNotLearner = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member").Err() + ErrGRPCLearnerNotReady = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member which catches up with leader").Err() ErrGRPCRequestTooLarge = status.New(codes.InvalidArgument, "etcdserver: request is too large").Err() ErrGRPCRequestTooManyRequests = status.New(codes.ResourceExhausted, "etcdserver: too many requests").Err() @@ -95,6 +95,8 @@ var ( ErrorDesc(ErrGRPCMemberNotEnoughStarted): ErrGRPCMemberNotEnoughStarted, ErrorDesc(ErrGRPCMemberBadURLs): ErrGRPCMemberBadURLs, ErrorDesc(ErrGRPCMemberNotFound): ErrGRPCMemberNotFound, + ErrorDesc(ErrGRPCMemberNotLearner): ErrGRPCMemberNotLearner, + ErrorDesc(ErrGRPCLearnerNotReady): ErrGRPCLearnerNotReady, ErrorDesc(ErrGRPCRequestTooLarge): ErrGRPCRequestTooLarge, ErrorDesc(ErrGRPCRequestTooManyRequests): ErrGRPCRequestTooManyRequests, @@ -149,6 +151,8 @@ var ( ErrMemberNotEnoughStarted = Error(ErrGRPCMemberNotEnoughStarted) ErrMemberBadURLs = Error(ErrGRPCMemberBadURLs) ErrMemberNotFound = Error(ErrGRPCMemberNotFound) + ErrMemberNotLearner = Error(ErrGRPCMemberNotLearner) + ErrMemberLearnerNotReady = Error(ErrGRPCLearnerNotReady) ErrRequestTooLarge = Error(ErrGRPCRequestTooLarge) ErrTooManyRequests = Error(ErrGRPCRequestTooManyRequests) From c55519b3a5312a1b53d3ca01b89be734ad19636b Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Wed, 17 Apr 2019 14:45:29 -0700 Subject: [PATCH 10/15] words: whitelist words to fix goword test. --- .words | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.words b/.words index 0e9b4992b..b1eaddc2e 100644 --- a/.words +++ b/.words @@ -104,3 +104,5 @@ PermitWithoutStream __lostleader ErrConnClosing unfreed +grpcAddr +clientURLs From c836e37a83cae1d87fd380d4ca8e4bfb4d776544 Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Wed, 17 Apr 2019 14:47:19 -0700 Subject: [PATCH 11/15] etcdserver: remove unnecessary bool comparison Fixes 'gosimple' test. --- etcdserver/server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index a2f51ab8f..0ba62e076 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1383,7 +1383,7 @@ func TestPromoteMember(t *testing.T) { if !reflect.DeepEqual(gaction, wactions) { t.Errorf("action = %v, want %v", gaction, wactions) } - if cl.Member(1234).IsLearner == true { + if cl.Member(1234).IsLearner { t.Errorf("member with id 1234 is not promoted") } } From ac057951cc229a7cc7b1a48877a3fa987a0f84c7 Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Wed, 17 Apr 2019 14:49:07 -0700 Subject: [PATCH 12/15] integration: remove unnecessary type conversion Fixes go 'unconvert' test. --- integration/v3_leadership_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/v3_leadership_test.go b/integration/v3_leadership_test.go index 4053d419e..d3bc03b16 100644 --- a/integration/v3_leadership_test.go +++ b/integration/v3_leadership_test.go @@ -130,7 +130,7 @@ func TestMoveLeaderToLearnerError(t *testing.T) { learnerID := learners[0].ID leaderIdx := clus.WaitLeader(t) cli := clus.Client(leaderIdx) - _, err = cli.MoveLeader(context.Background(), uint64(learnerID)) + _, err = cli.MoveLeader(context.Background(), learnerID) if err == nil { t.Fatalf("expecting leader transfer to learner to fail, got no error") } From b23c8f3e8f880bd8d5a7af85a801ca6bcf28b60c Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Thu, 18 Apr 2019 11:07:34 -0700 Subject: [PATCH 13/15] clientv3/integration: fix cluster tests Fixes TestMemberAddForLearner and TestMemberPromoteForLearner. --- clientv3/integration/cluster_test.go | 53 +++++++++++++++++----------- 1 file changed, 33 insertions(+), 20 deletions(-) diff --git a/clientv3/integration/cluster_test.go b/clientv3/integration/cluster_test.go index a1fb9d0a0..489e0c9e4 100644 --- a/clientv3/integration/cluster_test.go +++ b/clientv3/integration/cluster_test.go @@ -202,6 +202,16 @@ func TestMemberAddForLearner(t *testing.T) { if !resp.Member.IsLearner { t.Errorf("Added a member as learner, got resp.Member.IsLearner = %v", resp.Member.IsLearner) } + + numberOfLearners := 0 + for _, m := range resp.Members { + if m.IsLearner { + numberOfLearners++ + } + } + if numberOfLearners != 1 { + t.Errorf("Added 1 learner node to cluster, got %d", numberOfLearners) + } } func TestMemberPromoteForLearner(t *testing.T) { @@ -214,35 +224,38 @@ func TestMemberPromoteForLearner(t *testing.T) { capi := clus.RandClient() urls := []string{"http://127.0.0.1:1234"} - isLearner := true - resp, err := capi.MemberAddAsLearner(context.Background(), urls) + memberAddResp, err := capi.MemberAddAsLearner(context.Background(), urls) if err != nil { t.Fatalf("failed to add member %v", err) } - if !resp.Member.IsLearner { - t.Errorf("Added a member as learner, got resp.Member.IsLearner = %v", resp.Member.IsLearner) + if !memberAddResp.Member.IsLearner { + t.Fatalf("Added a member as learner, got resp.Member.IsLearner = %v", memberAddResp.Member.IsLearner) + } + learnerID := memberAddResp.Member.ID + + numberOfLearners := 0 + for _, m := range memberAddResp.Members { + if m.IsLearner { + numberOfLearners++ + } + } + if numberOfLearners != 1 { + t.Fatalf("Added 1 learner node to cluster, got %d", numberOfLearners) } - learners, err := clus.GetLearnerMembers() + memberPromoteResp, err := capi.MemberPromote(context.Background(), learnerID) if err != nil { - t.Fatalf("failed to get the learner members in cluster: %v", err) - } - if len(learners) != 1 { - t.Errorf("Added 1 learner node to cluster, got %d", len(learners)) - } - _, err = capi.MemberPromote(context.Background(), resp.Member.ID) - - if err != nil { - t.Fatalf("failed to promote member error: %v", err) + t.Fatalf("failed to promote member: %v", err) } - learners, err = clus.GetLearnerMembers() - if err != nil { - t.Fatalf("failed to get the number of learners in cluster: %v", err) + numberOfLearners = 0 + for _, m := range memberPromoteResp.Members { + if m.IsLearner { + numberOfLearners++ + } } - if len(learners) != 0 { - t.Errorf("learner promoted, expect 0 learner, got %d", len(learners)) + if numberOfLearners != 0 { + t.Errorf("learner promoted, expect 0 learner, got %d", numberOfLearners) } - } From 90d28c0de7c3f33e0e24dca057e4e4903622bfd9 Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Thu, 18 Apr 2019 14:37:01 -0700 Subject: [PATCH 14/15] clientv3/integration: deflake TestKVForLearner Adding delay in the test for the newly started learner member to catch up applying config change entries in raft log. --- clientv3/integration/kv_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index 432405ddf..9379b6016 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -1004,13 +1004,16 @@ func TestKVForLearner(t *testing.T) { DialTimeout: 5 * time.Second, DialOptions: []grpc.DialOption{grpc.WithBlock()}, } - // this cli only has endpoint of the learner member + // this client only has endpoint of the learner member cli, err := clientv3.New(cfg) if err != nil { t.Fatalf("failed to create clientv3: %v", err) } defer cli.Close() + // waiting for learner member to catch up applying the config change entries in raft log. + time.Sleep(3 * time.Second) + tests := []struct { op clientv3.Op wErr bool From 23f1d02391136aac578df1845c296e19777d3816 Mon Sep 17 00:00:00 2001 From: Jingyi Hu Date: Wed, 15 May 2019 15:57:09 -0700 Subject: [PATCH 15/15] *: address comments --- clientv3/integration/kv_test.go | 1 + etcdserver/api/membership/cluster.go | 22 ++++++++++++---------- etcdserver/api/membership/errors.go | 2 +- etcdserver/api/v3rpc/interceptor.go | 2 +- etcdserver/api/v3rpc/rpctypes/error.go | 2 +- etcdserver/api/v3rpc/util.go | 2 +- etcdserver/server.go | 11 ++++++----- 7 files changed, 23 insertions(+), 19 deletions(-) diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index 9379b6016..6f70a3ad8 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -1011,6 +1011,7 @@ func TestKVForLearner(t *testing.T) { } defer cli.Close() + // TODO: expose servers's ReadyNotify() in test and use it instead. // waiting for learner member to catch up applying the config change entries in raft log. time.Sleep(3 * time.Second) diff --git a/etcdserver/api/membership/cluster.go b/etcdserver/api/membership/cluster.go index 946052d4f..d5019f7f0 100644 --- a/etcdserver/api/membership/cluster.go +++ b/etcdserver/api/membership/cluster.go @@ -62,6 +62,9 @@ type RaftCluster struct { // ConfigChangeContext represents a context for confChange. type ConfigChangeContext struct { Member + // IsPromote indicates if the config change is for promoting a learner member. + // This flag is needed because both adding a new member and promoting a learner member + // uses the same config change type 'ConfChangeAddNode'. IsPromote bool `json:"isPromote"` } @@ -268,13 +271,6 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { } switch cc.Type { case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode: - urls := make(map[string]bool) - for _, m := range members { - for _, u := range m.PeerURLs { - urls[u] = true - } - } - confChangeContext := new(ConfigChangeContext) if err := json.Unmarshal(cc.Context, confChangeContext); err != nil { if c.lg != nil { @@ -297,7 +293,13 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { return ErrIDExists } - for _, u := range confChangeContext.PeerURLs { + urls := make(map[string]bool) + for _, m := range members { + for _, u := range m.PeerURLs { + urls[u] = true + } + } + for _, u := range confChangeContext.Member.PeerURLs { if urls[u] { return ErrPeerURLexists } @@ -736,8 +738,8 @@ func mustDetectDowngrade(lg *zap.Logger, cv *semver.Version) { } } -// IsLearner returns if the local member is raft learner -func (c *RaftCluster) IsLearner() bool { +// IsLocalMemberLearner returns if the local member is raft learner +func (c *RaftCluster) IsLocalMemberLearner() bool { c.Lock() defer c.Unlock() localMember, ok := c.members[c.localID] diff --git a/etcdserver/api/membership/errors.go b/etcdserver/api/membership/errors.go index dc905aef4..c682ad5fd 100644 --- a/etcdserver/api/membership/errors.go +++ b/etcdserver/api/membership/errors.go @@ -26,7 +26,7 @@ var ( ErrIDNotFound = errors.New("membership: ID not found") ErrPeerURLexists = errors.New("membership: peerURL exists") ErrMemberNotLearner = errors.New("membership: can only promote a learner member") - ErrLearnerNotReady = errors.New("membership: can only promote a learner member which catches up with leader") + ErrLearnerNotReady = errors.New("membership: can only promote a learner member which is in sync with leader") ) func isKeyNotFound(err error) bool { diff --git a/etcdserver/api/v3rpc/interceptor.go b/etcdserver/api/v3rpc/interceptor.go index 319d349b6..8d20207fb 100644 --- a/etcdserver/api/v3rpc/interceptor.go +++ b/etcdserver/api/v3rpc/interceptor.go @@ -49,7 +49,7 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { } // TODO: add test in clientv3/integration to verify behavior - if s.IsLearner() && !isRPCEnabledForLearner(req) { + if s.IsLearner() && !isRPCSupportedForLearner(req) { return nil, rpctypes.ErrGPRCNotSupportedForLearner } diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index ebf8504b7..a3cc87486 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -41,7 +41,7 @@ var ( ErrGRPCMemberBadURLs = status.New(codes.InvalidArgument, "etcdserver: given member URLs are invalid").Err() ErrGRPCMemberNotFound = status.New(codes.NotFound, "etcdserver: member not found").Err() ErrGRPCMemberNotLearner = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member").Err() - ErrGRPCLearnerNotReady = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member which catches up with leader").Err() + ErrGRPCLearnerNotReady = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member which is in sync with leader").Err() ErrGRPCRequestTooLarge = status.New(codes.InvalidArgument, "etcdserver: request is too large").Err() ErrGRPCRequestTooManyRequests = status.New(codes.ResourceExhausted, "etcdserver: too many requests").Err() diff --git a/etcdserver/api/v3rpc/util.go b/etcdserver/api/v3rpc/util.go index be93c934b..446d02bc5 100644 --- a/etcdserver/api/v3rpc/util.go +++ b/etcdserver/api/v3rpc/util.go @@ -122,7 +122,7 @@ func isClientCtxErr(ctxErr error, err error) bool { } // in v3.4, learner is allowed to serve serializable read and endpoint status -func isRPCEnabledForLearner(req interface{}) bool { +func isRPCSupportedForLearner(req interface{}) bool { switch r := req.(type) { case *pb.StatusRequest: return true diff --git a/etcdserver/server.go b/etcdserver/server.go index b2bdf8516..6e9a9cdb4 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -1377,6 +1377,10 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) { ep.snapi = ep.appliedi } +func (s *EtcdServer) hasMultipleVotingMembers() bool { + return s.cluster != nil && len(s.cluster.VotingMemberIDs()) > 1 +} + func (s *EtcdServer) isLeader() bool { return uint64(s.ID()) == s.Lead() } @@ -1440,7 +1444,7 @@ func (s *EtcdServer) TransferLeadership() error { return nil } - if s.cluster == nil || len(s.cluster.VotingMemberIDs()) <= 1 { + if !s.hasMultipleVotingMembers() { if lg := s.getLogger(); lg != nil { lg.Info( "skipped leadership transfer for single voting member cluster", @@ -1654,9 +1658,6 @@ func (s *EtcdServer) mayPromoteMember(id types.ID) error { return nil } // TODO add more checks whether the member can be promoted. - // like learner progress check or if cluster is ready to promote a learner - // this is an example to get progress - fmt.Printf("raftStatus, %#v\n", raftStatus()) return nil } @@ -2498,5 +2499,5 @@ func (s *EtcdServer) Logger() *zap.Logger { // IsLearner returns if the local member is raft learner func (s *EtcdServer) IsLearner() bool { - return s.cluster.IsLearner() + return s.cluster.IsLocalMemberLearner() }