From f53db9b246826f472e8c23c818d74a0e680c8189 Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Mon, 25 Jul 2022 09:21:31 +0800 Subject: [PATCH] etcdserver: resend ReadIndex request on empty apply request Backport https://github.com/etcd-io/etcd/pull/12795 to 3.4 Signed-off-by: Benjamin Wang --- etcdserver/server.go | 54 +++++++---- etcdserver/v3_server.go | 192 ++++++++++++++++++++++++---------------- 2 files changed, 152 insertions(+), 94 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index d32a37e0a..808f8c9df 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -259,10 +259,6 @@ type EtcdServer struct { peerRt http.RoundTripper reqIDGen *idutil.Generator - // forceVersionC is used to force the version monitor loop - // to detect the cluster version immediately. - forceVersionC chan struct{} - // wgMu blocks concurrent waitgroup mutation while server stopping wgMu sync.RWMutex // wg is used to wait for the go routines that depends on the server state @@ -277,6 +273,9 @@ type EtcdServer struct { leadTimeMu sync.RWMutex leadElectedTime time.Time + firstCommitInTermMu sync.RWMutex + firstCommitInTermC chan struct{} + *AccessController } @@ -532,16 +531,16 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { storage: NewStorage(w, ss), }, ), - id: id, - attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, - cluster: cl, - stats: sstats, - lstats: lstats, - SyncTicker: time.NewTicker(500 * time.Millisecond), - peerRt: prt, - reqIDGen: idutil.NewGenerator(uint16(id), time.Now()), - forceVersionC: make(chan struct{}), - AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, + id: id, + attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, + cluster: cl, + stats: sstats, + lstats: lstats, + SyncTicker: time.NewTicker(500 * time.Millisecond), + peerRt: prt, + reqIDGen: idutil.NewGenerator(uint16(id), time.Now()), + AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, + firstCommitInTermC: make(chan struct{}), } serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1) @@ -1935,6 +1934,16 @@ func (s *EtcdServer) leaderChangedNotify() <-chan struct{} { return s.leaderChanged } +// FirstCommitInTermNotify returns channel that will be unlocked on first +// entry committed in new term, which is necessary for new leader to answer +// read-only requests (leader is not able to respond any read-only requests +// as long as linearizable semantic is required) +func (s *EtcdServer) FirstCommitInTermNotify() <-chan struct{} { + s.firstCommitInTermMu.RLock() + defer s.firstCommitInTermMu.RUnlock() + return s.firstCommitInTermC +} + // RaftStatusGetter represents etcd server and Raft progress. type RaftStatusGetter interface { ID() types.ID @@ -2202,10 +2211,8 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { // raft state machine may generate noop entry when leader confirmation. // skip it in advance to avoid some potential bug in the future if len(e.Data) == 0 { - select { - case s.forceVersionC <- struct{}{}: - default: - } + s.notifyAboutFirstCommitInTerm() + // promote lessor when the local member is leader and finished // applying all entries from the last term. if s.isLeader() { @@ -2278,6 +2285,15 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { }) } +func (s *EtcdServer) notifyAboutFirstCommitInTerm() { + newNotifier := make(chan struct{}) + s.firstCommitInTermMu.Lock() + notifierToClose := s.firstCommitInTermC + s.firstCommitInTermC = newNotifier + s.firstCommitInTermMu.Unlock() + close(notifierToClose) +} + // applyConfChange applies a ConfChange to the server. It is only // invoked with a ConfChange that has already passed through Raft func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) { @@ -2504,7 +2520,7 @@ func (s *EtcdServer) ClusterVersion() *semver.Version { func (s *EtcdServer) monitorVersions() { for { select { - case <-s.forceVersionC: + case <-s.FirstCommitInTermNotify(): case <-time.After(monitorVersionInterval): case <-s.stopping: return diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index fdbce738c..30007c007 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "encoding/binary" + "strconv" "time" "go.etcd.io/etcd/auth" @@ -40,6 +41,7 @@ const ( // We should stop accepting new proposals if the gap growing to a certain point. maxGapBetweenApplyAndCommitIndex = 5000 traceThreshold = 100 * time.Millisecond + readIndexRetryTime = 500 * time.Millisecond // The timeout for the node to catch up its applied index, and is used in // lease related operations, such as LeaseRenew and LeaseTimeToLive. @@ -695,12 +697,8 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() } func (s *EtcdServer) linearizableReadLoop() { - var rs raft.ReadState - for { - ctxToSend := make([]byte, 8) - id1 := s.reqIDGen.Next() - binary.BigEndian.PutUint64(ctxToSend, id1) + requestId := s.reqIDGen.Next() leaderChangedNotifier := s.leaderChangedNotify() select { case <-leaderChangedNotifier: @@ -712,95 +710,38 @@ func (s *EtcdServer) linearizableReadLoop() { // as a single loop is can unlock multiple reads, it is not very useful // to propagate the trace from Txn or Range. - trace := traceutil.New("linearizableReadLoop", s.getLogger()) - nextnr := newNotifier() + trace := traceutil.New("linearizableReadLoop", s.Logger()) + nextnr := newNotifier() s.readMu.Lock() nr := s.readNotifier s.readNotifier = nextnr s.readMu.Unlock() - lg := s.getLogger() - cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) - if err := s.r.ReadIndex(cctx, ctxToSend); err != nil { - cancel() - if err == raft.ErrStopped { - return - } - if lg != nil { - lg.Warn("failed to get read index from Raft", zap.Error(err)) - } else { - plog.Errorf("failed to get read index from raft: %v", err) - } - readIndexFailed.Inc() + confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId) + if isStopped(err) { + return + } + if err != nil { nr.notify(err) continue } - cancel() - var ( - timeout bool - done bool - ) - for !timeout && !done { - select { - case rs = <-s.r.readStateC: - done = bytes.Equal(rs.RequestCtx, ctxToSend) - if !done { - // a previous request might time out. now we should ignore the response of it and - // continue waiting for the response of the current requests. - id2 := uint64(0) - if len(rs.RequestCtx) == 8 { - id2 = binary.BigEndian.Uint64(rs.RequestCtx) - } - if lg != nil { - lg.Warn( - "ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader", - zap.Uint64("sent-request-id", id1), - zap.Uint64("received-request-id", id2), - ) - } else { - plog.Warningf("ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader (request ID want %d, got %d)", id1, id2) - } - slowReadIndex.Inc() - } - case <-leaderChangedNotifier: - timeout = true - readIndexFailed.Inc() - // return a retryable error. - nr.notify(ErrLeaderChanged) - case <-time.After(s.Cfg.ReqTimeout()): - if lg != nil { - lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout())) - } else { - plog.Warningf("timed out waiting for read index response (local node might have slow network)") - } - nr.notify(ErrTimeout) - timeout = true - slowReadIndex.Inc() - case <-s.stopping: - return - } - } - if !done { - continue - } trace.Step("read index received") - index := rs.Index - trace.AddField(traceutil.Field{Key: "readStateIndex", Value: index}) + trace.AddField(traceutil.Field{Key: "readStateIndex", Value: confirmedIndex}) - ai := s.getAppliedIndex() - trace.AddField(traceutil.Field{Key: "appliedIndex", Value: ai}) + appliedIndex := s.getAppliedIndex() + trace.AddField(traceutil.Field{Key: "appliedIndex", Value: strconv.FormatUint(appliedIndex, 10)}) - if ai < index { + if appliedIndex < confirmedIndex { select { - case <-s.applyWait.Wait(index): + case <-s.applyWait.Wait(confirmedIndex): case <-s.stopping: return } } - // unblock all l-reads requested at indices before rs.Index + // unblock all l-reads requested at indices before confirmedIndex nr.notify(nil) trace.Step("applied index is now lower than readState.Index") @@ -808,6 +749,107 @@ func (s *EtcdServer) linearizableReadLoop() { } } +func isStopped(err error) bool { + return err == raft.ErrStopped || err == ErrStopped +} + +func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) { + err := s.sendReadIndex(requestId) + if err != nil { + return 0, err + } + + lg := s.Logger() + errorTimer := time.NewTimer(s.Cfg.ReqTimeout()) + defer errorTimer.Stop() + retryTimer := time.NewTimer(readIndexRetryTime) + defer retryTimer.Stop() + + firstCommitInTermNotifier := s.FirstCommitInTermNotify() + + for { + select { + case rs := <-s.r.readStateC: + requestIdBytes := uint64ToBigEndianBytes(requestId) + gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes) + if !gotOwnResponse { + // a previous request might time out. now we should ignore the response of it and + // continue waiting for the response of the current requests. + responseId := uint64(0) + if len(rs.RequestCtx) == 8 { + responseId = binary.BigEndian.Uint64(rs.RequestCtx) + } + lg.Warn( + "ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader", + zap.Uint64("sent-request-id", requestId), + zap.Uint64("received-request-id", responseId), + ) + slowReadIndex.Inc() + continue + } + return rs.Index, nil + case <-leaderChangedNotifier: + readIndexFailed.Inc() + // return a retryable error. + return 0, ErrLeaderChanged + case <-firstCommitInTermNotifier: + firstCommitInTermNotifier = s.FirstCommitInTermNotify() + lg.Info("first commit in current term: resending ReadIndex request") + err := s.sendReadIndex(requestId) + if err != nil { + return 0, err + } + retryTimer.Reset(readIndexRetryTime) + continue + case <-retryTimer.C: + lg.Warn( + "waiting for ReadIndex response took too long, retrying", + zap.Uint64("sent-request-id", requestId), + zap.Duration("retry-timeout", readIndexRetryTime), + ) + err := s.sendReadIndex(requestId) + if err != nil { + return 0, err + } + retryTimer.Reset(readIndexRetryTime) + continue + case <-errorTimer.C: + lg.Warn( + "timed out waiting for read index response (local node might have slow network)", + zap.Duration("timeout", s.Cfg.ReqTimeout()), + ) + slowReadIndex.Inc() + return 0, ErrTimeout + case <-s.stopping: + return 0, ErrStopped + } + } +} + +func uint64ToBigEndianBytes(number uint64) []byte { + byteResult := make([]byte, 8) + binary.BigEndian.PutUint64(byteResult, number) + return byteResult +} + +func (s *EtcdServer) sendReadIndex(requestIndex uint64) error { + ctxToSend := uint64ToBigEndianBytes(requestIndex) + + cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout()) + err := s.r.ReadIndex(cctx, ctxToSend) + cancel() + if err == raft.ErrStopped { + return err + } + if err != nil { + lg := s.Logger() + lg.Warn("failed to get read index from Raft", zap.Error(err)) + readIndexFailed.Inc() + return err + } + return nil +} + func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error { s.readMu.RLock() nc := s.readNotifier