Merge pull request #17642 from fuweid/fix-17506

*: LeaseTimeToLive returns error if leader changed
This commit is contained in:
Marek Siarkowicz 2024-04-02 14:55:41 +02:00 committed by GitHub
commit 09769c4be7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 108 additions and 2 deletions

View File

@ -357,6 +357,9 @@ func (s *EtcdServer) leaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
if err := s.waitAppliedIndex(); err != nil {
return nil, err
}
// gofail: var beforeLookupWhenLeaseTimeToLive struct{}
// primary; timetolive directly from leader
le := s.lessor.Lookup(lease.LeaseID(r.ID))
if le == nil {
@ -372,6 +375,15 @@ func (s *EtcdServer) leaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
}
resp.Keys = kbs
}
// The leasor could be demoted if leader changed during lookup.
// We should return error to force retry instead of returning
// incorrect remaining TTL.
if le.Demoted() {
// NOTE: lease.ErrNotPrimary is not retryable error for
// client. Instead, uses ErrLeaderChanged.
return nil, errors.ErrLeaderChanged
}
return resp, nil
}

View File

@ -95,6 +95,13 @@ func (l *Lease) forever() {
l.expiry = forever
}
// Demoted returns true if the lease's expiry has been reset to forever.
func (l *Lease) Demoted() bool {
l.expiryMu.Lock()
defer l.expiryMu.Unlock()
return l.expiry == forever
}
// Keys returns all the keys attached to the lease.
func (l *Lease) Keys() []string {
l.mu.RLock()

View File

@ -103,6 +103,9 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
return
}
// gofail: var beforeLookupWhenForwardLeaseTimeToLive struct{}
l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
if l == nil {
http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound)
@ -126,6 +129,14 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
resp.LeaseTimeToLiveResponse.Keys = kbs
}
// The leasor could be demoted if leader changed during lookup.
// We should return error to force retry instead of returning
// incorrect remaining TTL.
if l.Demoted() {
http.Error(w, lease.ErrNotPrimary.Error(), http.StatusInternalServerError)
return
}
v, err = resp.Marshal()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)

View File

@ -22,6 +22,8 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
@ -30,8 +32,10 @@ import (
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/testutil"
clientv3 "go.etcd.io/etcd/client/v3"
framecfg "go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/integration"
gofail "go.etcd.io/gofail/runtime"
)
// TestV3LeasePromote ensures the newly elected leader can promote itself
@ -1046,6 +1050,78 @@ func TestV3LeaseRecoverKeyWithMutipleLease(t *testing.T) {
}
}
func TestV3LeaseTimeToLiveWithLeaderChanged(t *testing.T) {
t.Run("normal", func(subT *testing.T) {
testV3LeaseTimeToLiveWithLeaderChanged(subT, "beforeLookupWhenLeaseTimeToLive")
})
t.Run("forward", func(subT *testing.T) {
testV3LeaseTimeToLiveWithLeaderChanged(subT, "beforeLookupWhenForwardLeaseTimeToLive")
})
}
func testV3LeaseTimeToLiveWithLeaderChanged(t *testing.T, fpName string) {
if len(gofail.List()) == 0 {
t.Skip("please run 'make gofail-enable' before running the test")
}
integration.BeforeTest(t)
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
oldLeadIdx := clus.WaitLeader(t)
followerIdx := (oldLeadIdx + 1) % 3
followerMemberID := clus.Members[followerIdx].ID()
oldLeadC := clus.Client(oldLeadIdx)
leaseResp, err := oldLeadC.Grant(ctx, 100)
require.NoError(t, err)
require.NoError(t, gofail.Enable(fpName, `sleep("3s")`))
t.Cleanup(func() {
terr := gofail.Disable(fpName)
if terr != nil && terr != gofail.ErrDisabled {
t.Fatalf("failed to disable %s: %v", fpName, terr)
}
})
readyCh := make(chan struct{})
errCh := make(chan error, 1)
var targetC *clientv3.Client
switch fpName {
case "beforeLookupWhenLeaseTimeToLive":
targetC = oldLeadC
case "beforeLookupWhenForwardLeaseTimeToLive":
targetC = clus.Client((oldLeadIdx + 2) % 3)
default:
t.Fatalf("unsupported %s failpoint", fpName)
}
go func() {
<-readyCh
time.Sleep(1 * time.Second)
_, merr := oldLeadC.MoveLeader(ctx, uint64(followerMemberID))
assert.NoError(t, gofail.Disable(fpName))
errCh <- merr
}()
close(readyCh)
ttlResp, err := targetC.TimeToLive(ctx, leaseResp.ID)
require.NoError(t, err)
require.GreaterOrEqual(t, int64(100), ttlResp.TTL)
require.NoError(t, <-errCh)
}
// acquireLeaseAndKey creates a new lease and creates an attached key.
func acquireLeaseAndKey(clus *integration.Cluster, key string) (int64, error) {
// create lease

View File

@ -36,7 +36,7 @@ GOFAIL_VERSION = $(shell cd tools/mod && go list -m -f {{.Version}} go.etcd.io/g
.PHONY: gofail-enable
gofail-enable: install-gofail
gofail enable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
gofail enable server/etcdserver/ server/lease/leasehttp server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
cd ./server && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./etcdutl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./etcdctl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
@ -44,7 +44,7 @@ gofail-enable: install-gofail
.PHONY: gofail-disable
gofail-disable: install-gofail
gofail disable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
gofail disable server/etcdserver/ server/lease/leasehttp server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
cd ./server && go mod tidy
cd ./etcdutl && go mod tidy
cd ./etcdctl && go mod tidy