Merge pull request #10727 from jingyih/learner_part2

*: support raft learner in etcd - part 2
This commit is contained in:
Xiang Li 2019-05-15 16:41:08 -07:00 committed by GitHub
commit d4cdbb1ea0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 733 additions and 72 deletions

2
.words
View File

@ -104,3 +104,5 @@ PermitWithoutStream
__lostleader
ErrConnClosing
unfreed
grpcAddr
clientURLs

View File

@ -16,7 +16,6 @@ package clientv3
import (
"context"
"errors"
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
"go.etcd.io/etcd/v3/pkg/types"
@ -133,6 +132,10 @@ func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
}
func (c *cluster) MemberPromote(ctx context.Context, id uint64) (*MemberPromoteResponse, error) {
// TODO: implement
return nil, errors.New("not implemented")
r := &pb.MemberPromoteRequest{ID: id}
resp, err := c.remote.MemberPromote(ctx, r, c.callOpts...)
if err != nil {
return nil, toErr(ctx, err)
}
return (*MemberPromoteResponse)(resp), nil
}

View File

@ -16,7 +16,6 @@ package integration
import (
"context"
"fmt"
"reflect"
"strings"
"testing"
@ -204,27 +203,59 @@ func TestMemberAddForLearner(t *testing.T) {
t.Errorf("Added a member as learner, got resp.Member.IsLearner = %v", resp.Member.IsLearner)
}
numOfLearners, err := getNumberOfLearners(clus)
if err != nil {
t.Fatalf("failed to get the number of learners in cluster: %v", err)
}
if numOfLearners != 1 {
t.Errorf("Added 1 learner node to cluster, got %d", numOfLearners)
}
}
// getNumberOfLearners returns the number of learner nodes in cluster using MemberList API
func getNumberOfLearners(clus *integration.ClusterV3) (int, error) {
cli := clus.RandClient()
resp, err := cli.MemberList(context.Background())
if err != nil {
return 0, fmt.Errorf("failed to list member %v", err)
}
numberOfLearners := 0
for _, m := range resp.Members {
if m.IsLearner {
numberOfLearners++
}
}
return numberOfLearners, nil
if numberOfLearners != 1 {
t.Errorf("Added 1 learner node to cluster, got %d", numberOfLearners)
}
}
func TestMemberPromoteForLearner(t *testing.T) {
	// TODO test not ready learner promotion.
	defer testutil.AfterTest(t)

	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
	defer clus.Terminate(t)

	// TODO change the random client to client that talk to leader directly.
	capi := clus.RandClient()

	// Add a learner member with a placeholder peer URL.
	urls := []string{"http://127.0.0.1:1234"}
	addResp, err := capi.MemberAddAsLearner(context.Background(), urls)
	if err != nil {
		t.Fatalf("failed to add member %v", err)
	}
	if !addResp.Member.IsLearner {
		t.Fatalf("Added a member as learner, got resp.Member.IsLearner = %v", addResp.Member.IsLearner)
	}
	learnerID := addResp.Member.ID

	// The member list in the add response must report exactly one learner.
	learnerCount := 0
	for _, member := range addResp.Members {
		if member.IsLearner {
			learnerCount++
		}
	}
	if learnerCount != 1 {
		t.Fatalf("Added 1 learner node to cluster, got %d", learnerCount)
	}

	promoteResp, err := capi.MemberPromote(context.Background(), learnerID)
	if err != nil {
		t.Fatalf("failed to promote member: %v", err)
	}

	// After promotion the member list must contain no learners at all.
	learnerCount = 0
	for _, member := range promoteResp.Members {
		if member.IsLearner {
			learnerCount++
		}
	}
	if learnerCount != 0 {
		t.Errorf("learner promoted, expect 0 learner, got %d", learnerCount)
	}
}

View File

@ -971,3 +971,83 @@ func TestKVLargeRequests(t *testing.T) {
clus.Terminate(t)
}
}
// TestKVForLearner ensures learner member only accepts serializable read request.
func TestKVForLearner(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
// we have to add and launch learner member after initial cluster was created, because
// bootstrapping a cluster with learner member is not supported.
clus.AddAndLaunchLearnerMember(t)
learners, err := clus.GetLearnerMembers()
if err != nil {
t.Fatalf("failed to get the learner members in cluster: %v", err)
}
if len(learners) != 1 {
t.Fatalf("added 1 learner to cluster, got %d", len(learners))
}
if len(clus.Members) != 4 {
t.Fatalf("expecting 4 members in cluster after adding the learner member, got %d", len(clus.Members))
}
// note:
// 1. clus.Members[3] is the newly added learner member, which was appended to clus.Members
// 2. we are using member's grpcAddr instead of clientURLs as the endpoint for clientv3.Config,
// because the implementation of integration test has diverged from embed/etcd.go.
learnerEp := clus.Members[3].GRPCAddr()
cfg := clientv3.Config{
Endpoints: []string{learnerEp},
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
}
// this client only has endpoint of the learner member
cli, err := clientv3.New(cfg)
if err != nil {
t.Fatalf("failed to create clientv3: %v", err)
}
defer cli.Close()
// TODO: expose servers's ReadyNotify() in test and use it instead.
// waiting for learner member to catch up applying the config change entries in raft log.
// NOTE(review): a fixed sleep is inherently flaky on slow machines — replacing
// it with ReadyNotify (per the TODO above) would make this deterministic.
time.Sleep(3 * time.Second)
// Table of client operations sent to the learner endpoint. Only the
// serializable (locally-served) read is expected to succeed; all other
// requests must be rejected by the learner.
tests := []struct {
op clientv3.Op
wErr bool
}{
{
// serializable read: served locally by the learner
op: clientv3.OpGet("foo", clientv3.WithSerializable()),
wErr: false,
},
{
// linearizable read: rejected
op: clientv3.OpGet("foo"),
wErr: true,
},
{
// write: rejected
op: clientv3.OpPut("foo", "bar"),
wErr: true,
},
{
// delete: rejected
op: clientv3.OpDelete("foo"),
wErr: true,
},
{
// txn: rejected
op: clientv3.OpTxn([]clientv3.Cmp{clientv3.Compare(clientv3.CreateRevision("foo"), "=", 0)}, nil, nil),
wErr: true,
},
}
for idx, test := range tests {
_, err := cli.Do(context.TODO(), test.op)
if err != nil && !test.wErr {
t.Errorf("%d: expect no error, got %v", idx, err)
}
if err == nil && test.wErr {
t.Errorf("%d: expect error, got nil", idx)
}
}
}

View File

@ -60,7 +60,7 @@ func newEpStatusCommand() *cobra.Command {
Use: "status",
Short: "Prints out the status of endpoints specified in `--endpoints` flag",
Long: `When --write-out is set to simple, this command prints out comma-separated status lists for each endpoint.
The items in the lists are endpoint, ID, version, db size, is leader, raft term, raft index.
The items in the lists are endpoint, ID, version, db size, is leader, is learner, raft term, raft index, raft applied index, errors.
`,
Run: epStatusCommandFunc,
}

View File

@ -40,6 +40,7 @@ func NewMemberCommand() *cobra.Command {
mc.AddCommand(NewMemberRemoveCommand())
mc.AddCommand(NewMemberUpdateCommand())
mc.AddCommand(NewMemberListCommand())
mc.AddCommand(NewMemberPromoteCommand())
return mc
}
@ -100,6 +101,20 @@ The items in the lists are ID, Status, Name, Peer Addrs, Client Addrs, Is Learne
return cc
}
// NewMemberPromoteCommand returns the cobra command for "member promote".
func NewMemberPromoteCommand() *cobra.Command {
cc := &cobra.Command{
Use: "promote <memberID>",
Short: "Promotes a non-voting member in the cluster",
Long: `Promotes a non-voting learner member to a voting one in the cluster.
`,
// Argument count is validated inside memberPromoteCommandFunc rather than
// via cobra's Args field, matching the other member subcommands.
Run: memberPromoteCommandFunc,
}
return cc
}
// memberAddCommandFunc executes the "member add" command.
func memberAddCommandFunc(cmd *cobra.Command, args []string) {
if len(args) < 1 {
@ -238,3 +253,23 @@ func memberListCommandFunc(cmd *cobra.Command, args []string) {
display.MemberList(*resp)
}
// memberPromoteCommandFunc executes the "member promote" command.
func memberPromoteCommandFunc(cmd *cobra.Command, args []string) {
	if len(args) != 1 {
		ExitWithError(ExitBadArgs, fmt.Errorf("member ID is not provided"))
	}

	// Member IDs are always displayed and accepted in hexadecimal.
	memberID, err := strconv.ParseUint(args[0], 16, 64)
	if err != nil {
		ExitWithError(ExitBadArgs, fmt.Errorf("bad member ID arg (%v), expecting ID in Hex", err))
	}

	ctx, cancel := commandCtx(cmd)
	resp, err := mustClientFromCmd(cmd).MemberPromote(ctx, memberID)
	cancel()
	if err != nil {
		ExitWithError(ExitError, err)
	}
	display.MemberPromote(memberID, *resp)
}

View File

@ -42,6 +42,7 @@ type printer interface {
MemberAdd(v3.MemberAddResponse)
MemberRemove(id uint64, r v3.MemberRemoveResponse)
MemberUpdate(id uint64, r v3.MemberUpdateResponse)
MemberPromote(id uint64, r v3.MemberPromoteResponse)
MemberList(v3.MemberListResponse)
EndpointHealth([]epHealth)
@ -194,7 +195,8 @@ func makeEndpointHealthTable(healthList []epHealth) (hdr []string, rows [][]stri
}
func makeEndpointStatusTable(statusList []epStatus) (hdr []string, rows [][]string) {
hdr = []string{"endpoint", "ID", "version", "db size", "is leader", "raft term", "raft index", "raft applied index", "errors"}
hdr = []string{"endpoint", "ID", "version", "db size", "is leader", "is learner", "raft term",
"raft index", "raft applied index", "errors"}
for _, status := range statusList {
rows = append(rows, []string{
status.Ep,
@ -202,6 +204,7 @@ func makeEndpointStatusTable(statusList []epStatus) (hdr []string, rows [][]stri
status.Resp.Version,
humanize.Bytes(uint64(status.Resp.DbSize)),
fmt.Sprint(status.Resp.Leader == status.Resp.Header.MemberId),
fmt.Sprint(status.Resp.IsLearner),
fmt.Sprint(status.Resp.RaftTerm),
fmt.Sprint(status.Resp.RaftIndex),
fmt.Sprint(status.Resp.RaftAppliedIndex),

View File

@ -158,6 +158,7 @@ func (p *fieldsPrinter) EndpointStatus(eps []epStatus) {
fmt.Printf("\"Version\" : %q\n", ep.Resp.Version)
fmt.Println(`"DBSize" :`, ep.Resp.DbSize)
fmt.Println(`"Leader" :`, ep.Resp.Leader)
fmt.Println(`"IsLearner" :`, ep.Resp.IsLearner)
fmt.Println(`"RaftIndex" :`, ep.Resp.RaftIndex)
fmt.Println(`"RaftTerm" :`, ep.Resp.RaftTerm)
fmt.Println(`"RaftAppliedIndex" :`, ep.Resp.RaftAppliedIndex)

View File

@ -136,6 +136,10 @@ func (s *simplePrinter) MemberUpdate(id uint64, r v3.MemberUpdateResponse) {
fmt.Printf("Member %16x updated in cluster %16x\n", id, r.Header.ClusterId)
}
// MemberPromote prints a one-line confirmation for a successful
// "member promote", mirroring the other simplePrinter member methods.
func (s *simplePrinter) MemberPromote(id uint64, r v3.MemberPromoteResponse) {
fmt.Printf("Member %16x promoted in cluster %16x\n", id, r.Header.ClusterId)
}
func (s *simplePrinter) MemberList(resp v3.MemberListResponse) {
_, rows := makeMemberListTable(resp)
for _, row := range rows {

View File

@ -59,6 +59,15 @@ type RaftCluster struct {
removed map[types.ID]bool
}
// ConfigChangeContext represents a context for confChange.
type ConfigChangeContext struct {
// Member carries the attributes of the member the config change applies
// to; it is embedded so its fields marshal inline in the JSON context.
Member
// IsPromote indicates if the config change is for promoting a learner member.
// This flag is needed because both adding a new member and promoting a learner member
// uses the same config change type 'ConfChangeAddNode'.
IsPromote bool `json:"isPromote"`
}
// NewClusterFromURLsMap creates a new raft cluster using provided urls map. Currently, it does not support creating
// cluster with raft learner member.
func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) (*RaftCluster, error) {
@ -262,29 +271,40 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
}
switch cc.Type {
case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
if members[id] != nil {
return ErrIDExists
}
urls := make(map[string]bool)
for _, m := range members {
for _, u := range m.PeerURLs {
urls[u] = true
}
}
m := new(Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
confChangeContext := new(ConfigChangeContext)
if err := json.Unmarshal(cc.Context, confChangeContext); err != nil {
if c.lg != nil {
c.lg.Panic("failed to unmarshal member", zap.Error(err))
c.lg.Panic("failed to unmarshal confChangeContext", zap.Error(err))
} else {
plog.Panicf("unmarshal member should never fail: %v", err)
plog.Panicf("unmarshal confChangeContext should never fail: %v", err)
}
}
for _, u := range m.PeerURLs {
if urls[u] {
return ErrPeerURLexists
// A ConfChangeAddNode to an existing learner node promotes it to a voting member.
if confChangeContext.IsPromote {
if members[id] == nil {
return ErrIDNotFound
}
if !members[id].IsLearner {
return ErrMemberNotLearner
}
} else {
// add a learner or a follower case
if members[id] != nil {
return ErrIDExists
}
}
urls := make(map[string]bool)
for _, m := range members {
for _, u := range m.PeerURLs {
urls[u] = true
}
}
for _, u := range confChangeContext.Member.PeerURLs {
if urls[u] {
return ErrPeerURLexists
}
}
}
case raftpb.ConfChangeRemoveNode:
if members[id] == nil {
return ErrIDNotFound
@ -434,6 +454,30 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) {
}
}
// PromoteMember marks the member's IsLearner RaftAttributes to false and
// persists the change to the v2 store and/or backend when present.
// The caller is expected to have validated (e.g. via
// ValidateConfigurationChange) that the member exists and is a learner.
func (c *RaftCluster) PromoteMember(id types.ID) {
	c.Lock()
	defer c.Unlock()

	c.members[id].RaftAttributes.IsLearner = false
	if c.v2store != nil {
		mustUpdateMemberInStore(c.v2store, c.members[id])
	}
	if c.be != nil {
		mustSaveMemberToBackend(c.be, c.members[id])
	}

	if c.lg != nil {
		c.lg.Info(
			"promote member",
			// include the promoted member's ID so the structured log carries
			// the same information as the plog message in the else branch
			zap.String("member-id", id.String()),
			zap.String("cluster-id", c.cid.String()),
			zap.String("local-member-id", c.localID.String()),
		)
	} else {
		plog.Noticef("promote member %s in cluster %s", id, c.cid)
	}
}
func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
c.Lock()
defer c.Unlock()
@ -693,3 +737,44 @@ func mustDetectDowngrade(lg *zap.Logger, cv *semver.Version) {
}
}
}
// IsLocalMemberLearner reports whether the local member is a raft learner.
// It panics if the local member is missing from the membership data, since
// that indicates corrupted or inconsistent cluster state.
func (c *RaftCluster) IsLocalMemberLearner() bool {
c.Lock()
defer c.Unlock()
localMember, ok := c.members[c.localID]
if !ok {
// fail loudly: the local member must always be present
if c.lg != nil {
c.lg.Panic(
"failed to find local ID in cluster members",
zap.String("cluster-id", c.cid.String()),
zap.String("local-member-id", c.localID.String()),
)
} else {
plog.Panicf("failed to find local ID %s in cluster %s", c.localID.String(), c.cid.String())
}
}
return localMember.IsLearner
}
// IsMemberExist reports whether the member with the given id exists in cluster.
func (c *RaftCluster) IsMemberExist(id types.ID) bool {
	c.Lock()
	defer c.Unlock()

	_, found := c.members[id]
	return found
}
// VotingMemberIDs returns the ID of voting members in cluster, i.e. every
// member that is not a learner, in ascending ID order.
func (c *RaftCluster) VotingMemberIDs() []types.ID {
	c.Lock()
	defer c.Unlock()

	var voterIDs []types.ID
	for _, member := range c.members {
		if member.IsLearner {
			continue
		}
		voterIDs = append(voterIDs, member.ID)
	}
	// map iteration order is random; sort for deterministic output
	sort.Sort(types.IDSlice(voterIDs))
	return voterIDs
}

View File

@ -290,6 +290,12 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
t.Fatal(err)
}
attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}}
ctx1, err := json.Marshal(&Member{ID: types.ID(1), RaftAttributes: attr})
if err != nil {
t.Fatal(err)
}
attr = RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}}
ctx5, err := json.Marshal(&Member{ID: types.ID(5), RaftAttributes: attr})
if err != nil {
@ -308,6 +314,16 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
t.Fatal(err)
}
ctx3, err := json.Marshal(&ConfigChangeContext{Member: Member{ID: types.ID(3), RaftAttributes: attr}, IsPromote: true})
if err != nil {
t.Fatal(err)
}
ctx6, err := json.Marshal(&ConfigChangeContext{Member: Member{ID: types.ID(6), RaftAttributes: attr}, IsPromote: true})
if err != nil {
t.Fatal(err)
}
tests := []struct {
cc raftpb.ConfChange
werr error
@ -335,8 +351,9 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
},
{
raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: 1,
Type: raftpb.ConfChangeAddNode,
NodeID: 1,
Context: ctx1,
},
ErrIDExists,
},
@ -388,6 +405,22 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
},
nil,
},
{
raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: 3,
Context: ctx3,
},
ErrMemberNotLearner,
},
{
raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: 6,
Context: ctx6,
},
ErrIDNotFound,
},
}
for i, tt := range tests {
err := cl.ValidateConfigurationChange(tt.cc)

View File

@ -21,10 +21,12 @@ import (
)
var (
ErrIDRemoved = errors.New("membership: ID removed")
ErrIDExists = errors.New("membership: ID exists")
ErrIDNotFound = errors.New("membership: ID not found")
ErrPeerURLexists = errors.New("membership: peerURL exists")
ErrIDRemoved = errors.New("membership: ID removed")
ErrIDExists = errors.New("membership: ID exists")
ErrIDNotFound = errors.New("membership: ID not found")
ErrPeerURLexists = errors.New("membership: peerURL exists")
ErrMemberNotLearner = errors.New("membership: can only promote a learner member")
ErrLearnerNotReady = errors.New("membership: can only promote a learner member which is in sync with leader")
)
func isKeyNotFound(err error) bool {

View File

@ -132,6 +132,11 @@ func (s *serverRecorder) UpdateMember(_ context.Context, m membership.Member) ([
return nil, nil
}
// PromoteMember records a "PromoteMember" action with the given id; the
// recorder never performs the promotion and always reports success.
func (s *serverRecorder) PromoteMember(_ context.Context, id uint64) ([]*membership.Member, error) {
s.actions = append(s.actions, action{name: "PromoteMember", params: []interface{}{id}})
return nil, nil
}
type action struct {
name string
params []interface{}
@ -168,6 +173,9 @@ func (rs *resServer) RemoveMember(_ context.Context, _ uint64) ([]*membership.Me
func (rs *resServer) UpdateMember(_ context.Context, _ membership.Member) ([]*membership.Member, error) {
return nil, nil
}
// PromoteMember is a test stub that always succeeds with a nil member list.
func (rs *resServer) PromoteMember(_ context.Context, _ uint64) ([]*membership.Member, error) {
return nil, nil
}
func boolp(b bool) *bool { return &b }

View File

@ -74,6 +74,9 @@ func (fs *errServer) RemoveMember(ctx context.Context, id uint64) ([]*membership
func (fs *errServer) UpdateMember(ctx context.Context, m membership.Member) ([]*membership.Member, error) {
return nil, fs.err
}
// PromoteMember always fails with the server's configured error, so tests
// can exercise error paths of the HTTP member handlers.
func (fs *errServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
return nil, fs.err
}
func TestWriteError(t *testing.T) {
// nil error should not panic

View File

@ -79,6 +79,14 @@ func (s *v2v3Server) RemoveMember(ctx context.Context, id uint64) ([]*membership
return v3MembersToMembership(resp.Members), nil
}
// PromoteMember promotes the learner member with the given id through the
// backing v3 client and returns the resulting membership list.
func (s *v2v3Server) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
	promoteResp, err := s.c.MemberPromote(ctx, id)
	if err != nil {
		return nil, err
	}
	return v3MembersToMembership(promoteResp.Members), nil
}
func (s *v2v3Server) UpdateMember(ctx context.Context, m membership.Member) ([]*membership.Member, error) {
resp, err := s.c.MemberUpdate(ctx, uint64(m.ID), m.PeerURLs)
if err != nil {

View File

@ -48,6 +48,11 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
return nil, rpctypes.ErrGRPCNotCapable
}
// TODO: add test in clientv3/integration to verify behavior
if s.IsLearner() && !isRPCSupportedForLearner(req) {
return nil, rpctypes.ErrGPRCNotSupportedForLearner
}
md, ok := metadata.FromIncomingContext(ctx)
if ok {
if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
@ -190,6 +195,10 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
return rpctypes.ErrGRPCNotCapable
}
if s.IsLearner() { // learner does not support Watch and LeaseKeepAlive RPC
return rpctypes.ErrGPRCNotSupportedForLearner
}
md, ok := metadata.FromIncomingContext(ss.Context())
if ok {
if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {

View File

@ -55,6 +55,10 @@ type AuthGetter interface {
AuthStore() auth.AuthStore
}
// ClusterStatusGetter exposes cluster-membership status of the local member;
// the maintenance Status RPC uses it to report whether this member is a
// raft learner.
type ClusterStatusGetter interface {
IsLearner() bool
}
type maintenanceServer struct {
lg *zap.Logger
rg etcdserver.RaftStatusGetter
@ -63,10 +67,11 @@ type maintenanceServer struct {
a Alarmer
lt LeaderTransferrer
hdr header
cs ClusterStatusGetter
}
func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, kg: s, bg: s, a: s, lt: s, hdr: newHeader(s)}
srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s}
return &authMaintenanceServer{srv, s}
}
@ -179,6 +184,7 @@ func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (
RaftTerm: ms.rg.Term(),
DbSize: ms.bg.Backend().Size(),
DbSizeInUse: ms.bg.Backend().SizeInUse(),
IsLearner: ms.cs.IsLearner(),
}
if resp.Leader == raft.None {
resp.Errors = append(resp.Errors, etcdserver.ErrNoLeader.Error())

View File

@ -16,7 +16,6 @@ package v3rpc
import (
"context"
"errors"
"time"
"go.etcd.io/etcd/v3/etcdserver"
@ -94,8 +93,11 @@ func (cs *ClusterServer) MemberList(ctx context.Context, r *pb.MemberListRequest
}
func (cs *ClusterServer) MemberPromote(ctx context.Context, r *pb.MemberPromoteRequest) (*pb.MemberPromoteResponse, error) {
// TODO: implement
return nil, errors.New("not implemented")
membs, err := cs.server.PromoteMember(ctx, r.ID)
if err != nil {
return nil, togRPCError(err)
}
return &pb.MemberPromoteResponse{Header: cs.header(), Members: membersToProtoMembers(membs)}, nil
}
func (cs *ClusterServer) header() *pb.ResponseHeader {

View File

@ -40,6 +40,8 @@ var (
ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err()
ErrGRPCMemberBadURLs = status.New(codes.InvalidArgument, "etcdserver: given member URLs are invalid").Err()
ErrGRPCMemberNotFound = status.New(codes.NotFound, "etcdserver: member not found").Err()
ErrGRPCMemberNotLearner = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member").Err()
ErrGRPCLearnerNotReady = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member which is in sync with leader").Err()
ErrGRPCRequestTooLarge = status.New(codes.InvalidArgument, "etcdserver: request is too large").Err()
ErrGRPCRequestTooManyRequests = status.New(codes.ResourceExhausted, "etcdserver: too many requests").Err()
@ -69,6 +71,8 @@ var (
ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err()
ErrGRPCUnhealthy = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err()
ErrGRPCCorrupt = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err()
ErrGPRCNotSupportedForLearner = status.New(codes.FailedPrecondition, "etcdserver: rpc not supported for learner").Err()
ErrGRPCBadLeaderTransferee = status.New(codes.FailedPrecondition, "etcdserver: bad leader transferee").Err()
errStringToError = map[string]error{
ErrorDesc(ErrGRPCEmptyKey): ErrGRPCEmptyKey,
@ -91,6 +95,8 @@ var (
ErrorDesc(ErrGRPCMemberNotEnoughStarted): ErrGRPCMemberNotEnoughStarted,
ErrorDesc(ErrGRPCMemberBadURLs): ErrGRPCMemberBadURLs,
ErrorDesc(ErrGRPCMemberNotFound): ErrGRPCMemberNotFound,
ErrorDesc(ErrGRPCMemberNotLearner): ErrGRPCMemberNotLearner,
ErrorDesc(ErrGRPCLearnerNotReady): ErrGRPCLearnerNotReady,
ErrorDesc(ErrGRPCRequestTooLarge): ErrGRPCRequestTooLarge,
ErrorDesc(ErrGRPCRequestTooManyRequests): ErrGRPCRequestTooManyRequests,
@ -120,6 +126,7 @@ var (
ErrorDesc(ErrGRPCTimeoutDueToConnectionLost): ErrGRPCTimeoutDueToConnectionLost,
ErrorDesc(ErrGRPCUnhealthy): ErrGRPCUnhealthy,
ErrorDesc(ErrGRPCCorrupt): ErrGRPCCorrupt,
ErrorDesc(ErrGRPCBadLeaderTransferee): ErrGRPCBadLeaderTransferee,
}
)
@ -144,6 +151,8 @@ var (
ErrMemberNotEnoughStarted = Error(ErrGRPCMemberNotEnoughStarted)
ErrMemberBadURLs = Error(ErrGRPCMemberBadURLs)
ErrMemberNotFound = Error(ErrGRPCMemberNotFound)
ErrMemberNotLearner = Error(ErrGRPCMemberNotLearner)
ErrMemberLearnerNotReady = Error(ErrGRPCLearnerNotReady)
ErrRequestTooLarge = Error(ErrGRPCRequestTooLarge)
ErrTooManyRequests = Error(ErrGRPCRequestTooManyRequests)
@ -173,6 +182,7 @@ var (
ErrTimeoutDueToConnectionLost = Error(ErrGRPCTimeoutDueToConnectionLost)
ErrUnhealthy = Error(ErrGRPCUnhealthy)
ErrCorrupt = Error(ErrGRPCCorrupt)
ErrBadLeaderTransferee = Error(ErrGRPCBadLeaderTransferee)
)
// EtcdError defines gRPC server errors.

View File

@ -22,6 +22,7 @@ import (
"go.etcd.io/etcd/v3/etcdserver"
"go.etcd.io/etcd/v3/etcdserver/api/membership"
"go.etcd.io/etcd/v3/etcdserver/api/v3rpc/rpctypes"
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
"go.etcd.io/etcd/v3/lease"
"go.etcd.io/etcd/v3/mvcc"
@ -34,6 +35,8 @@ var toGRPCErrorMap = map[error]error{
membership.ErrIDNotFound: rpctypes.ErrGRPCMemberNotFound,
membership.ErrIDExists: rpctypes.ErrGRPCMemberExist,
membership.ErrPeerURLexists: rpctypes.ErrGRPCPeerURLExist,
membership.ErrMemberNotLearner: rpctypes.ErrGRPCMemberNotLearner,
membership.ErrLearnerNotReady: rpctypes.ErrGRPCLearnerNotReady,
etcdserver.ErrNotEnoughStartedMembers: rpctypes.ErrMemberNotEnoughStarted,
mvcc.ErrCompacted: rpctypes.ErrGRPCCompacted,
@ -52,6 +55,7 @@ var toGRPCErrorMap = map[error]error{
etcdserver.ErrUnhealthy: rpctypes.ErrGRPCUnhealthy,
etcdserver.ErrKeyNotFound: rpctypes.ErrGRPCKeyNotFound,
etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt,
etcdserver.ErrBadLeaderTransferee: rpctypes.ErrGRPCBadLeaderTransferee,
lease.ErrLeaseNotFound: rpctypes.ErrGRPCLeaseNotFound,
lease.ErrLeaseExists: rpctypes.ErrGRPCLeaseExist,
@ -116,3 +120,15 @@ func isClientCtxErr(ctxErr error, err error) bool {
}
return false
}
// in v3.4, learner is allowed to serve serializable read and endpoint status
func isRPCSupportedForLearner(req interface{}) bool {
	switch request := req.(type) {
	case *pb.StatusRequest:
		// endpoint status is always served locally
		return true
	case *pb.RangeRequest:
		// a range is allowed only when the client asked for a
		// serializable (locally-served) read
		return request.Serializable
	default:
		return false
	}
}

View File

@ -37,6 +37,7 @@ var (
ErrUnhealthy = errors.New("etcdserver: unhealthy cluster")
ErrKeyNotFound = errors.New("etcdserver: key not found")
ErrCorrupt = errors.New("etcdserver: corrupt cluster")
ErrBadLeaderTransferee = errors.New("etcdserver: bad leader transferee")
)
type DiscoveryError struct {

View File

@ -156,6 +156,11 @@ type Server interface {
// UpdateMember attempts to update an existing member in the cluster. It will
// return ErrIDNotFound if the member ID does not exist.
UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
// PromoteMember attempts to promote a non-voting node to a voting node. It will
// return ErrIDNotFound if the member ID does not exist.
// return ErrLearnerNotReady if the member is not ready.
// return ErrMemberNotLearner if the member is not a learner.
PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error)
// ClusterVersion is the cluster-wide minimum major.minor version.
// Cluster version is set to the min version that an etcd member is
@ -1372,8 +1377,8 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
ep.snapi = ep.appliedi
}
func (s *EtcdServer) isMultiNode() bool {
return s.cluster != nil && len(s.cluster.MemberIDs()) > 1
func (s *EtcdServer) hasMultipleVotingMembers() bool {
return s.cluster != nil && len(s.cluster.VotingMemberIDs()) > 1
}
func (s *EtcdServer) isLeader() bool {
@ -1382,6 +1387,10 @@ func (s *EtcdServer) isLeader() bool {
// MoveLeader transfers the leader to the given transferee.
func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) error {
if !s.cluster.IsMemberExist(types.ID(transferee)) || s.cluster.Member(types.ID(transferee)).IsLearner {
return ErrBadLeaderTransferee
}
now := time.Now()
interval := time.Duration(s.Cfg.TickMs) * time.Millisecond
@ -1435,20 +1444,20 @@ func (s *EtcdServer) TransferLeadership() error {
return nil
}
if !s.isMultiNode() {
if !s.hasMultipleVotingMembers() {
if lg := s.getLogger(); lg != nil {
lg.Info(
"skipped leadership transfer; it's a single-node cluster",
"skipped leadership transfer for single voting member cluster",
zap.String("local-member-id", s.ID().String()),
zap.String("current-leader-member-id", types.ID(s.Lead()).String()),
)
} else {
plog.Printf("skipped leadership transfer for single member cluster")
plog.Printf("skipped leadership transfer for single voting member cluster")
}
return nil
}
transferee, ok := longestConnected(s.r.transport, s.cluster.MemberIDs())
transferee, ok := longestConnected(s.r.transport, s.cluster.VotingMemberIDs())
if !ok {
return ErrUnhealthy
}
@ -1611,6 +1620,48 @@ func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) ([]*membership
return s.configure(ctx, cc)
}
// PromoteMember promotes a learner node to a voting node.
// It proposes a ConfChangeAddNode whose marshaled ConfigChangeContext has
// IsPromote set, which is how ValidateConfigurationChange and applyConfChange
// distinguish a promotion from adding a brand-new member.
func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
if err := s.checkMembershipOperationPermission(ctx); err != nil {
return nil, err
}
// check if we can promote this learner
if err := s.mayPromoteMember(types.ID(id)); err != nil {
return nil, err
}
// build the context for the promote confChange. mark IsLearner to false and IsPromote to true.
promoteChangeContext := membership.ConfigChangeContext{
Member: membership.Member{
ID: types.ID(id),
},
IsPromote: true,
}
b, err := json.Marshal(promoteChangeContext)
if err != nil {
return nil, err
}
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: id,
Context: b,
}
// configure proposes the conf change and waits for it to be applied
return s.configure(ctx, cc)
}
// mayPromoteMember checks whether the member with the given id may be
// promoted to a voting member.
// NOTE(review): it currently allows every promotion regardless of
// StrictReconfigCheck; readiness validation (ErrLearnerNotReady) is not
// implemented yet — see the TODO below.
func (s *EtcdServer) mayPromoteMember(id types.ID) error {
if !s.Cfg.StrictReconfigCheck {
return nil
}
// TODO add more checks whether the member can be promoted.
return nil
}
func (s *EtcdServer) mayRemoveMember(id types.ID) error {
if !s.Cfg.StrictReconfigCheck {
return nil
@ -2061,28 +2112,33 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
*confState = *s.r.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
m := new(membership.Member)
if err := json.Unmarshal(cc.Context, m); err != nil {
confChangeContext := new(membership.ConfigChangeContext)
if err := json.Unmarshal(cc.Context, confChangeContext); err != nil {
if lg != nil {
lg.Panic("failed to unmarshal member", zap.Error(err))
} else {
plog.Panicf("unmarshal member should never fail: %v", err)
}
}
if cc.NodeID != uint64(m.ID) {
if cc.NodeID != uint64(confChangeContext.Member.ID) {
if lg != nil {
lg.Panic(
"got different member ID",
zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
zap.String("member-id-from-message", m.ID.String()),
zap.String("member-id-from-message", confChangeContext.Member.ID.String()),
)
} else {
plog.Panicf("nodeID should always be equal to member ID")
}
}
s.cluster.AddMember(m)
if m.ID != s.id {
s.r.transport.AddPeer(m.ID, m.PeerURLs)
if confChangeContext.IsPromote {
s.cluster.PromoteMember(confChangeContext.Member.ID)
} else {
s.cluster.AddMember(&confChangeContext.Member)
if confChangeContext.Member.ID != s.id {
s.r.transport.AddPeer(confChangeContext.Member.ID, confChangeContext.PeerURLs)
}
}
case raftpb.ConfChangeRemoveNode:
@ -2440,3 +2496,8 @@ func (s *EtcdServer) Alarms() []*pb.AlarmMember {
func (s *EtcdServer) Logger() *zap.Logger {
return s.lg
}
// IsLearner reports whether the local member is a raft learner, according
// to the cluster membership data.
func (s *EtcdServer) IsLearner() bool {
return s.cluster.IsLocalMemberLearner()
}

View File

@ -508,35 +508,57 @@ func TestApplyConfChangeError(t *testing.T) {
}
cl.RemoveMember(4)
attr := membership.RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 1)}}
ctx, err := json.Marshal(&membership.Member{ID: types.ID(1), RaftAttributes: attr})
if err != nil {
t.Fatal(err)
}
attr = membership.RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 4)}}
ctx4, err := json.Marshal(&membership.Member{ID: types.ID(1), RaftAttributes: attr})
if err != nil {
t.Fatal(err)
}
attr = membership.RaftAttributes{PeerURLs: []string{fmt.Sprintf("http://127.0.0.1:%d", 5)}}
ctx5, err := json.Marshal(&membership.Member{ID: types.ID(1), RaftAttributes: attr})
if err != nil {
t.Fatal(err)
}
tests := []struct {
cc raftpb.ConfChange
werr error
}{
{
raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: 4,
Type: raftpb.ConfChangeAddNode,
NodeID: 4,
Context: ctx4,
},
membership.ErrIDRemoved,
},
{
raftpb.ConfChange{
Type: raftpb.ConfChangeUpdateNode,
NodeID: 4,
Type: raftpb.ConfChangeUpdateNode,
NodeID: 4,
Context: ctx4,
},
membership.ErrIDRemoved,
},
{
raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: 1,
Type: raftpb.ConfChangeAddNode,
NodeID: 1,
Context: ctx,
},
membership.ErrIDExists,
},
{
raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: 5,
Type: raftpb.ConfChangeRemoveNode,
NodeID: 5,
Context: ctx5,
},
membership.ErrIDNotFound,
},
@ -553,7 +575,7 @@ func TestApplyConfChangeError(t *testing.T) {
if err != tt.werr {
t.Errorf("#%d: applyConfChange error = %v, want %v", i, err, tt.werr)
}
cc := raftpb.ConfChange{Type: tt.cc.Type, NodeID: raft.None}
cc := raftpb.ConfChange{Type: tt.cc.Type, NodeID: raft.None, Context: tt.cc.Context}
w := []testutil.Action{
{
Name: "ApplyConfChange",
@ -1318,6 +1340,54 @@ func TestRemoveMember(t *testing.T) {
}
}
// TestPromoteMember tests PromoteMember can propose and perform learner node promotion.
func TestPromoteMember(t *testing.T) {
n := newNodeConfChangeCommitterRecorder()
// make the server believe it is the leader so the proposal proceeds
n.readyc <- raft.Ready{
SoftState: &raft.SoftState{RaftState: raft.StateLeader},
}
cl := newTestCluster(nil)
st := v2store.New()
// NOTE(review): cl is given a second, separate v2store here while the
// server below uses st — confirm whether cl.SetStore(st) was intended.
cl.SetStore(v2store.New())
// seed the cluster with a single learner member to be promoted
cl.AddMember(&membership.Member{
ID: 1234,
RaftAttributes: membership.RaftAttributes{
IsLearner: true,
},
})
r := newRaftNode(raftNodeConfig{
lg: zap.NewExample(),
Node: n,
raftStorage: raft.NewMemoryStorage(),
storage: mockstorage.NewStorageRecorder(""),
transport: newNopTransporter(),
})
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zap.NewExample(),
r: *r,
v2store: st,
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
}
s.start()
_, err := s.PromoteMember(context.TODO(), 1234)
gaction := n.Action()
s.Stop()
if err != nil {
t.Fatalf("PromoteMember error: %v", err)
}
// promotion is proposed and applied as a ConfChangeAddNode
wactions := []testutil.Action{{Name: "ProposeConfChange:ConfChangeAddNode"}, {Name: "ApplyConfChange:ConfChangeAddNode"}}
if !reflect.DeepEqual(gaction, wactions) {
t.Errorf("action = %v, want %v", gaction, wactions)
}
if cl.Member(1234).IsLearner {
t.Errorf("member with id 1234 is not promoted")
}
}
// TestUpdateMember tests RemoveMember can propose and perform node update.
func TestUpdateMember(t *testing.T) {
n := newNodeConfChangeCommitterRecorder()

View File

@ -559,6 +559,8 @@ type member struct {
clientMaxCallSendMsgSize int
clientMaxCallRecvMsgSize int
useIP bool
isLearner bool
}
// GRPCAddr returns m.grpcAddr, the address this test member's gRPC server listens on.
func (m *member) GRPCAddr() string { return m.grpcAddr }
@ -1272,3 +1274,121 @@ type grpcAPI struct {
// Election is the election API for the client's connection.
Election epb.ElectionClient
}
// GetLearnerMembers returns the list of learner members in cluster using MemberList API.
func (c *ClusterV3) GetLearnerMembers() ([]*pb.Member, error) {
	// Query the live membership through any client; index 0 always exists.
	resp, err := c.Client(0).MemberList(context.Background())
	if err != nil {
		return nil, fmt.Errorf("failed to list member %v", err)
	}
	// Keep only the members flagged as learners.
	var learners []*pb.Member
	for _, member := range resp.Members {
		if !member.IsLearner {
			continue
		}
		learners = append(learners, member)
	}
	return learners, nil
}
// AddAndLaunchLearnerMember creates a learner member, adds it to cluster
// via v3 MemberAdd API, and then launches the new member.
func (c *ClusterV3) AddAndLaunchLearnerMember(t testing.TB) {
	m := c.mustNewMember(t)
	m.isLearner = true
	// Advertise the new member's first peer listener using the cluster's peer TLS scheme.
	scheme := schemeFromTLSInfo(c.cfg.PeerTLS)
	peerURLs := []string{scheme + "://" + m.PeerListeners[0].Addr().String()}
	// Register the learner with the existing cluster BEFORE launching it;
	// a learner can only join via the v3 MemberAddAsLearner API.
	cli := c.Client(0)
	_, err := cli.MemberAddAsLearner(context.Background(), peerURLs)
	if err != nil {
		t.Fatalf("failed to add learner member %v", err)
	}
	// Build the initial peer URL map from all existing members plus the new one,
	// and mark the member as joining an existing cluster rather than bootstrapping.
	m.InitialPeerURLsMap = types.URLsMap{}
	for _, mm := range c.Members {
		m.InitialPeerURLsMap[mm.Name] = mm.PeerURLs
	}
	m.InitialPeerURLsMap[m.Name] = m.PeerURLs
	m.NewCluster = false
	if err := m.Launch(); err != nil {
		t.Fatal(err)
	}
	c.Members = append(c.Members, m)
	// Block until MemberList reflects the new learner on all clients.
	c.waitMembersMatch(t)
}
// getMembers returns a list of members in cluster, in format of etcdserverpb.Member
func (c *ClusterV3) getMembers() []*pb.Member {
	// Translate each local member record into its protobuf representation.
	var mems []*pb.Member
	for i := range c.Members {
		member := c.Members[i]
		converted := &pb.Member{
			Name:       member.Name,
			PeerURLs:   member.PeerURLs.StringSlice(),
			ClientURLs: member.ClientURLs.StringSlice(),
			IsLearner:  member.isLearner,
		}
		mems = append(mems, converted)
	}
	return mems
}
// waitMembersMatch waits until v3rpc MemberList returns the 'same' members info as the
// local 'c.Members', which is the local recording of members in the testing cluster. With
// the exception that the local recording c.Members does not have info on Member.ID, which
// is generated when the member is been added to cluster.
//
// Note:
// A successful match means the Member.clientURLs are matched. This means member has already
// finished publishing its server attributes to cluster. Publishing attributes is a cluster-wide
// write request (in v2 server). Therefore, at this point, any raft log entries prior to this
// would have already been applied.
//
// If a new member was added to an existing cluster, at this point, it has finished publishing
// its own server attributes to the cluster. And therefore by the same argument, it has already
// applied the raft log entries (especially those of type raftpb.ConfChangeType). At this point,
// the new member has the correct view of the cluster configuration.
//
// Special note on learner member:
// Learner member is only added to a cluster via v3rpc MemberAdd API (as of v3.4). When starting
// the learner member, its initial view of the cluster created by peerURLs map does not have info
// on whether or not the new member itself is learner. But at this point, a successful match does
// indicate that the new learner member has applied the raftpb.ConfChangeAddLearnerNode entry
// which was used to add the learner itself to the cluster, and therefore it has the correct info
// on learner.
func (c *ClusterV3) waitMembersMatch(t testing.TB) {
	wMembers := c.getMembers()
	sort.Sort(SortableProtoMemberSliceByPeerURLs(wMembers))
	cli := c.Client(0)
	for {
		resp, err := cli.MemberList(context.Background())
		if err != nil {
			t.Fatalf("failed to list member %v", err)
		}
		// Only attempt the deep comparison once the member counts agree.
		if len(resp.Members) == len(wMembers) {
			sort.Sort(SortableProtoMemberSliceByPeerURLs(resp.Members))
			// IDs are assigned by the cluster and unknown to c.Members; zero them
			// out so DeepEqual compares only the fields we can predict.
			for _, m := range resp.Members {
				m.ID = 0
			}
			if reflect.DeepEqual(resp.Members, wMembers) {
				return
			}
		}
		// Sleep on EVERY unsuccessful iteration. The original only slept after a
		// failed DeepEqual and spun in a hot loop (continue without sleeping),
		// hammering MemberList while the member counts still differed.
		time.Sleep(tickDuration)
	}
}
// SortableProtoMemberSliceByPeerURLs implements sort.Interface for []*pb.Member,
// ordering members by their first peer URL. Used to compare member lists from
// different sources in a deterministic order.
type SortableProtoMemberSliceByPeerURLs []*pb.Member
func (p SortableProtoMemberSliceByPeerURLs) Len() int { return len(p) }
func (p SortableProtoMemberSliceByPeerURLs) Less(i, j int) bool {
	return p[i].PeerURLs[0] < p[j].PeerURLs[0]
}
func (p SortableProtoMemberSliceByPeerURLs) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

View File

@ -16,6 +16,7 @@ package integration
import (
"context"
"strings"
"testing"
"time"
@ -106,3 +107,70 @@ func TestMoveLeaderError(t *testing.T) {
t.Errorf("err = %v, want %v", err, rpctypes.ErrGRPCNotLeader)
}
}
// TestMoveLeaderToLearnerError ensures that leader transfer to learner member will fail.
func TestMoveLeaderToLearnerError(t *testing.T) {
	defer testutil.AfterTest(t)

	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
	defer clus.Terminate(t)

	// A learner cannot be part of the initial bootstrap configuration, so it is
	// added and launched only after the 3-node cluster is up.
	clus.AddAndLaunchLearnerMember(t)

	learnerList, err := clus.GetLearnerMembers()
	if err != nil {
		t.Fatalf("failed to get the learner members in cluster: %v", err)
	}
	if len(learnerList) != 1 {
		t.Fatalf("added 1 learner to cluster, got %d", len(learnerList))
	}
	transfereeID := learnerList[0].ID

	// Ask the current leader to hand leadership to the learner; this must be rejected.
	leaderClient := clus.Client(clus.WaitLeader(t))
	_, err = leaderClient.MoveLeader(context.Background(), transfereeID)
	if err == nil {
		t.Fatalf("expecting leader transfer to learner to fail, got no error")
	}
	expectedErrKeywords := "bad leader transferee"
	if !strings.Contains(err.Error(), expectedErrKeywords) {
		t.Errorf("expecting error to contain %s, got %s", expectedErrKeywords, err.Error())
	}
}
// TestTransferLeadershipWithLearner ensures TransferLeadership does not timeout due to learner is
// automatically picked by leader as transferee.
func TestTransferLeadershipWithLearner(t *testing.T) {
	defer testutil.AfterTest(t)

	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
	defer clus.Terminate(t)

	clus.AddAndLaunchLearnerMember(t)

	learnerList, err := clus.GetLearnerMembers()
	if err != nil {
		t.Fatalf("failed to get the learner members in cluster: %v", err)
	}
	if len(learnerList) != 1 {
		t.Fatalf("added 1 learner to cluster, got %d", len(learnerList))
	}

	leaderIdx := clus.WaitLeader(t)
	done := make(chan error, 1)
	go func() {
		// note that this cluster has 1 leader and 1 learner. TransferLeadership should return nil.
		// Leadership transfer is skipped in cluster with 1 voting member.
		done <- clus.Members[leaderIdx].s.TransferLeadership()
	}()

	// Fail if the transfer does not finish within a generous bound; a learner
	// being picked as transferee would stall it indefinitely.
	select {
	case err := <-done:
		if err != nil {
			t.Errorf("got error during leadership transfer: %v", err)
		}
	case <-time.After(5 * time.Second):
		t.Error("timed out waiting for leader transition")
	}
}