diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 7aee4d0b9..71016e9f7 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -273,10 +273,6 @@ type EtcdServer struct { peerRt http.RoundTripper reqIDGen *idutil.Generator - // forceVersionC is used to force the version monitor loop - // to detect the cluster version immediately. - forceVersionC chan struct{} - // wgMu blocks concurrent waitgroup mutation while server stopping wgMu sync.RWMutex // wg is used to wait for the goroutines that depends on the server state @@ -291,6 +287,9 @@ type EtcdServer struct { leadTimeMu sync.RWMutex leadElectedTime time.Time + firstCommitInTermMu sync.RWMutex + firstCommitInTermC chan struct{} + *AccessController } @@ -517,17 +516,17 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { storage: NewStorage(w, ss), }, ), - id: id, - attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, - cluster: cl, - stats: sstats, - lstats: lstats, - SyncTicker: time.NewTicker(500 * time.Millisecond), - peerRt: prt, - reqIDGen: idutil.NewGenerator(uint16(id), time.Now()), - forceVersionC: make(chan struct{}), - AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, - consistIndex: cindex.NewConsistentIndex(be.BatchTx()), + id: id, + attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()}, + cluster: cl, + stats: sstats, + lstats: lstats, + SyncTicker: time.NewTicker(500 * time.Millisecond), + peerRt: prt, + reqIDGen: idutil.NewGenerator(uint16(id), time.Now()), + AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist}, + consistIndex: cindex.NewConsistentIndex(be.BatchTx()), + firstCommitInTermC: make(chan struct{}), } serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1) @@ -1770,6 +1769,16 @@ func (s *EtcdServer) LeaderChangedNotify() <-chan struct{} { return s.leaderChanged } 
+// FirstCommitInTermNotify returns a channel that is closed when the first +// entry of a new term is committed. A new leader needs that commit before it +// can answer read-only requests: until then it cannot serve any read that +// requires linearizable semantics. +func (s *EtcdServer) FirstCommitInTermNotify() <-chan struct{} { + s.firstCommitInTermMu.RLock() + defer s.firstCommitInTermMu.RUnlock() + return s.firstCommitInTermC +} + // RaftStatusGetter represents etcd server and Raft progress. type RaftStatusGetter interface { ID() types.ID @@ -2068,10 +2077,8 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { // raft state machine may generate noop entry when leader confirmation. // skip it in advance to avoid some potential bug in the future if len(e.Data) == 0 { - select { - case s.forceVersionC <- struct{}{}: - default: - } + s.notifyAboutFirstCommitInTerm() + // promote lessor when the local member is leader and finished // applying all entries from the last term. if s.isLeader() { @@ -2140,6 +2147,15 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { }) } +func (s *EtcdServer) notifyAboutFirstCommitInTerm() { + newNotifier := make(chan struct{}) + s.firstCommitInTermMu.Lock() + notifierToClose := s.firstCommitInTermC + s.firstCommitInTermC = newNotifier + s.firstCommitInTermMu.Unlock() + close(notifierToClose) +} + // applyConfChange applies a ConfChange to the server.
It is only // invoked with a ConfChange that has already passed through Raft func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) { @@ -2319,7 +2335,7 @@ func (s *EtcdServer) ClusterVersion() *semver.Version { func (s *EtcdServer) monitorVersions() { for { select { - case <-s.forceVersionC: + case <-s.FirstCommitInTermNotify(): case <-time.After(monitorVersionInterval): case <-s.stopping: return diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index b06c2e149..442288a6e 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -775,6 +775,8 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, retryTimer := time.NewTimer(readIndexRetryTime) defer retryTimer.Stop() + firstCommitInTermNotifier := s.FirstCommitInTermNotify() + for { select { case rs := <-s.r.readStateC: @@ -800,6 +802,15 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, readIndexFailed.Inc() // return a retryable error. 
return 0, ErrLeaderChanged + case <-firstCommitInTermNotifier: + firstCommitInTermNotifier = s.FirstCommitInTermNotify() + lg.Info("first commit in current term: resending ReadIndex request") + err := s.sendReadIndex(requestId) + if err != nil { + return 0, err + } + retryTimer.Reset(readIndexRetryTime) + continue case <-retryTimer.C: lg.Warn( "waiting for ReadIndex response took too long, retrying", diff --git a/tests/go.mod b/tests/go.mod index 4a43b433c..1e28eb328 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -37,6 +37,7 @@ require ( go.etcd.io/etcd/server/v3 v3.5.0-alpha.0 go.uber.org/zap v1.16.0 golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 + golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba google.golang.org/grpc v1.36.1 gopkg.in/yaml.v2 v2.3.0 diff --git a/tests/go.sum b/tests/go.sum index e8e94809b..891b40c59 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -367,6 +367,7 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/tests/integration/v3_leadership_test.go b/tests/integration/v3_leadership_test.go index 7fd5ff1ca..72aaf46df 100644 --- a/tests/integration/v3_leadership_test.go +++ 
b/tests/integration/v3_leadership_test.go @@ -16,12 +16,14 @@ package integration import ( "context" + "fmt" "strings" "testing" "time" pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + "golang.org/x/sync/errgroup" ) func TestMoveLeader(t *testing.T) { testMoveLeader(t, true) } @@ -36,7 +38,7 @@ func testMoveLeader(t *testing.T, auto bool) { oldLeadIdx := clus.WaitLeader(t) oldLeadID := uint64(clus.Members[oldLeadIdx].s.ID()) - // ensure followers go through leader transition while learship transfer + // ensure followers go through leader transition while leadership transfer idc := make(chan uint64) stopc := make(chan struct{}) defer close(stopc) @@ -179,3 +181,81 @@ func TestTransferLeadershipWithLearner(t *testing.T) { t.Error("timed out waiting for leader transition") } } + +func TestFirstCommitNotification(t *testing.T) { + BeforeTest(t) + clusterSize := 3 + cluster := NewClusterV3(t, &ClusterConfig{Size: clusterSize}) + defer cluster.Terminate(t) + + oldLeaderIdx := cluster.WaitLeader(t) + oldLeaderClient := cluster.Client(oldLeaderIdx) + + newLeaderIdx := (oldLeaderIdx + 1) % clusterSize + newLeaderId := uint64(cluster.Members[newLeaderIdx].ID()) + + notifiers := make(map[int]<-chan struct{}, clusterSize) + for i, clusterMember := range cluster.Members { + notifiers[i] = clusterMember.s.FirstCommitInTermNotify() + } + + _, err := oldLeaderClient.MoveLeader(context.Background(), newLeaderId) + + if err != nil { + t.Errorf("got error during leadership transfer: %v", err) + } + + leaderAppliedIndex := cluster.Members[newLeaderIdx].s.AppliedIndex() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + group, groupContext := errgroup.WithContext(ctx) + + for i, notifier := range notifiers { + member, notifier := cluster.Members[i], notifier + group.Go(func() error { + return checkFirstCommitNotification(groupContext, member, leaderAppliedIndex, notifier) + }) + } + + err = group.Wait() + 
if err != nil { + t.Error(err) + } +} + +func checkFirstCommitNotification( + ctx context.Context, + member *member, + leaderAppliedIndex uint64, + notifier <-chan struct{}, +) error { + // wait until this member's applied index reaches the leader's applied index + for member.s.AppliedIndex() < leaderAppliedIndex { + select { + case <-ctx.Done(): + return ctx.Err() + default: + time.Sleep(100 * time.Millisecond) + } + } + + select { + case msg, ok := <-notifier: + if ok { + return fmt.Errorf( + "member with ID %d got message via notifier, msg: %v", + member.ID(), + msg, + ) + } + default: + return fmt.Errorf( + "notification was not triggered, member ID: %d", + member.ID(), + ) + } + + return nil +}