Mirror of https://github.com/etcd-io/etcd.git (synced 2024-09-27 06:25:44 +00:00)

Merge pull request #17425 from ivanvc/release-3.5-backport-ignore-old-leaders-leases-revoking-request

[3.5] backport ignore old leaders leases revoking requests

This commit is contained in: 9ffba74e66
@@ -660,8 +660,15 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
 	if srv.Cfg.EnableLeaseCheckpoint {
 		// setting checkpointer enables lease checkpoint feature.
-		srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
+		srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
+			if !srv.ensureLeadership() {
+				srv.lg.Warn("Ignore the checkpoint request because current member isn't a leader",
+					zap.Uint64("local-member-id", uint64(srv.ID())))
+				return lease.ErrNotPrimary
+			}
+
 			srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
+			return nil
 		})
 	}
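
Why the callback installed by SetCheckpointer now returns error: checkpointing is a leader-only write to the consensus log, so the callback refuses to act when ensureLeadership fails, and the lessor (see the lessor.go hunks below) can react to the refusal. A minimal, self-contained sketch of this guard pattern follows; guardedCheckpointer, isLeader, and persist are illustrative names, not etcd's.

package main

import (
	"context"
	"errors"
	"fmt"
)

// errNotPrimary stands in for lease.ErrNotPrimary.
var errNotPrimary = errors.New("lease: not primary")

// guardedCheckpointer wraps a persistence action with a leadership check,
// the same shape as the callback installed by SetCheckpointer above.
func guardedCheckpointer(isLeader func() bool, persist func(context.Context) error) func(context.Context) error {
	return func(ctx context.Context) error {
		if !isLeader() {
			// Refuse to write as a possibly stale leader.
			return errNotPrimary
		}
		return persist(ctx)
	}
}

func main() {
	cp := guardedCheckpointer(
		func() bool { return false }, // simulate lost leadership
		func(context.Context) error { return nil },
	)
	fmt.Println(cp(context.Background())) // lease: not primary
}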
@@ -1145,7 +1152,19 @@ func (s *EtcdServer) run() {
 
 func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
 	s.GoAttach(func() {
+		// We shouldn't revoke any leases if current member isn't a leader,
+		// because the operation should only be performed by the leader. When
+		// the leader gets blocked on the raft loop, such as writing WAL entries,
+		// it can't process any events or messages from raft. It may think it
+		// is still the leader even the leader has already changed.
+		// Refer to https://github.com/etcd-io/etcd/issues/15247
 		lg := s.Logger()
+		if !s.ensureLeadership() {
+			lg.Warn("Ignore the lease revoking request because current member isn't a leader",
+				zap.Uint64("local-member-id", uint64(s.ID())))
+			return
+		}
+
 		// Increases throughput of expired leases deletion process through parallelization
 		c := make(chan struct{}, maxPendingRevokes)
 		for _, curLease := range leases {
@@ -1178,6 +1197,29 @@ func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
 	})
 }
 
+// ensureLeadership checks whether current member is still the leader.
+func (s *EtcdServer) ensureLeadership() bool {
+	lg := s.Logger()
+
+	ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
+	defer cancel()
+	if err := s.linearizableReadNotify(ctx); err != nil {
+		lg.Warn("Failed to check current member's leadership",
+			zap.Error(err))
+		return false
+	}
+
+	newLeaderId := s.raftStatus().Lead
+	if newLeaderId != uint64(s.ID()) {
+		lg.Warn("Current member isn't a leader",
+			zap.Uint64("local-member-id", uint64(s.ID())),
+			zap.Uint64("new-lead", newLeaderId))
+		return false
+	}
+
+	return true
+}
+
 // Cleanup removes allocated objects by EtcdServer.NewServer in
 // situation that EtcdServer::Start was not called (that takes care of cleanup).
 func (s *EtcdServer) Cleanup() {
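
ensureLeadership is the heart of the fix and works in two steps: a linearizable read can only complete while this member can still confirm a quorum (via ReadIndex), and the raft status then confirms that this member, not someone else, currently leads. A hedged, dependency-free sketch of that check; readNotify and raftLead are hypothetical stand-ins for s.linearizableReadNotify and s.raftStatus().Lead.

package main

import (
	"context"
	"fmt"
)

// ensureLeadershipSketch mirrors the two-step check above.
func ensureLeadershipSketch(ctx context.Context, selfID uint64,
	readNotify func(context.Context) error, raftLead func() uint64) bool {
	// A linearizable read forces a quorum confirmation round; a stalled
	// or partitioned ex-leader cannot complete it within the timeout.
	if err := readNotify(ctx); err != nil {
		return false
	}
	// Quorum contact is necessary but not sufficient: leadership may have
	// moved while we waited, so also compare raft's current leader ID.
	return raftLead() == selfID
}

func main() {
	ok := ensureLeadershipSketch(context.Background(), 1,
		func(context.Context) error { return nil }, // quorum reachable
		func() uint64 { return 2 },                 // but member 2 now leads
	)
	fmt.Println(ok) // false
}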
@@ -297,6 +297,17 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
 
 func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
 	if s.isLeader() {
+		// If s.isLeader() returns true, but we fail to ensure the current
+		// member's leadership, there are a couple of possibilities:
+		//   1. current member gets stuck on writing WAL entries;
+		//   2. current member is in network isolation status;
+		//   3. current member isn't a leader anymore (possibly due to #1 above).
+		// In such case, we just return error to client, so that the client can
+		// switch to another member to continue the lease keep-alive operation.
+		if !s.ensureLeadership() {
+			return -1, lease.ErrNotPrimary
+		}
+
 		if err := s.waitAppliedIndex(); err != nil {
 			return 0, err
 		}
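
On the client side, surfacing lease.ErrNotPrimary from LeaseRenew gives a keep-alive client the signal to move on. A usage sketch under the assumption that the client is configured with every member's endpoint (the addresses are placeholders): clientv3's balancer can then route retries to a member that still leads.

package main

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Placeholders: use your cluster's client URLs.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"},
		DialTimeout: 3 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	grant, err := cli.Grant(context.Background(), 20)
	if err != nil {
		log.Fatal(err)
	}
	// KeepAlive re-sends renewals in the background; when one member
	// rejects them (e.g. because it no longer leads), the client can
	// direct subsequent attempts at another member.
	ch, err := cli.KeepAlive(context.Background(), grant.ID)
	if err != nil {
		log.Fatal(err)
	}
	for resp := range ch {
		log.Printf("lease %x renewed, TTL=%d", resp.ID, resp.TTL)
	}
}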
@@ -77,7 +77,7 @@ type RangeDeleter func() TxnDelete
 
 // Checkpointer permits checkpointing of lease remaining TTLs to the consensus log. Defined here to
 // avoid circular dependency with mvcc.
-type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest)
+type Checkpointer func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error
 
 type LeaseID int64
@@ -423,7 +423,9 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
 	// By applying a RAFT entry only when the remainingTTL is already set, we limit the number
 	// of RAFT entries written per lease to a max of 2 per checkpoint interval.
 	if clearRemainingTTL {
-		le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}})
+		if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: []*pb.LeaseCheckpoint{{ID: int64(l.ID), Remaining_TTL: 0}}}); err != nil {
+			return -1, err
+		}
 	}
 
 	le.mu.Lock()
@@ -659,7 +661,9 @@ func (le *lessor) checkpointScheduledLeases() {
 		le.mu.Unlock()
 
 		if len(cps) != 0 {
-			le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps})
+			if err := le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps}); err != nil {
+				return
+			}
 		}
 		if len(cps) < maxLeaseCheckpointBatchSize {
 			return
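
The two lessor call sites adopt different error policies: Renew must surface a failed checkpoint to its caller, while checkpointScheduledLeases can simply drop the batch because the scheduler runs again on the next tick. A small sketch of the two shapes, with illustrative names rather than etcd's:

package main

import (
	"errors"
	"fmt"
)

// renewOnce mirrors Renew's new policy: a failed checkpoint is returned
// to the client instead of being silently ignored.
func renewOnce(cp func() error) (int64, error) {
	if err := cp(); err != nil {
		return -1, err
	}
	return 0, nil
}

// scheduledPass mirrors checkpointScheduledLeases: drop this batch and
// rely on the next scheduled pass to retry.
func scheduledPass(cp func() error) {
	if err := cp(); err != nil {
		return
	}
}

func main() {
	fail := func() error { return errors.New("not primary") }
	_, err := renewOnce(fail)
	fmt.Println(err)    // not primary
	scheduledPass(fail) // silently skipped this round
}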
@@ -250,10 +250,11 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
 	defer os.RemoveAll(dir)
 
 	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
-	fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
+	fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
 		for _, cp := range cp.GetCheckpoints() {
 			le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
 		}
+		return nil
 	}
 	defer le.Stop()
 	// Set checkpointer
@@ -540,7 +541,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
 	defer le.Stop()
 	le.minLeaseTTL = 1
 	checkpointedC := make(chan struct{})
-	le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
+	le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) error {
 		close(checkpointedC)
 		if len(lc.Checkpoints) != 1 {
 			t.Errorf("expected 1 checkpoint but got %d", len(lc.Checkpoints))
@@ -549,6 +550,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
 		if c.Remaining_TTL != 1 {
 			t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
 		}
+		return nil
 	})
 	_, err := le.Grant(1, 2)
 	if err != nil {
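
Both test edits rely on the same seam: the lessor's checkpoint dependency is a plain function value (the Checkpointer type above), so a test injects a recording stub through SetCheckpointer instead of writing to the consensus log. A generic sketch of that injection pattern, with illustrative names:

package main

import "fmt"

// lessorLike depends on a function value, so a test can swap in a stub
// that records calls instead of persisting anything.
type lessorLike struct {
	checkpoint func(batch []int64) error
}

func main() {
	var recorded [][]int64
	l := lessorLike{checkpoint: func(batch []int64) error {
		recorded = append(recorded, batch) // record instead of persisting
		return nil                         // or return an error to exercise failure paths
	}}
	_ = l.checkpoint([]int64{1, 2})
	fmt.Println(recorded) // [[1 2]]
}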
tests/e2e/v3_lease_no_proxy_test.go (new file, 157 lines)
@@ -0,0 +1,157 @@
+// Copyright 2024 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build !cluster_proxy
+
+package e2e
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/tests/v3/framework/e2e"
+	"go.etcd.io/etcd/tests/v3/framework/testutils"
+)
+
+// TestLeaseRevoke_IgnoreOldLeader verifies that leases shouldn't be revoked
+// by old leader.
+// See the case 1 in https://github.com/etcd-io/etcd/issues/15247#issuecomment-1777862093.
+func TestLeaseRevoke_IgnoreOldLeader(t *testing.T) {
+	testLeaseRevokeIssue(t, true)
+}
+
+// TestLeaseRevoke_ClientSwitchToOtherMember verifies that leases shouldn't
+// be revoked by new leader.
+// See the case 2 in https://github.com/etcd-io/etcd/issues/15247#issuecomment-1777862093.
+func TestLeaseRevoke_ClientSwitchToOtherMember(t *testing.T) {
+	testLeaseRevokeIssue(t, false)
+}
+
+func testLeaseRevokeIssue(t *testing.T, connectToOneFollower bool) {
+	e2e.BeforeTest(t)
+
+	ctx := context.Background()
+
+	t.Log("Starting a new etcd cluster")
+	epc, err := e2e.NewEtcdProcessCluster(t, &e2e.EtcdProcessClusterConfig{
+		ClusterSize:         3,
+		GoFailEnabled:       true,
+		GoFailClientTimeout: 40 * time.Second,
+	})
+	require.NoError(t, err)
+	defer func() {
+		if errC := epc.Close(); errC != nil {
+			t.Fatalf("error closing etcd processes (%v)", errC)
+		}
+	}()
+
+	leaderIdx := epc.WaitLeader(t)
+	t.Logf("Leader index: %d", leaderIdx)
+
+	epsForNormalOperations := epc.Procs[(leaderIdx+2)%3].EndpointsGRPC()
+	t.Logf("Creating a client for normal operations: %v", epsForNormalOperations)
+	client, err := clientv3.New(clientv3.Config{Endpoints: epsForNormalOperations, DialTimeout: 3 * time.Second})
+	require.NoError(t, err)
+	defer client.Close()
+
+	var epsForLeaseKeepAlive []string
+	if connectToOneFollower {
+		epsForLeaseKeepAlive = epc.Procs[(leaderIdx+1)%3].EndpointsGRPC()
+	} else {
+		epsForLeaseKeepAlive = epc.EndpointsGRPC()
+	}
+	t.Logf("Creating a client for the leaseKeepAlive operation: %v", epsForLeaseKeepAlive)
+	clientForKeepAlive, err := clientv3.New(clientv3.Config{Endpoints: epsForLeaseKeepAlive, DialTimeout: 3 * time.Second})
+	require.NoError(t, err)
+	defer clientForKeepAlive.Close()
+
+	resp, err := client.Status(ctx, epsForNormalOperations[0])
+	require.NoError(t, err)
+	oldLeaderId := resp.Leader
+
+	t.Log("Creating a new lease")
+	leaseRsp, err := client.Grant(ctx, 20)
+	require.NoError(t, err)
+
+	t.Log("Starting a goroutine to keep alive the lease")
+	doneC := make(chan struct{})
+	stopC := make(chan struct{})
+	startC := make(chan struct{}, 1)
+	go func() {
+		defer close(doneC)
+
+		respC, kerr := clientForKeepAlive.KeepAlive(ctx, leaseRsp.ID)
+		require.NoError(t, kerr)
+		// ensure we have received the first response from the server
+		<-respC
+		startC <- struct{}{}
+
+		for {
+			select {
+			case <-stopC:
+				return
+			case <-respC:
+			}
+		}
+	}()
+
+	t.Log("Wait for the keepAlive goroutine to get started")
+	<-startC
+
+	t.Log("Trigger the failpoint to simulate stalled writing")
+	err = epc.Procs[leaderIdx].Failpoints().SetupHTTP(ctx, "raftBeforeSave", `sleep("30s")`)
+	require.NoError(t, err)
+
+	cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	t.Logf("Waiting for a new leader to be elected, old leader index: %d, old leader ID: %d", leaderIdx, oldLeaderId)
+	testutils.ExecuteUntil(cctx, t, func() {
+		for {
+			resp, err = client.Status(ctx, epsForNormalOperations[0])
+			if err == nil && resp.Leader != oldLeaderId {
+				t.Logf("A new leader has already been elected, new leader index: %d", resp.Leader)
+				return
+			}
+			time.Sleep(100 * time.Millisecond)
+		}
+	})
+	cancel()
+
+	t.Log("Writing a key/value pair")
+	_, err = client.Put(ctx, "foo", "bar")
+	require.NoError(t, err)
+
+	t.Log("Sleeping 30 seconds")
+	time.Sleep(30 * time.Second)
+
+	t.Log("Remove the failpoint 'raftBeforeSave'")
+	err = epc.Procs[leaderIdx].Failpoints().DeactivateHTTP(ctx, "raftBeforeSave")
+	require.NoError(t, err)
+
+	// By default, etcd tries to revoke leases every 7 seconds.
+	t.Log("Sleeping 10 seconds")
+	time.Sleep(10 * time.Second)
+
+	t.Log("Confirming the lease isn't revoked")
+	leases, err := client.Leases(ctx)
+	require.NoError(t, err)
+	require.Equal(t, 1, len(leases.Leases))
+
+	t.Log("Waiting for the keepAlive goroutine to exit")
+	close(stopC)
+	<-doneC
+}
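
The test freezes the old leader with a gofail failpoint rather than killing the process, so that member keeps believing it leads while its WAL writes stall. Failpoints().SetupHTTP wraps gofail's HTTP interface; below is a hedged sketch of roughly what that amounts to, assuming a binary built with failpoints and its gofail HTTP endpoint enabled (the port is a placeholder).

package main

import (
	"log"
	"net/http"
	"strings"
)

func main() {
	// Placeholder address: the gofail HTTP port of the leader process.
	const fp = "http://127.0.0.1:22381/raftBeforeSave"

	// PUT `sleep("30s")` arms the failpoint: every pass through
	// raftBeforeSave (i.e. every WAL save) then stalls, which freezes the
	// old leader without terminating it.
	req, err := http.NewRequest(http.MethodPut, fp, strings.NewReader(`sleep("30s")`))
	if err != nil {
		log.Fatal(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	resp.Body.Close()

	// DELETE disarms it again, matching Failpoints().DeactivateHTTP above.
	req, err = http.NewRequest(http.MethodDelete, fp, nil)
	if err != nil {
		log.Fatal(err)
	}
	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	resp.Body.Close()
}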
@@ -491,6 +491,14 @@ func (epc *EtcdProcessCluster) EndpointsV3() []string {
 	return epc.Endpoints(func(ep EtcdProcess) []string { return ep.EndpointsV3() })
 }
 
+func (epc *EtcdProcessCluster) EndpointsGRPC() []string {
+	return epc.Endpoints(func(ep EtcdProcess) []string { return ep.EndpointsGRPC() })
+}
+
+func (epc *EtcdProcessCluster) EndpointsHTTP() []string {
+	return epc.Endpoints(func(ep EtcdProcess) []string { return ep.EndpointsHTTP() })
+}
+
 func (epc *EtcdProcessCluster) Endpoints(f func(ep EtcdProcess) []string) (ret []string) {
 	for _, p := range epc.Procs {
 		ret = append(ret, f(p)...)
@@ -757,7 +757,7 @@ func TestV3LeaseFailover(t *testing.T) {
 
 	// send keep alive to old leader until the old leader starts
 	// to drop lease request.
-	var expectedExp time.Time
+	expectedExp := time.Now().Add(5 * time.Second)
 	for {
 		if err = lac.Send(lreq); err != nil {
 			break
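
The integration-test fix replaces a zero-valued expectedExp with an explicit bound five seconds out. A plausible reading of the surrounding diff: with the backport, the stalled leader starts dropping keep-alives right away, so the loop can exit before ever assigning expectedExp, and a zero time.Time would skew the later expiry check. An illustrative shape of the bounded loop (sendKeepAlive and ttl are hypothetical):

// Illustrative only: seed the expiry bound before the loop so it is
// meaningful even if the very first keep-alive is already rejected.
expectedExp := time.Now().Add(5 * time.Second)
for sendKeepAlive() { // hypothetical helper
	expectedExp = time.Now().Add(ttl) // refreshed only while renewals succeed
}
// ... later: wait until expectedExp before asserting on lease expiry.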