set an separate applyTimeout for the waitAppliedIndex

This commit is contained in:
ahrtr 2022-03-02 10:47:45 +08:00 committed by ahrtr
parent fe3a57976e
commit 1b3d6cb0c8
5 changed files with 83 additions and 19 deletions

View File

@ -77,6 +77,7 @@ var (
ErrGRPCTimeout = status.New(codes.Unavailable, "etcdserver: request timed out").Err()
ErrGRPCTimeoutDueToLeaderFail = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to previous leader failure").Err()
ErrGRPCTimeoutDueToConnectionLost = status.New(codes.Unavailable, "etcdserver: request timed out, possibly due to connection lost").Err()
ErrGRPCTimeoutWaitAppliedIndex = status.New(codes.Unavailable, "etcdserver: request timed out, waiting for the applied index took too long").Err()
ErrGRPCUnhealthy = status.New(codes.Unavailable, "etcdserver: unhealthy cluster").Err()
ErrGRPCCorrupt = status.New(codes.DataLoss, "etcdserver: corrupt cluster").Err()
ErrGRPCNotSupportedForLearner = status.New(codes.FailedPrecondition, "etcdserver: rpc not supported for learner").Err()
@ -212,6 +213,7 @@ var (
ErrTimeout = Error(ErrGRPCTimeout)
ErrTimeoutDueToLeaderFail = Error(ErrGRPCTimeoutDueToLeaderFail)
ErrTimeoutDueToConnectionLost = Error(ErrGRPCTimeoutDueToConnectionLost)
ErrTimeoutWaitAppliedIndex = Error(ErrGRPCTimeoutWaitAppliedIndex)
ErrUnhealthy = Error(ErrGRPCUnhealthy)
ErrCorrupt = Error(ErrGRPCCorrupt)
ErrBadLeaderTransferee = Error(ErrGRPCBadLeaderTransferee)

View File

@ -54,6 +54,7 @@ var toGRPCErrorMap = map[error]error{
etcdserver.ErrTimeout: rpctypes.ErrGRPCTimeout,
etcdserver.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail,
etcdserver.ErrTimeoutDueToConnectionLost: rpctypes.ErrGRPCTimeoutDueToConnectionLost,
etcdserver.ErrTimeoutWaitAppliedIndex: rpctypes.ErrGRPCTimeoutWaitAppliedIndex,
etcdserver.ErrUnhealthy: rpctypes.ErrGRPCUnhealthy,
etcdserver.ErrKeyNotFound: rpctypes.ErrGRPCKeyNotFound,
etcdserver.ErrCorrupt: rpctypes.ErrGRPCCorrupt,

View File

@ -27,6 +27,7 @@ var (
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost")
ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long")
ErrTimeoutWaitAppliedIndex = errors.New("etcdserver: request timed out, waiting for the applied index took too long")
ErrLeaderChanged = errors.New("etcdserver: leader changed")
ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members")
ErrLearnerNotReady = errors.New("etcdserver: can only promote a learner member which is in sync with leader")

View File

@ -1906,3 +1906,59 @@ func (s *sendMsgAppRespTransporter) Send(m []raftpb.Message) {
}
s.sendC <- send
}
func TestWaitAppliedIndex(t *testing.T) {
cases := []struct {
name string
appliedIndex uint64
committedIndex uint64
action func(s *EtcdServer)
ExpectedError error
}{
{
name: "The applied Id is already equal to the commitId",
appliedIndex: 10,
committedIndex: 10,
action: func(s *EtcdServer) {
s.applyWait.Trigger(10)
},
ExpectedError: nil,
},
{
name: "The etcd server has already stopped",
appliedIndex: 10,
committedIndex: 12,
action: func(s *EtcdServer) {
s.stopping <- struct{}{}
},
ExpectedError: ErrStopped,
},
{
name: "Timed out waiting for the applied index",
appliedIndex: 10,
committedIndex: 12,
action: nil,
ExpectedError: ErrTimeoutWaitAppliedIndex,
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
s := &EtcdServer{
appliedIndex: tc.appliedIndex,
committedIndex: tc.committedIndex,
stopping: make(chan struct{}, 1),
applyWait: wait.NewTimeList(),
}
if tc.action != nil {
go tc.action(s)
}
err := s.waitAppliedIndex()
if err != tc.ExpectedError {
t.Errorf("Unexpected error, want (%v), got (%v)", tc.ExpectedError, err)
}
})
}
}

View File

@ -45,6 +45,10 @@ const (
maxGapBetweenApplyAndCommitIndex = 5000
traceThreshold = 100 * time.Millisecond
readIndexRetryTime = 500 * time.Millisecond
// The timeout for the node to catch up its applied index, and is used in
// lease related operations, such as LeaseRenew and LeaseTimeToLive.
applyTimeout = time.Second
)
type RaftKV interface {
@ -275,13 +279,13 @@ func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*
return resp.(*pb.LeaseGrantResponse), nil
}
func (s *EtcdServer) waitAppliedIndex(ctx context.Context) error {
func (s *EtcdServer) waitAppliedIndex() error {
select {
case <-s.ApplyWait():
case <-s.stopping:
return ErrStopped
case <-ctx.Done():
return ErrTimeout
case <-time.After(applyTimeout):
return ErrTimeoutWaitAppliedIndex
}
return nil
@ -296,22 +300,22 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
}
func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()
if s.isLeader() {
if err := s.waitAppliedIndex(cctx); err != nil {
if err := s.waitAppliedIndex(); err != nil {
return 0, err
}
ttl, err := s.lessor.Renew(id)
if err == nil { // already requested to primary lessor(leader)
return ttl, nil
}
if err != lease.ErrNotPrimary {
return -1, err
}
}
ttl, err := s.lessor.Renew(id)
if err == nil { // already requested to primary lessor(leader)
return ttl, nil
}
if err != lease.ErrNotPrimary {
return -1, err
}
cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()
// renewals don't go through raft; forward to leader manually
for cctx.Err() == nil {
@ -321,7 +325,7 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
}
for _, url := range leader.PeerURLs {
lurl := url + leasehttp.LeasePrefix
ttl, err = leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
ttl, err := leasehttp.RenewHTTP(cctx, id, lurl, s.peerRt)
if err == nil || err == lease.ErrLeaseNotFound {
return ttl, err
}
@ -337,11 +341,8 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
}
func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()
if s.isLeader() {
if err := s.waitAppliedIndex(cctx); err != nil {
if err := s.waitAppliedIndex(); err != nil {
return nil, err
}
// primary; timetolive directly from leader
@ -362,6 +363,9 @@ func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
return resp, nil
}
cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
defer cancel()
// forward to leader
for cctx.Err() == nil {
leader, err := s.waitLeader(cctx)