From 94a1d0c1b5c94dd95bd11d357a27cfb6e965a9ce Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Thu, 4 Apr 2024 14:52:06 +0800 Subject: [PATCH] *: LeaseTimeToLive returns error if leader changed The old leader demotes lessor and all the leases' expire time will be updated. Instead of returning incorrect remaining TTL, we should return errors to force client retry. Cherry-pick: d3bb6f688b4643155b4a9924cec726bdc76a1306 Signed-off-by: Wei Fu --- build.sh | 2 +- go.sum | 2 + server/etcdserver/v3_server.go | 12 +++++ server/lease/leasehttp/http.go | 11 +++++ server/lease/lessor.go | 7 +++ tests/go.mod | 1 + tests/go.sum | 2 + tests/integration/v3_lease_test.go | 76 ++++++++++++++++++++++++++++++ 8 files changed, 112 insertions(+), 1 deletion(-) diff --git a/build.sh b/build.sh index 0f65f6dba..05d916ea3 100755 --- a/build.sh +++ b/build.sh @@ -27,7 +27,7 @@ GOFAIL_VERSION=$(cd tools/mod && go list -m -f '{{.Version}}' go.etcd.io/gofail) toggle_failpoints() { mode="$1" if command -v gofail >/dev/null 2>&1; then - run gofail "$mode" server/etcdserver/ server/mvcc/ server/wal/ server/mvcc/backend/ + run gofail "$mode" server/etcdserver/ server/lease/leasehttp server/mvcc/ server/wal/ server/mvcc/backend/ if [[ "$mode" == "enable" ]]; then go get go.etcd.io/gofail@"${GOFAIL_VERSION}" cd ./server && go get go.etcd.io/gofail@"${GOFAIL_VERSION}" diff --git a/go.sum b/go.sum index 8e08168e0..f5c4efc4f 100644 --- a/go.sum +++ b/go.sum @@ -310,6 +310,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI= go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE= +go.etcd.io/gofail v0.1.0 h1:XItAMIhOojXFQMgrxjnd2EIIHun/d5qL0Pf7FzVTkFg= +go.etcd.io/gofail v0.1.0/go.mod h1:VZBCXYGZhHAinaBiiqYvuDynvahNsAyLFwB3kEHKz1M= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 h1:PzIubN4/sjByhDRHLviCjJuweBXWFZWhghjg7cS28+M= diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index b6e7a8067..a9e1f3586 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -377,6 +377,9 @@ func (s *EtcdServer) leaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR if err := s.waitAppliedIndex(); err != nil { return nil, err } + + // gofail: var beforeLookupWhenLeaseTimeToLive struct{} + // primary; timetolive directly from leader le := s.lessor.Lookup(lease.LeaseID(r.ID)) if le == nil { @@ -392,6 +395,15 @@ func (s *EtcdServer) leaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR } resp.Keys = kbs } + + // The leasor could be demoted if leader changed during lookup. + // We should return error to force retry instead of returning + // incorrect remaining TTL. + if le.Demoted() { + // NOTE: lease.ErrNotPrimary is not retryable error for + // client. Instead, uses ErrLeaderChanged. + return nil, ErrLeaderChanged + } return resp, nil } diff --git a/server/lease/leasehttp/http.go b/server/lease/leasehttp/http.go index 1b9390d97..9c815cbae 100644 --- a/server/lease/leasehttp/http.go +++ b/server/lease/leasehttp/http.go @@ -103,6 +103,9 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout) return } + + // gofail: var beforeLookupWhenForwardLeaseTimeToLive struct{} + l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID)) if l == nil { http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound) @@ -126,6 +129,14 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { resp.LeaseTimeToLiveResponse.Keys = kbs } + // The leasor could be demoted if leader changed during lookup. + // We should return error to force retry instead of returning + // incorrect remaining TTL. + if l.Demoted() { + http.Error(w, lease.ErrNotPrimary.Error(), http.StatusInternalServerError) + return + } + v, err = resp.Marshal() if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/server/lease/lessor.go b/server/lease/lessor.go index abeeb09bf..e0c2273df 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -900,6 +900,13 @@ func (l *Lease) forever() { l.expiry = forever } +// Demoted returns true if the lease's expiry has been reset to forever. +func (l *Lease) Demoted() bool { + l.expiryMu.Lock() + defer l.expiryMu.Unlock() + return l.expiry == forever +} + // Keys returns all the keys attached to the lease. func (l *Lease) Keys() []string { l.mu.RLock() diff --git a/tests/go.mod b/tests/go.mod index 451564ccd..19f09c2c6 100644 --- a/tests/go.mod +++ b/tests/go.mod @@ -36,6 +36,7 @@ require ( go.etcd.io/etcd/pkg/v3 v3.5.13 go.etcd.io/etcd/raft/v3 v3.5.13 go.etcd.io/etcd/server/v3 v3.5.13 + go.etcd.io/gofail v0.1.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 go.opentelemetry.io/otel v1.20.0 go.opentelemetry.io/otel/sdk v1.20.0 diff --git a/tests/go.sum b/tests/go.sum index b9e625bcd..54936cde0 100644 --- a/tests/go.sum +++ b/tests/go.sum @@ -304,6 +304,8 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI= go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE= +go.etcd.io/gofail v0.1.0 h1:XItAMIhOojXFQMgrxjnd2EIIHun/d5qL0Pf7FzVTkFg= +go.etcd.io/gofail v0.1.0/go.mod h1:VZBCXYGZhHAinaBiiqYvuDynvahNsAyLFwB3kEHKz1M= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.0 h1:PzIubN4/sjByhDRHLviCjJuweBXWFZWhghjg7cS28+M= diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go index e11c6f246..9388a97f0 100644 --- a/tests/integration/v3_lease_test.go +++ b/tests/integration/v3_lease_test.go @@ -22,10 +22,14 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/client/pkg/v3/testutil" + clientv3 "go.etcd.io/etcd/client/v3" + gofail "go.etcd.io/gofail/runtime" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" @@ -1056,6 +1060,78 @@ func TestV3LeaseRecoverKeyWithMutipleLease(t *testing.T) { } } +func TestV3LeaseTimeToLiveWithLeaderChanged(t *testing.T) { + t.Run("normal", func(subT *testing.T) { + testV3LeaseTimeToLiveWithLeaderChanged(subT, "beforeLookupWhenLeaseTimeToLive") + }) + + t.Run("forward", func(subT *testing.T) { + testV3LeaseTimeToLiveWithLeaderChanged(subT, "beforeLookupWhenForwardLeaseTimeToLive") + }) +} + +func testV3LeaseTimeToLiveWithLeaderChanged(t *testing.T, fpName string) { + if len(gofail.List()) == 0 { + t.Skip("please run 'make gofail-enable' before running the test") + } + + BeforeTest(t) + + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + oldLeadIdx := clus.WaitLeader(t) + followerIdx := (oldLeadIdx + 1) % 3 + + followerMemberID := clus.Members[followerIdx].ID() + + oldLeadC := clus.Client(oldLeadIdx) + + leaseResp, err := oldLeadC.Grant(ctx, 100) + require.NoError(t, err) + + require.NoError(t, gofail.Enable(fpName, `sleep("3s")`)) + t.Cleanup(func() { + terr := gofail.Disable(fpName) + if terr != nil && terr != gofail.ErrDisabled { + t.Fatalf("failed to disable %s: %v", fpName, terr) + } + }) + + readyCh := make(chan struct{}) + errCh := make(chan error, 1) + + var targetC *clientv3.Client + switch fpName { + case "beforeLookupWhenLeaseTimeToLive": + targetC = oldLeadC + case "beforeLookupWhenForwardLeaseTimeToLive": + targetC = clus.Client((oldLeadIdx + 2) % 3) + default: + t.Fatalf("unsupported %s failpoint", fpName) + } + + go func() { + <-readyCh + time.Sleep(1 * time.Second) + + _, merr := oldLeadC.MoveLeader(ctx, uint64(followerMemberID)) + assert.NoError(t, gofail.Disable(fpName)) + errCh <- merr + }() + + close(readyCh) + + ttlResp, err := targetC.TimeToLive(ctx, leaseResp.ID) + require.NoError(t, err) + require.GreaterOrEqual(t, int64(100), ttlResp.TTL) + + require.NoError(t, <-errCh) +} + // acquireLeaseAndKey creates a new lease and creates an attached key. func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) { // create lease