diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 7aee4d0b9..96b959f4d 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -291,6 +291,9 @@ type EtcdServer struct { leadTimeMu sync.RWMutex leadElectedTime time.Time + firstCommitInTermMu sync.RWMutex + firstCommitInTermC chan struct{} + *AccessController } @@ -517,17 +520,18 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { storage: NewStorage(w, ss), }, ), - id: id, - attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, - cluster: cl, - stats: sstats, - lstats: lstats, - SyncTicker: time.NewTicker(500 * time.Millisecond), - peerRt: prt, - reqIDGen: idutil.NewGenerator(uint16(id), time.Now()), - forceVersionC: make(chan struct{}), - AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, - consistIndex: cindex.NewConsistentIndex(be.BatchTx()), + id: id, + attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, + cluster: cl, + stats: sstats, + lstats: lstats, + SyncTicker: time.NewTicker(500 * time.Millisecond), + peerRt: prt, + reqIDGen: idutil.NewGenerator(uint16(id), time.Now()), + forceVersionC: make(chan struct{}), + AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, + consistIndex: cindex.NewConsistentIndex(be.BatchTx()), + firstCommitInTermC: make(chan struct{}), } serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1) @@ -1770,6 +1774,16 @@ func (s *EtcdServer) LeaderChangedNotify() <-chan struct{} { return s.leaderChanged } +// FirstCommitInTermNotify returns channel that will be unlocked on first +// entry committed in new term, which is necessary for new leader to answer +// read-only requests (leader is not able to respond any read-only requests +// as long as linearizable semantic is required) +func (s *EtcdServer) FirstCommitInTermNotify() <-chan struct{} { + s.firstCommitInTermMu.RLock() + defer s.firstCommitInTermMu.RUnlock() + return s.firstCommitInTermC +} + // RaftStatusGetter represents etcd server and Raft progress. type RaftStatusGetter interface { ID() types.ID @@ -2068,6 +2082,8 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { // raft state machine may generate noop entry when leader confirmation. // skip it in advance to avoid some potential bug in the future if len(e.Data) == 0 { + s.notifyAboutFirstCommitInTerm() + select { case s.forceVersionC <- struct{}{}: default: @@ -2140,6 +2156,15 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { }) } +func (s *EtcdServer) notifyAboutFirstCommitInTerm() { + newNotifier := make(chan struct{}) + s.firstCommitInTermMu.Lock() + notifierToClose := s.firstCommitInTermC + s.firstCommitInTermC = newNotifier + s.firstCommitInTermMu.Unlock() + close(notifierToClose) +} + // applyConfChange applies a ConfChange to the server. It is only // invoked with a ConfChange that has already passed through Raft func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) { diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index b06c2e149..442288a6e 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -775,6 +775,8 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, retryTimer := time.NewTimer(readIndexRetryTime) defer retryTimer.Stop() + firstCommitInTermNotifier := s.FirstCommitInTermNotify() + for { select { case rs := <-s.r.readStateC: @@ -800,6 +802,15 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, readIndexFailed.Inc() // return a retryable error. return 0, ErrLeaderChanged + case <-firstCommitInTermNotifier: + firstCommitInTermNotifier = s.FirstCommitInTermNotify() + lg.Info("first commit in current term: resending ReadIndex request") + err := s.sendReadIndex(requestId) + if err != nil { + return 0, err + } + retryTimer.Reset(readIndexRetryTime) + continue case <-retryTimer.C: lg.Warn( "waiting for ReadIndex response took too long, retrying",