Merge pull request #12795 from wpedrak/resend-read-index-on-first-commit-in-term

etcdserver: resend ReadIndex request on empty apply request

Commit 7f97dfd45a
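This change addresses a gap in linearizable reads after leader elections: a Raft leader cannot answer ReadIndex requests until it has committed the first (empty) entry of its own term, and previously the linearizable-read loop only retried on a fixed timer. The diff below adds a firstCommitInTermC notifier to EtcdServer, closes it from applyEntryNormal when the term's first empty entry is applied, and makes requestCurrentIndex resend its ReadIndex request as soon as that notification fires; the same notifier also replaces the forceVersionC trigger in the version monitor, and an integration test plus a golang.org/x/sync dependency round out the change. For orientation, here is a minimal, self-contained Go sketch of the close-and-replace broadcast pattern the new channel uses; the notifier type and all names are illustrative, not etcd's:

package main

import (
	"fmt"
	"sync"
)

// notifier mimics the firstCommitInTermC pattern in the diff below:
// the producer broadcasts by closing the current channel and installing
// a fresh one for the next round of waiters.
type notifier struct {
	mu sync.RWMutex
	c  chan struct{}
}

func newNotifier() *notifier { return &notifier{c: make(chan struct{})} }

// wait returns the channel a consumer should receive on.
func (n *notifier) wait() <-chan struct{} {
	n.mu.RLock()
	defer n.mu.RUnlock()
	return n.c
}

// notify closes the current channel and replaces it.
func (n *notifier) notify() {
	fresh := make(chan struct{})
	n.mu.Lock()
	old := n.c
	n.c = fresh
	n.mu.Unlock()
	close(old) // wakes every goroutine currently receiving on old
}

func main() {
	n := newNotifier()
	done := make(chan struct{})
	go func() {
		<-n.wait() // blocks until notify closes the channel
		fmt.Println("woken: first commit in term")
		close(done)
	}()
	n.notify()
	<-done
}

Closing a channel is Go's idiomatic way to broadcast to any number of waiters at once; swapping in a fresh channel afterward re-arms the signal for the next term.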
@@ -273,10 +273,6 @@ type EtcdServer struct {
 	peerRt   http.RoundTripper
 	reqIDGen *idutil.Generator
 
-	// forceVersionC is used to force the version monitor loop
-	// to detect the cluster version immediately.
-	forceVersionC chan struct{}
-
 	// wgMu blocks concurrent waitgroup mutation while server stopping
 	wgMu sync.RWMutex
 	// wg is used to wait for the goroutines that depends on the server state
@@ -291,6 +287,9 @@ type EtcdServer struct {
 	leadTimeMu      sync.RWMutex
 	leadElectedTime time.Time
 
+	firstCommitInTermMu sync.RWMutex
+	firstCommitInTermC  chan struct{}
+
 	*AccessController
 }
 
@@ -517,17 +516,17 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 				storage: NewStorage(w, ss),
 			},
 		),
-		id:               id,
-		attributes:       membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
-		cluster:          cl,
-		stats:            sstats,
-		lstats:           lstats,
-		SyncTicker:       time.NewTicker(500 * time.Millisecond),
-		peerRt:           prt,
-		reqIDGen:         idutil.NewGenerator(uint16(id), time.Now()),
-		forceVersionC:    make(chan struct{}),
-		AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
-		consistIndex:     cindex.NewConsistentIndex(be.BatchTx()),
+		id:                 id,
+		attributes:         membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
+		cluster:            cl,
+		stats:              sstats,
+		lstats:             lstats,
+		SyncTicker:         time.NewTicker(500 * time.Millisecond),
+		peerRt:             prt,
+		reqIDGen:           idutil.NewGenerator(uint16(id), time.Now()),
+		AccessController:   &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
+		consistIndex:       cindex.NewConsistentIndex(be.BatchTx()),
+		firstCommitInTermC: make(chan struct{}),
 	}
 	serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)
 
@@ -1770,6 +1769,16 @@ func (s *EtcdServer) LeaderChangedNotify() <-chan struct{} {
 	return s.leaderChanged
 }
 
+// FirstCommitInTermNotify returns channel that will be unlocked on first
+// entry committed in new term, which is necessary for new leader to answer
+// read-only requests (leader is not able to respond any read-only requests
+// as long as linearizable semantic is required)
+func (s *EtcdServer) FirstCommitInTermNotify() <-chan struct{} {
+	s.firstCommitInTermMu.RLock()
+	defer s.firstCommitInTermMu.RUnlock()
+	return s.firstCommitInTermC
+}
+
 // RaftStatusGetter represents etcd server and Raft progress.
 type RaftStatusGetter interface {
 	ID() types.ID
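One subtlety of the accessor above: notifications are delivered by closing the returned channel, so once it has fired, further receives on the same channel return immediately. Any loop that waits repeatedly must call FirstCommitInTermNotify() again after each wake-up, as requestCurrentIndex does further down. A hedged sketch of that consumer shape; watchFirstCommits and stopc are illustrative, not part of this diff:

func watchFirstCommits(s *EtcdServer, stopc <-chan struct{}) {
	notifier := s.FirstCommitInTermNotify()
	for {
		select {
		case <-notifier:
			// The old channel is now closed; fetch the fresh one before
			// waiting again, or this case would fire in a busy loop.
			notifier = s.FirstCommitInTermNotify()
		case <-stopc:
			return
		}
	}
}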
@@ -2068,10 +2077,8 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 	// raft state machine may generate noop entry when leader confirmation.
 	// skip it in advance to avoid some potential bug in the future
 	if len(e.Data) == 0 {
-		select {
-		case s.forceVersionC <- struct{}{}:
-		default:
-		}
+		s.notifyAboutFirstCommitInTerm()
+
 		// promote lessor when the local member is leader and finished
 		// applying all entries from the last term.
 		if s.isLeader() {
@@ -2140,6 +2147,15 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
 	})
 }
 
+func (s *EtcdServer) notifyAboutFirstCommitInTerm() {
+	newNotifier := make(chan struct{})
+	s.firstCommitInTermMu.Lock()
+	notifierToClose := s.firstCommitInTermC
+	s.firstCommitInTermC = newNotifier
+	s.firstCommitInTermMu.Unlock()
+	close(notifierToClose)
+}
+
 // applyConfChange applies a ConfChange to the server. It is only
 // invoked with a ConfChange that has already passed through Raft
 func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
@@ -2319,7 +2335,7 @@ func (s *EtcdServer) ClusterVersion() *semver.Version {
 func (s *EtcdServer) monitorVersions() {
 	for {
 		select {
-		case <-s.forceVersionC:
+		case <-s.FirstCommitInTermNotify():
 		case <-time.After(monitorVersionInterval):
 		case <-s.stopping:
 			return
@@ -775,6 +775,8 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{},
 	retryTimer := time.NewTimer(readIndexRetryTime)
 	defer retryTimer.Stop()
 
+	firstCommitInTermNotifier := s.FirstCommitInTermNotify()
+
 	for {
 		select {
 		case rs := <-s.r.readStateC:
@@ -800,6 +802,15 @@ func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{},
 			readIndexFailed.Inc()
 			// return a retryable error.
 			return 0, ErrLeaderChanged
+		case <-firstCommitInTermNotifier:
+			firstCommitInTermNotifier = s.FirstCommitInTermNotify()
+			lg.Info("first commit in current term: resending ReadIndex request")
+			err := s.sendReadIndex(requestId)
+			if err != nil {
+				return 0, err
+			}
+			retryTimer.Reset(readIndexRetryTime)
+			continue
 		case <-retryTimer.C:
 			lg.Warn(
 				"waiting for ReadIndex response took too long, retrying",
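Stripped of etcd specifics, the resend logic above has the following shape; waitWithRetry, responses, wakeup, and resend are placeholders standing in for requestCurrentIndex, readStateC, the first-commit notifier, and sendReadIndex (a sketch under those assumptions, not etcd code; only the standard time package is needed):

// waitWithRetry blocks for a response, resending the request both on a
// periodic retry timer and whenever wakeup fires (e.g. on the first
// commit in a new term), restarting the timer after each resend.
func waitWithRetry(responses <-chan uint64, wakeup func() <-chan struct{},
	resend func() error, retryAfter time.Duration) (uint64, error) {
	retryTimer := time.NewTimer(retryAfter)
	defer retryTimer.Stop()
	notifier := wakeup()
	for {
		select {
		case r := <-responses:
			return r, nil
		case <-notifier:
			notifier = wakeup() // re-arm: the fired channel stays closed
			if err := resend(); err != nil {
				return 0, err
			}
			retryTimer.Reset(retryAfter)
		case <-retryTimer.C:
			if err := resend(); err != nil {
				return 0, err
			}
			retryTimer.Reset(retryAfter)
		}
	}
}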
@@ -37,6 +37,7 @@ require (
 	go.etcd.io/etcd/server/v3 v3.5.0-alpha.0
 	go.uber.org/zap v1.16.0
 	golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
+	golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
 	golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba
 	google.golang.org/grpc v1.36.1
 	gopkg.in/yaml.v2 v2.3.0
@@ -367,6 +367,7 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -16,12 +16,14 @@ package integration
 
 import (
 	"context"
+	"fmt"
 	"strings"
 	"testing"
 	"time"
 
 	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
 	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
+	"golang.org/x/sync/errgroup"
 )
 
 func TestMoveLeader(t *testing.T) { testMoveLeader(t, true) }
@@ -36,7 +38,7 @@ func testMoveLeader(t *testing.T, auto bool) {
 	oldLeadIdx := clus.WaitLeader(t)
 	oldLeadID := uint64(clus.Members[oldLeadIdx].s.ID())
 
-	// ensure followers go through leader transition while learship transfer
+	// ensure followers go through leader transition while leadership transfer
 	idc := make(chan uint64)
 	stopc := make(chan struct{})
 	defer close(stopc)
@@ -179,3 +181,81 @@ func TestTransferLeadershipWithLearner(t *testing.T) {
 		t.Error("timed out waiting for leader transition")
 	}
 }
+
+func TestFirstCommitNotification(t *testing.T) {
+	BeforeTest(t)
+	clusterSize := 3
+	cluster := NewClusterV3(t, &ClusterConfig{Size: clusterSize})
+	defer cluster.Terminate(t)
+
+	oldLeaderIdx := cluster.WaitLeader(t)
+	oldLeaderClient := cluster.Client(oldLeaderIdx)
+
+	newLeaderIdx := (oldLeaderIdx + 1) % clusterSize
+	newLeaderId := uint64(cluster.Members[newLeaderIdx].ID())
+
+	notifiers := make(map[int]<-chan struct{}, clusterSize)
+	for i, clusterMember := range cluster.Members {
+		notifiers[i] = clusterMember.s.FirstCommitInTermNotify()
+	}
+
+	_, err := oldLeaderClient.MoveLeader(context.Background(), newLeaderId)
+
+	if err != nil {
+		t.Errorf("got error during leadership transfer: %v", err)
+	}
+
+	leaderAppliedIndex := cluster.Members[newLeaderIdx].s.AppliedIndex()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	group, groupContext := errgroup.WithContext(ctx)
+
+	for i, notifier := range notifiers {
+		member, notifier := cluster.Members[i], notifier
+		group.Go(func() error {
+			return checkFirstCommitNotification(groupContext, member, leaderAppliedIndex, notifier)
+		})
+	}
+
+	err = group.Wait()
+	if err != nil {
+		t.Error(err)
+	}
+}
+
+func checkFirstCommitNotification(
+	ctx context.Context,
+	member *member,
+	leaderAppliedIndex uint64,
+	notifier <-chan struct{},
+) error {
+	// wait until server applies all the changes of leader
+	for member.s.AppliedIndex() < leaderAppliedIndex {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+			time.Sleep(100 * time.Millisecond)
+		}
+	}
+
+	select {
+	case msg, ok := <-notifier:
+		if ok {
+			return fmt.Errorf(
+				"member with ID %d got message via notifier, msg: %v",
+				member.ID(),
+				msg,
+			)
+		}
+	default:
+		return fmt.Errorf(
+			"notification was not triggered, member ID: %d",
+			member.ID(),
+		)
+	}
+
+	return nil
+}
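The test fans the per-member check out with golang.org/x/sync/errgroup, the new dependency added above. A minimal standalone illustration of the idiom, independent of the etcd test harness:

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	// errgroup.WithContext returns a group plus a context that is
	// cancelled as soon as any goroutine returns a non-nil error.
	group, ctx := errgroup.WithContext(context.Background())
	for i := 0; i < 3; i++ {
		i := i // capture the loop variable, as the test does with member/notifier
		group.Go(func() error {
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
				fmt.Println("check passed for member", i)
				return nil
			}
		})
	}
	// Wait blocks until all goroutines finish and returns the first error.
	if err := group.Wait(); err != nil {
		fmt.Println("failed:", err)
	}
}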