From e5c2dff3467e7ffead0c4c4c29b6903e6b23809a Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 14 Aug 2019 01:33:31 -0700 Subject: [PATCH] etcdserver: detect leader change on reads Signed-off-by: Gyuho Lee --- etcdserver/api/v3rpc/rpctypes/error.go | 2 ++ etcdserver/api/v3rpc/util.go | 1 + etcdserver/errors.go | 1 + etcdserver/server.go | 22 +++++++++++++++++++++- etcdserver/v3_server.go | 9 +++++++++ 5 files changed, 34 insertions(+), 1 deletion(-) diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index 55eab38ef..bc1ad7bbd 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -61,6 +61,7 @@ var ( ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err() ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err() + ErrGRPCLeaderChanged = status.New(codes.Unavailable, "etcdserver: leader changed").Err() ErrGRPCNotCapable = status.New(codes.Unavailable, "etcdserver: not capable").Err() ErrGRPCStopped = status.New(codes.Unavailable, "etcdserver: server stopped").Err() ErrGRPCTimeout = status.New(codes.Unavailable, "etcdserver: request timed out").Err() @@ -163,6 +164,7 @@ var ( ErrNoLeader = Error(ErrGRPCNoLeader) ErrNotLeader = Error(ErrGRPCNotLeader) + ErrLeaderChanged = Error(ErrGRPCLeaderChanged) ErrNotCapable = Error(ErrGRPCNotCapable) ErrStopped = Error(ErrGRPCStopped) ErrTimeout = Error(ErrGRPCTimeout) diff --git a/etcdserver/api/v3rpc/util.go b/etcdserver/api/v3rpc/util.go index 799c1197d..c4a1ce042 100644 --- a/etcdserver/api/v3rpc/util.go +++ b/etcdserver/api/v3rpc/util.go @@ -44,6 +44,7 @@ var toGRPCErrorMap = map[error]error{ etcdserver.ErrNoLeader: rpctypes.ErrGRPCNoLeader, etcdserver.ErrNotLeader: rpctypes.ErrGRPCNotLeader, + etcdserver.ErrLeaderChanged: rpctypes.ErrGRPCLeaderChanged, etcdserver.ErrStopped: rpctypes.ErrGRPCStopped, etcdserver.ErrTimeout: rpctypes.ErrGRPCTimeout, etcdserver.ErrTimeoutDueToLeaderFail: rpctypes.ErrGRPCTimeoutDueToLeaderFail, diff --git a/etcdserver/errors.go b/etcdserver/errors.go index fb93c4b2a..8cec52a17 100644 --- a/etcdserver/errors.go +++ b/etcdserver/errors.go @@ -27,6 +27,7 @@ var ( ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure") ErrTimeoutDueToConnectionLost = errors.New("etcdserver: request timed out, possibly due to connection lost") ErrTimeoutLeaderTransfer = errors.New("etcdserver: request timed out, leader transfer took too long") + ErrLeaderChanged = errors.New("etcdserver: leader changed") ErrNotEnoughStartedMembers = errors.New("etcdserver: re-configuration failed due to not enough started members") ErrNoLeader = errors.New("etcdserver: no leader") ErrNotLeader = errors.New("etcdserver: not leader") diff --git a/etcdserver/server.go b/etcdserver/server.go index c1ec3aa0e..f46e91d34 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -198,7 +198,9 @@ type EtcdServer struct { // stopping is closed by run goroutine on shutdown. stopping chan struct{} // done is closed when all goroutines from start() complete. - done chan struct{} + done chan struct{} + leaderChanged chan struct{} + leaderChangedMu sync.RWMutex errorc chan error id types.ID @@ -597,6 +599,7 @@ func (s *EtcdServer) start() { s.ctx, s.cancel = context.WithCancel(context.Background()) s.readwaitc = make(chan struct{}, 1) s.readNotifier = newNotifier() + s.leaderChanged = make(chan struct{}) if s.ClusterVersion() != nil { plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String())) } else { @@ -733,6 +736,17 @@ func (s *EtcdServer) run() { s.compactor.Resume() } } + if newLeader { + select { + case s.leaderChanged <- struct{}{}: + default: + } + s.leaderChangedMu.Lock() + lc := s.leaderChanged + s.leaderChanged = make(chan struct{}) + s.leaderChangedMu.Unlock() + close(lc) + } // TODO: remove the nil checking // current test utility does not provide the stats @@ -841,6 +855,12 @@ func (s *EtcdServer) run() { } } +func (s *EtcdServer) leaderChangedNotify() <-chan struct{} { + s.leaderChangedMu.RLock() + defer s.leaderChangedMu.RUnlock() + return s.leaderChanged +} + func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) { s.applySnapshot(ep, apply) s.applyEntries(ep, apply) diff --git a/etcdserver/v3_server.go b/etcdserver/v3_server.go index f214a1926..9d429e329 100644 --- a/etcdserver/v3_server.go +++ b/etcdserver/v3_server.go @@ -614,7 +614,10 @@ func (s *EtcdServer) linearizableReadLoop() { id1 := s.reqIDGen.Next() binary.BigEndian.PutUint64(ctxToSend, id1) + leaderChangedNotifier := s.leaderChangedNotify() select { + case <-leaderChangedNotifier: + continue case <-s.readwaitc: case <-s.stopping: return @@ -659,6 +662,12 @@ func (s *EtcdServer) linearizableReadLoop() { slowReadIndex.Inc() } + case <-leaderChangedNotifier: + timeout = true + readIndexFailed.Inc() + // return a retryable error. + nr.notify(ErrLeaderChanged) + case <-time.After(s.Cfg.ReqTimeout()): plog.Warningf("timed out waiting for read index response (local node might have slow network)") nr.notify(ErrTimeout)