mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #12780 from wpedrak/read_index_retry
Read index retry
This commit is contained in:
commit
30ce6067da
@ -44,6 +44,7 @@ const (
|
|||||||
// We should stop accepting new proposals if the gap growing to a certain point.
|
// We should stop accepting new proposals if the gap growing to a certain point.
|
||||||
maxGapBetweenApplyAndCommitIndex = 5000
|
maxGapBetweenApplyAndCommitIndex = 5000
|
||||||
traceThreshold = 100 * time.Millisecond
|
traceThreshold = 100 * time.Millisecond
|
||||||
|
readIndexRetryTime = 500 * time.Millisecond
|
||||||
)
|
)
|
||||||
|
|
||||||
type RaftKV interface {
|
type RaftKV interface {
|
||||||
@ -706,12 +707,8 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In
|
|||||||
func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }
|
func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }
|
||||||
|
|
||||||
func (s *EtcdServer) linearizableReadLoop() {
|
func (s *EtcdServer) linearizableReadLoop() {
|
||||||
var rs raft.ReadState
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
ctxToSend := make([]byte, 8)
|
requestId := s.reqIDGen.Next()
|
||||||
id1 := s.reqIDGen.Next()
|
|
||||||
binary.BigEndian.PutUint64(ctxToSend, id1)
|
|
||||||
leaderChangedNotifier := s.LeaderChangedNotify()
|
leaderChangedNotifier := s.LeaderChangedNotify()
|
||||||
select {
|
select {
|
||||||
case <-leaderChangedNotifier:
|
case <-leaderChangedNotifier:
|
||||||
@ -724,82 +721,37 @@ func (s *EtcdServer) linearizableReadLoop() {
|
|||||||
// as a single loop is can unlock multiple reads, it is not very useful
|
// as a single loop is can unlock multiple reads, it is not very useful
|
||||||
// to propagate the trace from Txn or Range.
|
// to propagate the trace from Txn or Range.
|
||||||
trace := traceutil.New("linearizableReadLoop", s.Logger())
|
trace := traceutil.New("linearizableReadLoop", s.Logger())
|
||||||
nextnr := newNotifier()
|
|
||||||
|
|
||||||
|
nextnr := newNotifier()
|
||||||
s.readMu.Lock()
|
s.readMu.Lock()
|
||||||
nr := s.readNotifier
|
nr := s.readNotifier
|
||||||
s.readNotifier = nextnr
|
s.readNotifier = nextnr
|
||||||
s.readMu.Unlock()
|
s.readMu.Unlock()
|
||||||
|
|
||||||
lg := s.Logger()
|
confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId)
|
||||||
cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
|
if isStopped(err) {
|
||||||
if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
|
|
||||||
cancel()
|
|
||||||
if err == raft.ErrStopped {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
lg.Warn("failed to get read index from Raft", zap.Error(err))
|
if err != nil {
|
||||||
readIndexFailed.Inc()
|
|
||||||
nr.notify(err)
|
nr.notify(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
cancel()
|
|
||||||
|
|
||||||
var (
|
|
||||||
timeout bool
|
|
||||||
done bool
|
|
||||||
)
|
|
||||||
for !timeout && !done {
|
|
||||||
select {
|
|
||||||
case rs = <-s.r.readStateC:
|
|
||||||
done = bytes.Equal(rs.RequestCtx, ctxToSend)
|
|
||||||
if !done {
|
|
||||||
// a previous request might time out. now we should ignore the response of it and
|
|
||||||
// continue waiting for the response of the current requests.
|
|
||||||
id2 := uint64(0)
|
|
||||||
if len(rs.RequestCtx) == 8 {
|
|
||||||
id2 = binary.BigEndian.Uint64(rs.RequestCtx)
|
|
||||||
}
|
|
||||||
lg.Warn(
|
|
||||||
"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
|
|
||||||
zap.Uint64("sent-request-id", id1),
|
|
||||||
zap.Uint64("received-request-id", id2),
|
|
||||||
)
|
|
||||||
slowReadIndex.Inc()
|
|
||||||
}
|
|
||||||
case <-leaderChangedNotifier:
|
|
||||||
timeout = true
|
|
||||||
readIndexFailed.Inc()
|
|
||||||
// return a retryable error.
|
|
||||||
nr.notify(ErrLeaderChanged)
|
|
||||||
case <-time.After(s.Cfg.ReqTimeout()):
|
|
||||||
lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout()))
|
|
||||||
nr.notify(ErrTimeout)
|
|
||||||
timeout = true
|
|
||||||
slowReadIndex.Inc()
|
|
||||||
case <-s.stopping:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !done {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
trace.Step("read index received")
|
trace.Step("read index received")
|
||||||
|
|
||||||
index := rs.Index
|
trace.AddField(traceutil.Field{Key: "readStateIndex", Value: confirmedIndex})
|
||||||
trace.AddField(traceutil.Field{Key: "readStateIndex", Value: index})
|
|
||||||
|
|
||||||
ai := s.getAppliedIndex()
|
appliedIndex := s.getAppliedIndex()
|
||||||
trace.AddField(traceutil.Field{Key: "appliedIndex", Value: strconv.FormatUint(ai, 10)})
|
trace.AddField(traceutil.Field{Key: "appliedIndex", Value: strconv.FormatUint(appliedIndex, 10)})
|
||||||
|
|
||||||
if ai < index {
|
if appliedIndex < confirmedIndex {
|
||||||
select {
|
select {
|
||||||
case <-s.applyWait.Wait(index):
|
case <-s.applyWait.Wait(confirmedIndex):
|
||||||
case <-s.stopping:
|
case <-s.stopping:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// unblock all l-reads requested at indices before rs.Index
|
// unblock all l-reads requested at indices before confirmedIndex
|
||||||
nr.notify(nil)
|
nr.notify(nil)
|
||||||
trace.Step("applied index is now lower than readState.Index")
|
trace.Step("applied index is now lower than readState.Index")
|
||||||
|
|
||||||
@ -807,6 +759,96 @@ func (s *EtcdServer) linearizableReadLoop() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isStopped(err error) bool {
|
||||||
|
return err == raft.ErrStopped || err == ErrStopped
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) {
|
||||||
|
err := s.sendReadIndex(requestId)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
lg := s.Logger()
|
||||||
|
errorTimer := time.NewTimer(s.Cfg.ReqTimeout())
|
||||||
|
defer errorTimer.Stop()
|
||||||
|
retryTimer := time.NewTimer(readIndexRetryTime)
|
||||||
|
defer retryTimer.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case rs := <-s.r.readStateC:
|
||||||
|
requestIdBytes := uint64ToBigEndianBytes(requestId)
|
||||||
|
gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes)
|
||||||
|
if !gotOwnResponse {
|
||||||
|
// a previous request might time out. now we should ignore the response of it and
|
||||||
|
// continue waiting for the response of the current requests.
|
||||||
|
responseId := uint64(0)
|
||||||
|
if len(rs.RequestCtx) == 8 {
|
||||||
|
responseId = binary.BigEndian.Uint64(rs.RequestCtx)
|
||||||
|
}
|
||||||
|
lg.Warn(
|
||||||
|
"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
|
||||||
|
zap.Uint64("sent-request-id", requestId),
|
||||||
|
zap.Uint64("received-request-id", responseId),
|
||||||
|
)
|
||||||
|
slowReadIndex.Inc()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return rs.Index, nil
|
||||||
|
case <-leaderChangedNotifier:
|
||||||
|
readIndexFailed.Inc()
|
||||||
|
// return a retryable error.
|
||||||
|
return 0, ErrLeaderChanged
|
||||||
|
case <-retryTimer.C:
|
||||||
|
lg.Warn(
|
||||||
|
"waiting for ReadIndex response took too long, retrying",
|
||||||
|
zap.Uint64("sent-request-id", requestId),
|
||||||
|
zap.Duration("retry-timeout", readIndexRetryTime),
|
||||||
|
)
|
||||||
|
err := s.sendReadIndex(requestId)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
retryTimer.Reset(readIndexRetryTime)
|
||||||
|
continue
|
||||||
|
case <-errorTimer.C:
|
||||||
|
lg.Warn(
|
||||||
|
"timed out waiting for read index response (local node might have slow network)",
|
||||||
|
zap.Duration("timeout", s.Cfg.ReqTimeout()),
|
||||||
|
)
|
||||||
|
slowReadIndex.Inc()
|
||||||
|
return 0, ErrTimeout
|
||||||
|
case <-s.stopping:
|
||||||
|
return 0, ErrStopped
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func uint64ToBigEndianBytes(number uint64) []byte {
|
||||||
|
byteResult := make([]byte, 8)
|
||||||
|
binary.BigEndian.PutUint64(byteResult, number)
|
||||||
|
return byteResult
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *EtcdServer) sendReadIndex(requestIndex uint64) error {
|
||||||
|
ctxToSend := uint64ToBigEndianBytes(requestIndex)
|
||||||
|
|
||||||
|
cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
|
||||||
|
err := s.r.ReadIndex(cctx, ctxToSend)
|
||||||
|
cancel()
|
||||||
|
if err == raft.ErrStopped {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
lg := s.Logger()
|
||||||
|
lg.Warn("failed to get read index from Raft", zap.Error(err))
|
||||||
|
readIndexFailed.Inc()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *EtcdServer) LinearizableReadNotify(ctx context.Context) error {
|
func (s *EtcdServer) LinearizableReadNotify(ctx context.Context) error {
|
||||||
return s.linearizableReadNotify(ctx)
|
return s.linearizableReadNotify(ctx)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user