*: LeaseTimeToLive returns error if leader changed

The old leader demotes lessor and all the leases' expire time will be
updated. Instead of returning incorrect remaining TTL, we should return
errors to force client retry.

Signed-off-by: Wei Fu <fuweid89@gmail.com>
This commit is contained in:
Wei Fu 2024-03-25 23:10:35 +08:00
parent e4448c4744
commit d3bb6f688b
5 changed files with 108 additions and 2 deletions

View File

@ -357,6 +357,9 @@ func (s *EtcdServer) leaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
if err := s.waitAppliedIndex(); err != nil {
return nil, err
}
// gofail: var beforeLookupWhenLeaseTimeToLive struct{}
// primary; timetolive directly from leader
le := s.lessor.Lookup(lease.LeaseID(r.ID))
if le == nil {
@ -372,6 +375,15 @@ func (s *EtcdServer) leaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
}
resp.Keys = kbs
}
// The leasor could be demoted if leader changed during lookup.
// We should return error to force retry instead of returning
// incorrect remaining TTL.
if le.Demoted() {
// NOTE: lease.ErrNotPrimary is not retryable error for
// client. Instead, uses ErrLeaderChanged.
return nil, errors.ErrLeaderChanged
}
return resp, nil
}

View File

@ -95,6 +95,13 @@ func (l *Lease) forever() {
l.expiry = forever
}
// Demoted returns true if the lease's expiry has been reset to forever.
func (l *Lease) Demoted() bool {
l.expiryMu.Lock()
defer l.expiryMu.Unlock()
return l.expiry == forever
}
// Keys returns all the keys attached to the lease.
func (l *Lease) Keys() []string {
l.mu.RLock()

View File

@ -103,6 +103,9 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
return
}
// gofail: var beforeLookupWhenForwardLeaseTimeToLive struct{}
l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
if l == nil {
http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound)
@ -126,6 +129,14 @@ func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
resp.LeaseTimeToLiveResponse.Keys = kbs
}
// The leasor could be demoted if leader changed during lookup.
// We should return error to force retry instead of returning
// incorrect remaining TTL.
if l.Demoted() {
http.Error(w, lease.ErrNotPrimary.Error(), http.StatusInternalServerError)
return
}
v, err = resp.Marshal()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)

View File

@ -22,6 +22,8 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
@ -30,8 +32,10 @@ import (
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/testutil"
clientv3 "go.etcd.io/etcd/client/v3"
framecfg "go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/integration"
gofail "go.etcd.io/gofail/runtime"
)
// TestV3LeasePromote ensures the newly elected leader can promote itself
@ -1046,6 +1050,78 @@ func TestV3LeaseRecoverKeyWithMutipleLease(t *testing.T) {
}
}
func TestV3LeaseTimeToLiveWithLeaderChanged(t *testing.T) {
t.Run("normal", func(subT *testing.T) {
testV3LeaseTimeToLiveWithLeaderChanged(subT, "beforeLookupWhenLeaseTimeToLive")
})
t.Run("forward", func(subT *testing.T) {
testV3LeaseTimeToLiveWithLeaderChanged(subT, "beforeLookupWhenForwardLeaseTimeToLive")
})
}
func testV3LeaseTimeToLiveWithLeaderChanged(t *testing.T, fpName string) {
if len(gofail.List()) == 0 {
t.Skip("please run 'make gofail-enable' before running the test")
}
integration.BeforeTest(t)
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
oldLeadIdx := clus.WaitLeader(t)
followerIdx := (oldLeadIdx + 1) % 3
followerMemberID := clus.Members[followerIdx].ID()
oldLeadC := clus.Client(oldLeadIdx)
leaseResp, err := oldLeadC.Grant(ctx, 100)
require.NoError(t, err)
require.NoError(t, gofail.Enable(fpName, `sleep("3s")`))
t.Cleanup(func() {
terr := gofail.Disable(fpName)
if terr != nil && terr != gofail.ErrDisabled {
t.Fatalf("failed to disable %s: %v", fpName, terr)
}
})
readyCh := make(chan struct{})
errCh := make(chan error, 1)
var targetC *clientv3.Client
switch fpName {
case "beforeLookupWhenLeaseTimeToLive":
targetC = oldLeadC
case "beforeLookupWhenForwardLeaseTimeToLive":
targetC = clus.Client((oldLeadIdx + 2) % 3)
default:
t.Fatalf("unsupported %s failpoint", fpName)
}
go func() {
<-readyCh
time.Sleep(1 * time.Second)
_, merr := oldLeadC.MoveLeader(ctx, uint64(followerMemberID))
assert.NoError(t, gofail.Disable(fpName))
errCh <- merr
}()
close(readyCh)
ttlResp, err := targetC.TimeToLive(ctx, leaseResp.ID)
require.NoError(t, err)
require.GreaterOrEqual(t, int64(100), ttlResp.TTL)
require.NoError(t, <-errCh)
}
// acquireLeaseAndKey creates a new lease and creates an attached key.
func acquireLeaseAndKey(clus *integration.Cluster, key string) (int64, error) {
// create lease

View File

@ -36,7 +36,7 @@ GOFAIL_VERSION = $(shell cd tools/mod && go list -m -f {{.Version}} go.etcd.io/g
.PHONY: gofail-enable
gofail-enable: install-gofail
gofail enable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
gofail enable server/etcdserver/ server/lease/leasehttp server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
cd ./server && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./etcdutl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./etcdctl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
@ -44,7 +44,7 @@ gofail-enable: install-gofail
.PHONY: gofail-disable
gofail-disable: install-gofail
gofail disable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
gofail disable server/etcdserver/ server/lease/leasehttp server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
cd ./server && go mod tidy
cd ./etcdutl && go mod tidy
cd ./etcdctl && go mod tidy