Merge pull request #14269 from ahrtr/3.4_resend_readindex
[3.4] etcdserver: resend ReadIndex request on empty apply request
This commit is contained in: commit 2c778eebf7
@ -259,10 +259,6 @@ type EtcdServer struct {
|
||||
peerRt http.RoundTripper
|
||||
reqIDGen *idutil.Generator
|
||||
|
||||
// forceVersionC is used to force the version monitor loop
|
||||
// to detect the cluster version immediately.
|
||||
forceVersionC chan struct{}
|
||||
|
||||
// wgMu blocks concurrent waitgroup mutation while server stopping
|
||||
wgMu sync.RWMutex
|
||||
// wg is used to wait for the go routines that depends on the server state
|
||||
@@ -277,6 +273,9 @@ type EtcdServer struct {
 	leadTimeMu      sync.RWMutex
 	leadElectedTime time.Time
 
+	firstCommitInTermMu sync.RWMutex
+	firstCommitInTermC  chan struct{}
+
 	*AccessController
 }
 
@@ -532,16 +531,16 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
 				storage: NewStorage(w, ss),
 			},
 		),
-		id:               id,
-		attributes:       membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
-		cluster:          cl,
-		stats:            sstats,
-		lstats:           lstats,
-		SyncTicker:       time.NewTicker(500 * time.Millisecond),
-		peerRt:           prt,
-		reqIDGen:         idutil.NewGenerator(uint16(id), time.Now()),
-		forceVersionC:    make(chan struct{}),
-		AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
+		id:                 id,
+		attributes:         membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
+		cluster:            cl,
+		stats:              sstats,
+		lstats:             lstats,
+		SyncTicker:         time.NewTicker(500 * time.Millisecond),
+		peerRt:             prt,
+		reqIDGen:           idutil.NewGenerator(uint16(id), time.Now()),
+		AccessController:   &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
+		firstCommitInTermC: make(chan struct{}),
 	}
 	serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)
 
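Note on the hunk above: initializing firstCommitInTermC in the constructor matters because a receive from a Go nil channel blocks forever, so a waiter that grabbed an uninitialized channel would never wake. A minimal standalone sketch of the pitfall the make(chan struct{}) guards against (illustrative, not part of this commit):

    package main

    import (
    	"fmt"
    	"time"
    )

    func main() {
    	var nilCh chan struct{} // zero value: nil channel

    	select {
    	case <-nilCh:
    		fmt.Println("never fires: receive from a nil channel blocks forever")
    	case <-time.After(100 * time.Millisecond):
    		fmt.Println("timed out, as expected with a nil channel")
    	}

    	ready := make(chan struct{}) // initialized, like firstCommitInTermC in NewServer
    	close(ready)                 // closing makes every receive succeed immediately
    	<-ready
    	fmt.Println("receive from a closed channel returns immediately")
    }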
@@ -1935,6 +1934,16 @@ func (s *EtcdServer) leaderChangedNotify() <-chan struct{} {
 	return s.leaderChanged
 }
 
+// FirstCommitInTermNotify returns channel that will be unlocked on first
+// entry committed in new term, which is necessary for new leader to answer
+// read-only requests (leader is not able to respond any read-only requests
+// as long as linearizable semantic is required)
+func (s *EtcdServer) FirstCommitInTermNotify() <-chan struct{} {
+	s.firstCommitInTermMu.RLock()
+	defer s.firstCommitInTermMu.RUnlock()
+	return s.firstCommitInTermC
+}
+
 // RaftStatusGetter represents etcd server and Raft progress.
 type RaftStatusGetter interface {
 	ID() types.ID
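How a consumer of this new API might look: because the server swaps in a fresh channel every time it fires (see notifyAboutFirstCommitInTerm below), a caller has to re-fetch the channel after each wakeup. A hedged sketch, with watchFirstCommits and stop being illustrative names, not code from this commit:

    // watchFirstCommits is an illustrative consumer of FirstCommitInTermNotify.
    // The channel must be re-fetched after every wakeup, since the server
    // replaces firstCommitInTermC once it has been closed.
    func watchFirstCommits(s *EtcdServer, stop <-chan struct{}) {
    	for {
    		notifier := s.FirstCommitInTermNotify()
    		select {
    		case <-notifier:
    			// a new term has committed its first entry; the new leader
    			// can now answer linearizable read-only requests
    		case <-stop:
    			return
    		}
    	}
    }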
@@ -2202,10 +2211,8 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 	// raft state machine may generate noop entry when leader confirmation.
 	// skip it in advance to avoid some potential bug in the future
 	if len(e.Data) == 0 {
-		select {
-		case s.forceVersionC <- struct{}{}:
-		default:
-		}
+		s.notifyAboutFirstCommitInTerm()
+
 		// promote lessor when the local member is leader and finished
 		// applying all entries from the last term.
 		if s.isLeader() {
@@ -2278,6 +2285,15 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 	})
 }
 
+func (s *EtcdServer) notifyAboutFirstCommitInTerm() {
+	newNotifier := make(chan struct{})
+	s.firstCommitInTermMu.Lock()
+	notifierToClose := s.firstCommitInTermC
+	s.firstCommitInTermC = newNotifier
+	s.firstCommitInTermMu.Unlock()
+	close(notifierToClose)
+}
+
 // applyConfChange applies a ConfChange to the server. It is only
 // invoked with a ConfChange that has already passed through Raft
 func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
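notifyAboutFirstCommitInTerm is an instance of the "swap and close" broadcast idiom: closing a channel wakes every receiver at once, and installing a fresh channel re-arms the notification. This is stronger than the old non-blocking send on forceVersionC, which could wake at most one receiver and silently dropped the signal if nobody happened to be waiting. A self-contained sketch of the idiom under illustrative names (broadcaster is not an etcd type):

    package main

    import (
    	"fmt"
    	"sync"
    )

    // broadcaster re-implements the swap-and-close idiom in isolation.
    type broadcaster struct {
    	mu sync.RWMutex
    	ch chan struct{}
    }

    func newBroadcaster() *broadcaster {
    	return &broadcaster{ch: make(chan struct{})}
    }

    // wait returns the current notification channel.
    func (b *broadcaster) wait() <-chan struct{} {
    	b.mu.RLock()
    	defer b.mu.RUnlock()
    	return b.ch
    }

    // fire wakes all current waiters and re-arms for the next round.
    func (b *broadcaster) fire() {
    	next := make(chan struct{})
    	b.mu.Lock()
    	old := b.ch
    	b.ch = next
    	b.mu.Unlock()
    	close(old) // every goroutine blocked on <-old is released at once
    }

    func main() {
    	b := newBroadcaster()
    	ch := b.wait() // capture the channel before firing
    	var wg sync.WaitGroup
    	for i := 0; i < 3; i++ {
    		wg.Add(1)
    		go func(id int) {
    			defer wg.Done()
    			<-ch
    			fmt.Printf("waiter %d woke up\n", id)
    		}(i)
    	}
    	b.fire()
    	wg.Wait()
    }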
@@ -2504,7 +2520,7 @@ func (s *EtcdServer) ClusterVersion() *semver.Version {
 func (s *EtcdServer) monitorVersions() {
	for {
 		select {
-		case <-s.forceVersionC:
+		case <-s.FirstCommitInTermNotify():
 		case <-time.After(monitorVersionInterval):
 		case <-s.stopping:
 			return
etcdserver/v3_server.go

@@ -18,6 +18,7 @@ import (
 	"bytes"
 	"context"
 	"encoding/binary"
+	"strconv"
 	"time"
 
 	"go.etcd.io/etcd/auth"
@@ -40,6 +41,7 @@ const (
 	// We should stop accepting new proposals if the gap growing to a certain point.
 	maxGapBetweenApplyAndCommitIndex = 5000
 	traceThreshold                   = 100 * time.Millisecond
+	readIndexRetryTime               = 500 * time.Millisecond
 
 	// The timeout for the node to catch up its applied index, and is used in
 	// lease related operations, such as LeaseRenew and LeaseTimeToLive.
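readIndexRetryTime bounds how often a pending linearizable read resends its ReadIndex request; the hard deadline remains s.Cfg.ReqTimeout(). A rough worked example, assuming the common default tick settings (TickMs=100, ElectionTicks=10, giving a 1s election timeout and a ReqTimeout of roughly 7s; verify against your own config):

    package main

    import (
    	"fmt"
    	"time"
    )

    func main() {
    	// Assumed defaults: 1s election timeout, so ReqTimeout ~= 5s + 2*1s.
    	reqTimeout := 5*time.Second + 2*time.Second
    	readIndexRetryTime := 500 * time.Millisecond

    	// Upper bound on retryTimer-driven resends before the errorTimer in
    	// requestCurrentIndex fires and the read fails with ErrTimeout
    	// (first-commit-in-term notifications can add a few more).
    	fmt.Println(int(reqTimeout / readIndexRetryTime)) // 14
    }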
@@ -695,12 +697,8 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In
 func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }
 
 func (s *EtcdServer) linearizableReadLoop() {
-	var rs raft.ReadState
-
 	for {
-		ctxToSend := make([]byte, 8)
-		id1 := s.reqIDGen.Next()
-		binary.BigEndian.PutUint64(ctxToSend, id1)
+		requestId := s.reqIDGen.Next()
 		leaderChangedNotifier := s.leaderChangedNotify()
 		select {
 		case <-leaderChangedNotifier:
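The request id is carried to raft as the 8-byte big-endian ReadIndex context: exactly what the uint64ToBigEndianBytes helper added later in this diff produces, and what requestCurrentIndex compares against rs.RequestCtx. A round-trip sketch (standalone and illustrative; the helper body is copied from this commit):

    package main

    import (
    	"bytes"
    	"encoding/binary"
    	"fmt"
    )

    // uint64ToBigEndianBytes mirrors the helper added later in this commit.
    func uint64ToBigEndianBytes(number uint64) []byte {
    	byteResult := make([]byte, 8)
    	binary.BigEndian.PutUint64(byteResult, number)
    	return byteResult
    }

    func main() {
    	requestId := uint64(42)
    	ctxToSend := uint64ToBigEndianBytes(requestId)

    	// decode, as requestCurrentIndex does for rs.RequestCtx
    	decoded := binary.BigEndian.Uint64(ctxToSend)
    	fmt.Println(decoded == requestId) // true

    	// response matching: an out-of-date response carries a different id
    	stale := uint64ToBigEndianBytes(requestId - 1)
    	fmt.Println(bytes.Equal(ctxToSend, stale)) // false
    }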
@@ -712,95 +710,38 @@ func (s *EtcdServer) linearizableReadLoop() {
 		// as a single loop is can unlock multiple reads, it is not very useful
 		// to propagate the trace from Txn or Range.
-		trace := traceutil.New("linearizableReadLoop", s.getLogger())
-		nextnr := newNotifier()
+		trace := traceutil.New("linearizableReadLoop", s.Logger())
 
+		nextnr := newNotifier()
 		s.readMu.Lock()
 		nr := s.readNotifier
 		s.readNotifier = nextnr
 		s.readMu.Unlock()
 
-		lg := s.getLogger()
-		cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
-		if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
-			cancel()
-			if err == raft.ErrStopped {
-				return
-			}
-			if lg != nil {
-				lg.Warn("failed to get read index from Raft", zap.Error(err))
-			} else {
-				plog.Errorf("failed to get read index from raft: %v", err)
-			}
-			readIndexFailed.Inc()
+		confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId)
+		if isStopped(err) {
+			return
+		}
+		if err != nil {
 			nr.notify(err)
 			continue
 		}
-		cancel()
-
-		var (
-			timeout bool
-			done    bool
-		)
-		for !timeout && !done {
-			select {
-			case rs = <-s.r.readStateC:
-				done = bytes.Equal(rs.RequestCtx, ctxToSend)
-				if !done {
-					// a previous request might time out. now we should ignore the response of it and
-					// continue waiting for the response of the current requests.
-					id2 := uint64(0)
-					if len(rs.RequestCtx) == 8 {
-						id2 = binary.BigEndian.Uint64(rs.RequestCtx)
-					}
-					if lg != nil {
-						lg.Warn(
-							"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
-							zap.Uint64("sent-request-id", id1),
-							zap.Uint64("received-request-id", id2),
-						)
-					} else {
-						plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2)
-					}
-					slowReadIndex.Inc()
-				}
-			case <-leaderChangedNotifier:
-				timeout = true
-				readIndexFailed.Inc()
-				// return a retryable error.
-				nr.notify(ErrLeaderChanged)
-			case <-time.After(s.Cfg.ReqTimeout()):
-				if lg != nil {
-					lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout()))
-				} else {
-					plog.Warningf("timed out waiting for read index response (local node might have slow network)")
-				}
-				nr.notify(ErrTimeout)
-				timeout = true
-				slowReadIndex.Inc()
-			case <-s.stopping:
-				return
-			}
-		}
-		if !done {
-			continue
-		}
+
 		trace.Step("read index received")
 
-		index := rs.Index
-		trace.AddField(traceutil.Field{Key: "readStateIndex", Value: index})
+		trace.AddField(traceutil.Field{Key: "readStateIndex", Value: confirmedIndex})
 
-		ai := s.getAppliedIndex()
-		trace.AddField(traceutil.Field{Key: "appliedIndex", Value: ai})
+		appliedIndex := s.getAppliedIndex()
+		trace.AddField(traceutil.Field{Key: "appliedIndex", Value: strconv.FormatUint(appliedIndex, 10)})
 
-		if ai < index {
+		if appliedIndex < confirmedIndex {
 			select {
-			case <-s.applyWait.Wait(index):
+			case <-s.applyWait.Wait(confirmedIndex):
 			case <-s.stopping:
 				return
 			}
 		}
-		// unblock all l-reads requested at indices before rs.Index
+		// unblock all l-reads requested at indices before confirmedIndex
 		nr.notify(nil)
 		trace.Step("applied index is now lower than readState.Index")
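For orientation: nr here is etcd's internal notifier, which pairs an error with a channel so that the read loop can publish one result to every goroutine blocked in linearizableReadNotify. The shape below is a paraphrase for readers of this diff, not code from this commit; see etcdserver/util.go in the tree for the real definition:

    // Paraphrased sketch of the notifier used by linearizableReadLoop.
    // notify(err) must be called at most once per notifier instance,
    // since it closes the channel.
    type notifier struct {
    	c   chan struct{}
    	err error
    }

    func newNotifier() *notifier {
    	return &notifier{c: make(chan struct{})}
    }

    func (nc *notifier) notify(err error) {
    	nc.err = err
    	close(nc.c) // wakes every reader blocked on <-nc.c
    }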
@@ -808,6 +749,107 @@ func (s *EtcdServer) linearizableReadLoop() {
 	}
 }
 
+func isStopped(err error) bool {
+	return err == raft.ErrStopped || err == ErrStopped
+}
+
+func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) {
+	err := s.sendReadIndex(requestId)
+	if err != nil {
+		return 0, err
+	}
+
+	lg := s.Logger()
+	errorTimer := time.NewTimer(s.Cfg.ReqTimeout())
+	defer errorTimer.Stop()
+	retryTimer := time.NewTimer(readIndexRetryTime)
+	defer retryTimer.Stop()
+
+	firstCommitInTermNotifier := s.FirstCommitInTermNotify()
+
+	for {
+		select {
+		case rs := <-s.r.readStateC:
+			requestIdBytes := uint64ToBigEndianBytes(requestId)
+			gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes)
+			if !gotOwnResponse {
+				// a previous request might time out. now we should ignore the response of it and
+				// continue waiting for the response of the current requests.
+				responseId := uint64(0)
+				if len(rs.RequestCtx) == 8 {
+					responseId = binary.BigEndian.Uint64(rs.RequestCtx)
+				}
+				lg.Warn(
+					"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
+					zap.Uint64("sent-request-id", requestId),
+					zap.Uint64("received-request-id", responseId),
+				)
+				slowReadIndex.Inc()
+				continue
+			}
+			return rs.Index, nil
+		case <-leaderChangedNotifier:
+			readIndexFailed.Inc()
+			// return a retryable error.
+			return 0, ErrLeaderChanged
+		case <-firstCommitInTermNotifier:
+			firstCommitInTermNotifier = s.FirstCommitInTermNotify()
+			lg.Info("first commit in current term: resending ReadIndex request")
+			err := s.sendReadIndex(requestId)
+			if err != nil {
+				return 0, err
+			}
+			retryTimer.Reset(readIndexRetryTime)
+			continue
+		case <-retryTimer.C:
+			lg.Warn(
+				"waiting for ReadIndex response took too long, retrying",
+				zap.Uint64("sent-request-id", requestId),
+				zap.Duration("retry-timeout", readIndexRetryTime),
+			)
+			err := s.sendReadIndex(requestId)
+			if err != nil {
+				return 0, err
+			}
+			retryTimer.Reset(readIndexRetryTime)
+			continue
+		case <-errorTimer.C:
+			lg.Warn(
+				"timed out waiting for read index response (local node might have slow network)",
+				zap.Duration("timeout", s.Cfg.ReqTimeout()),
+			)
+			slowReadIndex.Inc()
+			return 0, ErrTimeout
+		case <-s.stopping:
+			return 0, ErrStopped
+		}
+	}
+}
+
+func uint64ToBigEndianBytes(number uint64) []byte {
+	byteResult := make([]byte, 8)
+	binary.BigEndian.PutUint64(byteResult, number)
+	return byteResult
+}
+
+func (s *EtcdServer) sendReadIndex(requestIndex uint64) error {
+	ctxToSend := uint64ToBigEndianBytes(requestIndex)
+
+	cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
+	err := s.r.ReadIndex(cctx, ctxToSend)
+	cancel()
+	if err == raft.ErrStopped {
+		return err
+	}
+	if err != nil {
+		lg := s.Logger()
+		lg.Warn("failed to get read index from Raft", zap.Error(err))
+		readIndexFailed.Inc()
+		return err
+	}
+	return nil
+}
+
 func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
 	s.readMu.RLock()
 	nc := s.readNotifier
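requestCurrentIndex layers two timers: errorTimer is the hard deadline (ReqTimeout) after which the read fails with ErrTimeout, while retryTimer re-fires every readIndexRetryTime to resend the ReadIndex request; only the retry timer is ever Reset. A self-contained sketch of that two-timer pattern under illustrative names (retryUntilDeadline is not an etcd API):

    package main

    import (
    	"errors"
    	"fmt"
    	"time"
    )

    // retryUntilDeadline resends work every retryEvery until either a result
    // arrives or the overall deadline elapses, mirroring the
    // errorTimer/retryTimer structure of requestCurrentIndex.
    func retryUntilDeadline(send func(), results <-chan int, deadline, retryEvery time.Duration) (int, error) {
    	send()

    	errorTimer := time.NewTimer(deadline)
    	defer errorTimer.Stop()
    	retryTimer := time.NewTimer(retryEvery)
    	defer retryTimer.Stop()

    	for {
    		select {
    		case r := <-results:
    			return r, nil
    		case <-retryTimer.C:
    			send() // resend, then re-arm only the retry timer
    			retryTimer.Reset(retryEvery)
    		case <-errorTimer.C:
    			return 0, errors.New("timed out")
    		}
    	}
    }

    func main() {
    	results := make(chan int, 1)
    	attempts := 0
    	send := func() {
    		attempts++
    		if attempts == 3 { // succeed on the third send
    			results <- 99
    		}
    	}
    	r, err := retryUntilDeadline(send, results, 2*time.Second, 100*time.Millisecond)
    	fmt.Println(r, err, attempts) // 99 <nil> 3
    }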