etcdserver: resend ReadIndex request on empty apply request

Empty apply indicates first commit in current term. It is first time when follower is sure, that it's ReadIndex request can be processed.
This commit is contained in:
wpedrak 2021-03-23 16:39:05 +01:00
parent bad0b4d513
commit 3d485faac5
2 changed files with 47 additions and 11 deletions

View File

@ -291,6 +291,9 @@ type EtcdServer struct {
leadTimeMu sync.RWMutex
leadElectedTime time.Time
firstCommitInTermMu sync.RWMutex
firstCommitInTermC chan struct{}
*AccessController
}
@ -517,17 +520,18 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
storage: NewStorage(w, ss),
},
),
id: id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: cl,
stats: sstats,
lstats: lstats,
SyncTicker: time.NewTicker(500 * time.Millisecond),
peerRt: prt,
reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
forceVersionC: make(chan struct{}),
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
consistIndex: cindex.NewConsistentIndex(be.BatchTx()),
id: id,
attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
cluster: cl,
stats: sstats,
lstats: lstats,
SyncTicker: time.NewTicker(500 * time.Millisecond),
peerRt: prt,
reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
forceVersionC: make(chan struct{}),
AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
consistIndex: cindex.NewConsistentIndex(be.BatchTx()),
firstCommitInTermC: make(chan struct{}),
}
serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)
@ -1770,6 +1774,16 @@ func (s *EtcdServer) LeaderChangedNotify() <-chan struct{} {
return s.leaderChanged
}
// FirstCommitInTermNotify returns channel that will be unlocked on first
// entry committed in new term, which is necessary for new leader to answer
// read-only requests (leader is not able to respond any read-only requests
// as long as linearizable semantic is required)
func (s *EtcdServer) FirstCommitInTermNotify() <-chan struct{} {
s.firstCommitInTermMu.RLock()
defer s.firstCommitInTermMu.RUnlock()
return s.firstCommitInTermC
}
// RaftStatusGetter represents etcd server and Raft progress.
type RaftStatusGetter interface {
ID() types.ID
@ -2068,6 +2082,8 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
// raft state machine may generate noop entry when leader confirmation.
// skip it in advance to avoid some potential bug in the future
if len(e.Data) == 0 {
s.notifyAboutFirstCommitInTerm()
select {
case s.forceVersionC <- struct{}{}:
default:
@ -2140,6 +2156,15 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
})
}
func (s *EtcdServer) notifyAboutFirstCommitInTerm() {
newNotifier := make(chan struct{})
s.firstCommitInTermMu.Lock()
notifierToClose := s.firstCommitInTermC
s.firstCommitInTermC = newNotifier
s.firstCommitInTermMu.Unlock()
close(notifierToClose)
}
// applyConfChange applies a ConfChange to the server. It is only
// invoked with a ConfChange that has already passed through Raft
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {

View File

@ -775,6 +775,8 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{},
retryTimer := time.NewTimer(readIndexRetryTime)
defer retryTimer.Stop()
firstCommitInTermNotifier := s.FirstCommitInTermNotify()
for {
select {
case rs := <-s.r.readStateC:
@ -800,6 +802,15 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{},
readIndexFailed.Inc()
// return a retryable error.
return 0, ErrLeaderChanged
case <-firstCommitInTermNotifier:
firstCommitInTermNotifier = s.FirstCommitInTermNotify()
lg.Info("first commit in current term: resending ReadIndex request")
err := s.sendReadIndex(requestId)
if err != nil {
return 0, err
}
retryTimer.Reset(readIndexRetryTime)
continue
case <-retryTimer.C:
lg.Warn(
"waiting for ReadIndex response took too long, retrying",