*: address comments

Jingyi Hu 2019-05-15 15:57:09 -07:00
parent 90d28c0de7
commit 23f1d02391
7 changed files with 23 additions and 19 deletions

View File

@@ -1011,6 +1011,7 @@ func TestKVForLearner(t *testing.T) {
}
defer cli.Close()
// TODO: expose server's ReadyNotify() in test and use it instead.
// waiting for the learner member to catch up on applying the config change entries in the raft log.
time.Sleep(3 * time.Second)
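
The TODO above points at a better wait than a fixed sleep. A minimal sketch of that idea, assuming the test could reach the learner server's ReadyNotify() channel (which the TODO says is not yet exposed); the helper name and the 10-second timeout are illustrative only, not part of this change:

// waitLearnerReady is a hypothetical helper for the TODO: block until the
// learner reports ready instead of sleeping for a fixed duration.
func waitLearnerReady(t *testing.T, ready <-chan struct{}) {
	t.Helper()
	select {
	case <-ready:
		// learner has applied the config change entries and is serving
	case <-time.After(10 * time.Second):
		t.Fatal("learner member did not become ready in time")
	}
}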

View File

@@ -62,6 +62,9 @@ type RaftCluster struct {
// ConfigChangeContext represents a context for confChange.
type ConfigChangeContext struct {
Member
// IsPromote indicates if the config change is for promoting a learner member.
// This flag is needed because both adding a new member and promoting a learner member
// use the same config change type 'ConfChangeAddNode'.
IsPromote bool `json:"isPromote"`
}
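
Because a promotion reuses ConfChangeAddNode, the promoting side has to set this flag in the serialized context. A rough sketch of that encoding, assuming the encoding/json, membership, and raftpb packages already imported by the surrounding code; the helper promoteConfChange is hypothetical and not part of this commit:

// promoteConfChange (hypothetical) shows how a learner promotion could be
// encoded so ValidateConfigurationChange can tell it apart from adding a new
// member: the same ConfChangeAddNode type, but with IsPromote set in Context.
func promoteConfChange(m *membership.Member) (raftpb.ConfChange, error) {
	b, err := json.Marshal(membership.ConfigChangeContext{Member: *m, IsPromote: true})
	if err != nil {
		return raftpb.ConfChange{}, err
	}
	return raftpb.ConfChange{
		Type:    raftpb.ConfChangeAddNode, // shared with a plain member add
		NodeID:  uint64(m.ID),
		Context: b,
	}, nil
}
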
@@ -268,13 +271,6 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
}
switch cc.Type {
case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
urls := make(map[string]bool)
for _, m := range members {
for _, u := range m.PeerURLs {
urls[u] = true
}
}
confChangeContext := new(ConfigChangeContext)
if err := json.Unmarshal(cc.Context, confChangeContext); err != nil {
if c.lg != nil {
@@ -297,7 +293,13 @@ func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
return ErrIDExists
}
for _, u := range confChangeContext.PeerURLs {
urls := make(map[string]bool)
for _, m := range members {
for _, u := range m.PeerURLs {
urls[u] = true
}
}
for _, u := range confChangeContext.Member.PeerURLs {
if urls[u] {
return ErrPeerURLexists
}
@@ -736,8 +738,8 @@ func mustDetectDowngrade(lg *zap.Logger, cv *semver.Version) {
}
}
// IsLearner returns if the local member is raft learner
func (c *RaftCluster) IsLearner() bool {
// IsLocalMemberLearner returns whether the local member is a raft learner
func (c *RaftCluster) IsLocalMemberLearner() bool {
c.Lock()
defer c.Unlock()
localMember, ok := c.members[c.localID]

View File

@@ -26,7 +26,7 @@ var (
ErrIDNotFound = errors.New("membership: ID not found")
ErrPeerURLexists = errors.New("membership: peerURL exists")
ErrMemberNotLearner = errors.New("membership: can only promote a learner member")
ErrLearnerNotReady = errors.New("membership: can only promote a learner member which catches up with leader")
ErrLearnerNotReady = errors.New("membership: can only promote a learner member which is in sync with leader")
)
func isKeyNotFound(err error) bool {

View File

@@ -49,7 +49,7 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
}
// TODO: add test in clientv3/integration to verify behavior
if s.IsLearner() && !isRPCEnabledForLearner(req) {
if s.IsLearner() && !isRPCSupportedForLearner(req) {
return nil, rpctypes.ErrGPRCNotSupportedForLearner
}
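
For context, a reduced sketch of the interceptor this check lives in, assuming the standard grpc.UnaryServerInterceptor signature; the other checks the real interceptor performs are omitted here:

// Reduced sketch: only the learner gate from this diff, nothing else the
// interceptor does.
func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		if s.IsLearner() && !isRPCSupportedForLearner(req) {
			return nil, rpctypes.ErrGPRCNotSupportedForLearner
		}
		return handler(ctx, req, info)
	}
}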

View File

@@ -41,7 +41,7 @@ var (
ErrGRPCMemberBadURLs = status.New(codes.InvalidArgument, "etcdserver: given member URLs are invalid").Err()
ErrGRPCMemberNotFound = status.New(codes.NotFound, "etcdserver: member not found").Err()
ErrGRPCMemberNotLearner = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member").Err()
ErrGRPCLearnerNotReady = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member which catches up with leader").Err()
ErrGRPCLearnerNotReady = status.New(codes.FailedPrecondition, "etcdserver: can only promote a learner member which is in sync with leader").Err()
ErrGRPCRequestTooLarge = status.New(codes.InvalidArgument, "etcdserver: request is too large").Err()
ErrGRPCRequestTooManyRequests = status.New(codes.ResourceExhausted, "etcdserver: too many requests").Err()

View File

@@ -122,7 +122,7 @@ func isClientCtxErr(ctxErr error, err error) bool {
}
// in v3.4, a learner is allowed to serve serializable reads and endpoint status
func isRPCEnabledForLearner(req interface{}) bool {
func isRPCSupportedForLearner(req interface{}) bool {
switch r := req.(type) {
case *pb.StatusRequest:
return true
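
Given the comment above, the switch presumably also admits serializable reads. A sketch of the full shape; only the StatusRequest case is visible in this hunk, so the RangeRequest case is an assumption:

// Sketch of the full switch; the RangeRequest case is assumed from the
// comment above, not shown in this hunk.
func isRPCSupportedForLearner(req interface{}) bool {
	switch r := req.(type) {
	case *pb.StatusRequest:
		return true // endpoint status is always allowed for a learner
	case *pb.RangeRequest:
		return r.Serializable // only local, serializable reads
	default:
		return false
	}
}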

View File

@@ -1377,6 +1377,10 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
ep.snapi = ep.appliedi
}
func (s *EtcdServer) hasMultipleVotingMembers() bool {
return s.cluster != nil && len(s.cluster.VotingMemberIDs()) > 1
}
func (s *EtcdServer) isLeader() bool {
return uint64(s.ID()) == s.Lead()
}
@@ -1440,7 +1444,7 @@ func (s *EtcdServer) TransferLeadership() error {
return nil
}
if s.cluster == nil || len(s.cluster.VotingMemberIDs()) <= 1 {
if !s.hasMultipleVotingMembers() {
if lg := s.getLogger(); lg != nil {
lg.Info(
"skipped leadership transfer for single voting member cluster",
@@ -1654,9 +1658,6 @@ func (s *EtcdServer) mayPromoteMember(id types.ID) error {
return nil
}
// TODO add more checks whether the member can be promoted.
// like learner progress check or if cluster is ready to promote a learner
// this is an example to get progress
fmt.Printf("raftStatus, %#v\n", raftStatus())
return nil
}
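
The deleted debug print was a placeholder for the check the TODO describes: use raft progress to decide whether the learner has caught up enough to be promoted. A rough sketch under the assumption that the caller extracts the learner's match index and the leader's committed index from raftStatus(); the helper name and the 90% threshold are illustrative only:

// isLearnerReady is a hypothetical check for the TODO above: refuse a
// promotion while the learner's replicated log still trails the leader's
// committed index by too much.
func isLearnerReady(learnerMatch, leaderCommit uint64) error {
	if leaderCommit == 0 || float64(learnerMatch) < float64(leaderCommit)*0.9 {
		return membership.ErrLearnerNotReady
	}
	return nil
}
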
@@ -2498,5 +2499,5 @@ func (s *EtcdServer) Logger() *zap.Logger {
// IsLearner returns whether the local member is a raft learner
func (s *EtcdServer) IsLearner() bool {
return s.cluster.IsLearner()
return s.cluster.IsLocalMemberLearner()
}