diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index 30c0d5062..6c5eefcb3 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -357,6 +357,9 @@ func (s *EtcdServer) leaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR if err := s.waitAppliedIndex(); err != nil { return nil, err } + + // gofail: var beforeLookupWhenLeaseTimeToLive struct{} + // primary; timetolive directly from leader le := s.lessor.Lookup(lease.LeaseID(r.ID)) if le == nil { @@ -372,6 +375,15 @@ func (s *EtcdServer) leaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR } resp.Keys = kbs } + + // The leasor could be demoted if leader changed during lookup. + // We should return error to force retry instead of returning + // incorrect remaining TTL. + if le.Demoted() { + // NOTE: lease.ErrNotPrimary is not retryable error for + // client. Instead, uses ErrLeaderChanged. + return nil, errors.ErrLeaderChanged + } return resp, nil } diff --git a/server/lease/lease.go b/server/lease/lease.go index b35a6efdc..95f3eb6f7 100644 --- a/server/lease/lease.go +++ b/server/lease/lease.go @@ -95,6 +95,13 @@ func (l *Lease) forever() { l.expiry = forever } +// Demoted returns true if the lease's expiry has been reset to forever. +func (l *Lease) Demoted() bool { + l.expiryMu.Lock() + defer l.expiryMu.Unlock() + return l.expiry == forever +} + // Keys returns all the keys attached to the lease. func (l *Lease) Keys() []string { l.mu.RLock() diff --git a/server/lease/leasehttp/http.go b/server/lease/leasehttp/http.go index 7c9f56bde..9a337132a 100644 --- a/server/lease/leasehttp/http.go +++ b/server/lease/leasehttp/http.go @@ -103,6 +103,9 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout) return } + + // gofail: var beforeLookupWhenForwardLeaseTimeToLive struct{} + l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID)) if l == nil { http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound) @@ -126,6 +129,14 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { resp.LeaseTimeToLiveResponse.Keys = kbs } + // The leasor could be demoted if leader changed during lookup. + // We should return error to force retry instead of returning + // incorrect remaining TTL. + if l.Demoted() { + http.Error(w, lease.ErrNotPrimary.Error(), http.StatusInternalServerError) + return + } + v, err = resp.Marshal() if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go index 8e7b16e69..d9d9a7423 100644 --- a/tests/integration/v3_lease_test.go +++ b/tests/integration/v3_lease_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -30,8 +32,10 @@ import ( "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/client/pkg/v3/testutil" + clientv3 "go.etcd.io/etcd/client/v3" framecfg "go.etcd.io/etcd/tests/v3/framework/config" "go.etcd.io/etcd/tests/v3/framework/integration" + gofail "go.etcd.io/gofail/runtime" ) // TestV3LeasePromote ensures the newly elected leader can promote itself @@ -1046,6 +1050,78 @@ func TestV3LeaseRecoverKeyWithMutipleLease(t *testing.T) { } } +func TestV3LeaseTimeToLiveWithLeaderChanged(t *testing.T) { + t.Run("normal", func(subT *testing.T) { + testV3LeaseTimeToLiveWithLeaderChanged(subT, "beforeLookupWhenLeaseTimeToLive") + }) + + t.Run("forward", func(subT *testing.T) { + testV3LeaseTimeToLiveWithLeaderChanged(subT, "beforeLookupWhenForwardLeaseTimeToLive") + }) +} + +func testV3LeaseTimeToLiveWithLeaderChanged(t *testing.T, fpName string) { + if len(gofail.List()) == 0 { + t.Skip("please run 'make gofail-enable' before running the test") + } + + integration.BeforeTest(t) + + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + oldLeadIdx := clus.WaitLeader(t) + followerIdx := (oldLeadIdx + 1) % 3 + + followerMemberID := clus.Members[followerIdx].ID() + + oldLeadC := clus.Client(oldLeadIdx) + + leaseResp, err := oldLeadC.Grant(ctx, 100) + require.NoError(t, err) + + require.NoError(t, gofail.Enable(fpName, `sleep("3s")`)) + t.Cleanup(func() { + terr := gofail.Disable(fpName) + if terr != nil && terr != gofail.ErrDisabled { + t.Fatalf("failed to disable %s: %v", fpName, terr) + } + }) + + readyCh := make(chan struct{}) + errCh := make(chan error, 1) + + var targetC *clientv3.Client + switch fpName { + case "beforeLookupWhenLeaseTimeToLive": + targetC = oldLeadC + case "beforeLookupWhenForwardLeaseTimeToLive": + targetC = clus.Client((oldLeadIdx + 2) % 3) + default: + t.Fatalf("unsupported %s failpoint", fpName) + } + + go func() { + <-readyCh + time.Sleep(1 * time.Second) + + _, merr := oldLeadC.MoveLeader(ctx, uint64(followerMemberID)) + assert.NoError(t, gofail.Disable(fpName)) + errCh <- merr + }() + + close(readyCh) + + ttlResp, err := targetC.TimeToLive(ctx, leaseResp.ID) + require.NoError(t, err) + require.GreaterOrEqual(t, int64(100), ttlResp.TTL) + + require.NoError(t, <-errCh) +} + // acquireLeaseAndKey creates a new lease and creates an attached key. func acquireLeaseAndKey(clus *integration.Cluster, key string) (int64, error) { // create lease diff --git a/tests/robustness/makefile.mk b/tests/robustness/makefile.mk index dee968eb6..8d50c03de 100644 --- a/tests/robustness/makefile.mk +++ b/tests/robustness/makefile.mk @@ -36,7 +36,7 @@ GOFAIL_VERSION = $(shell cd tools/mod && go list -m -f {{.Version}} go.etcd.io/g .PHONY: gofail-enable gofail-enable: install-gofail - gofail enable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/ + gofail enable server/etcdserver/ server/lease/leasehttp server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/ cd ./server && go get go.etcd.io/gofail@${GOFAIL_VERSION} cd ./etcdutl && go get go.etcd.io/gofail@${GOFAIL_VERSION} cd ./etcdctl && go get go.etcd.io/gofail@${GOFAIL_VERSION} @@ -44,7 +44,7 @@ gofail-enable: install-gofail .PHONY: gofail-disable gofail-disable: install-gofail - gofail disable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/ + gofail disable server/etcdserver/ server/lease/leasehttp server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/ cd ./server && go mod tidy cd ./etcdutl && go mod tidy cd ./etcdctl && go mod tidy