support linearizable renew lease
When etcdserver receives a LeaseRenew request, it may still be processing a LeaseGrant request for exactly the same leaseID, so it can return TTL=0 to the client because the lease is not found yet. The leader should therefore wait until its applied index catches up with the committed index before serving such client requests.
parent: 0c9a4e0f93
commit: fe3a57976e
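The race is easiest to see from the client side: grant a lease and renew it immediately; before this change, the renewal could reach the leader before the grant had been applied there. A minimal reproduction sketch using clientv3 (the endpoint addresses are placeholders for a multi-member cluster):

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Placeholder endpoints: any multi-member cluster reachable by the client.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379", "127.0.0.1:22379", "127.0.0.1:32379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	grant, err := cli.Grant(ctx, 10) // grant a lease with a 10s TTL
	if err != nil {
		log.Fatal(err)
	}

	// Renew immediately. Before this commit, the renewal could land on the
	// leader ahead of the grant's application; the server's TTL=0 answer
	// surfaces here as a lease-not-found error from KeepAliveOnce.
	ka, err := cli.KeepAliveOnce(ctx, grant.ID)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("renewed TTL:", ka.TTL)
}

The diff below adds the changelog entry, the applied-index wait in the lease-serving path, and stress tests that drive renewals through a cluster client.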
@@ -65,6 +65,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
 - Fix [segmentation violation(SIGSEGV) error due to premature unlocking of watchableStore](https://github.com/etcd-io/etcd/pull/13505)
 - Fix [inconsistent log format](https://github.com/etcd-io/etcd/pull/13864)
 - Fix [Inconsistent revision and data occurs](https://github.com/etcd-io/etcd/pull/13854)
+- Fix [Etcdserver is still in progress of processing LeaseGrantRequest when it receives a LeaseKeepAliveRequest on the same leaseID](https://github.com/etcd-io/etcd/pull/13690)
 
 ### tools/benchmark
 
@@ -275,6 +275,18 @@ func (s *EtcdServer) LeaseGrant(ctx context.Context, r *pb.LeaseGrantRequest) (*
 	return resp.(*pb.LeaseGrantResponse), nil
 }
 
+func (s *EtcdServer) waitAppliedIndex(ctx context.Context) error {
+	select {
+	case <-s.ApplyWait():
+	case <-s.stopping:
+		return ErrStopped
+	case <-ctx.Done():
+		return ErrTimeout
+	}
+
+	return nil
+}
+
 func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
 	resp, err := s.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseRevoke: r})
 	if err != nil {
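For context, ApplyWait() already exists in etcdserver: it returns a channel that is closed once the applied index has caught up with the committed index observed at the time of the call, so waitAppliedIndex blocks until every entry committed so far, including any in-flight LeaseGrant, has been applied. The primitive behind it is, roughly, a registry of waiters keyed by index, along these lines (an illustrative sketch, not the actual etcd implementation):

package waitexample

import "sync"

// waitTime is an illustrative stand-in for the wait primitive behind
// EtcdServer.ApplyWait(): Wait(deadline) returns a channel that is closed
// once Trigger has been called with an index >= deadline.
type waitTime struct {
	mu          sync.Mutex
	lastTrigger uint64
	waiters     map[uint64][]chan struct{}
}

func newWaitTime() *waitTime {
	return &waitTime{waiters: make(map[uint64][]chan struct{})}
}

// Wait registers interest in the applied index reaching deadline.
func (w *waitTime) Wait(deadline uint64) <-chan struct{} {
	w.mu.Lock()
	defer w.mu.Unlock()
	ch := make(chan struct{})
	if w.lastTrigger >= deadline {
		close(ch) // already applied past the deadline; caller never blocks
		return ch
	}
	w.waiters[deadline] = append(w.waiters[deadline], ch)
	return ch
}

// Trigger is called by the apply loop as entries are applied, releasing
// every waiter whose deadline has been reached.
func (w *waitTime) Trigger(index uint64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.lastTrigger = index
	for deadline, chs := range w.waiters {
		if deadline <= index {
			for _, ch := range chs {
				close(ch)
			}
			delete(w.waiters, deadline)
		}
	}
}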
@@ -284,6 +296,15 @@ func (s *EtcdServer) LeaseRevoke(ctx context.Context, r *pb.LeaseRevokeRequest)
 }
 
 func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, error) {
+	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
+	defer cancel()
+
+	if s.isLeader() {
+		if err := s.waitAppliedIndex(cctx); err != nil {
+			return 0, err
+		}
+	}
+
 	ttl, err := s.lessor.Renew(id)
 	if err == nil { // already requested to primary lessor(leader)
 		return ttl, nil
@@ -292,9 +313,6 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
 		return -1, err
 	}
 
-	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
-	defer cancel()
-
 	// renewals don't go through raft; forward to leader manually
 	for cctx.Err() == nil {
 		leader, lerr := s.waitLeader(cctx)
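The net shape of LeaseRenew after these two hunks: cctx now sits at the top of the function, so a single ReqTimeout window bounds both the leader's applied-index wait and, on a non-leader, the manual forwarding loop (renewals don't go through raft, so a follower forwards the request to the leader rather than proposing it). A leader that cannot catch up within the window returns ErrTimeout instead of a spurious TTL of 0.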
@@ -319,7 +337,13 @@ func (s *EtcdServer) LeaseRenew(ctx context.Context, id lease.LeaseID) (int64, e
 }
 
 func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
-	if s.Leader() == s.ID() {
+	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
+	defer cancel()
+
+	if s.isLeader() {
+		if err := s.waitAppliedIndex(cctx); err != nil {
+			return nil, err
+		}
 		// primary; timetolive directly from leader
 		le := s.lessor.Lookup(lease.LeaseID(r.ID))
 		if le == nil {
@@ -338,9 +362,6 @@ func (s *EtcdServer) LeaseTimeToLive(ctx context.Context, r *pb.LeaseTimeToLiveR
 		return resp, nil
 	}
 
-	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
-	defer cancel()
-
 	// forward to leader
 	for cctx.Err() == nil {
 		leader, err := s.waitLeader(cctx)
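LeaseTimeToLive gains the same guard, and the open-coded s.Leader() == s.ID() test is folded into an s.isLeader() helper. The helper itself is not part of this diff; presumably it is just the comparison the old code spelled out, along these lines (a hypothetical sketch):

func (s *EtcdServer) isLeader() bool {
	// Assumption: a thin wrapper over the identity check the old code inlined.
	return s.Leader() == s.ID()
}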
@@ -1411,7 +1411,7 @@ func (c *Cluster) Endpoints() []string {
 
 func (c *Cluster) ClusterClient() (client *clientv3.Client, err error) {
 	if c.clusterClient == nil {
-		endpoints := []string{}
+		var endpoints []string
 		for _, m := range c.Members {
 			endpoints = append(endpoints, m.GrpcURL)
 		}
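The endpoints change in ClusterClient is an idiom cleanup with no behavior change: append accepts a nil slice, so declaring var endpoints []string avoids allocating an empty literal that is immediately grown. A standalone illustration:

package main

import "fmt"

func main() {
	var endpoints []string                          // nil slice, no allocation yet
	endpoints = append(endpoints, "127.0.0.1:2379") // append handles nil fine
	fmt.Println(len(endpoints), endpoints)          // prints: 1 [127.0.0.1:2379]
}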
@@ -16,6 +16,7 @@ package integration
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"math"
 	"testing"
@@ -487,17 +488,31 @@ func TestV3LeaseLeases(t *testing.T) {
 // it was oberserved that the immediate lease renewal after granting a lease from follower resulted lease not found.
 // related issue https://github.com/etcd-io/etcd/issues/6978
 func TestV3LeaseRenewStress(t *testing.T) {
-	testLeaseStress(t, stressLeaseRenew)
+	testLeaseStress(t, stressLeaseRenew, false)
 }
 
+// TestV3LeaseRenewStressWithClusterClient is similar to TestV3LeaseRenewStress,
+// but it uses a cluster client instead of a specific member's client.
+// The related issue is https://github.com/etcd-io/etcd/issues/13675.
+func TestV3LeaseRenewStressWithClusterClient(t *testing.T) {
+	testLeaseStress(t, stressLeaseRenew, true)
+}
+
 // TestV3LeaseTimeToLiveStress keeps creating lease and retrieving it immediately to ensure the lease can be retrieved.
 // it was oberserved that the immediate lease retrieval after granting a lease from follower resulted lease not found.
 // related issue https://github.com/etcd-io/etcd/issues/6978
 func TestV3LeaseTimeToLiveStress(t *testing.T) {
-	testLeaseStress(t, stressLeaseTimeToLive)
+	testLeaseStress(t, stressLeaseTimeToLive, false)
 }
 
-func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error) {
+// TestV3LeaseTimeToLiveStressWithClusterClient is similar to TestV3LeaseTimeToLiveStress,
+// but it uses a cluster client instead of a specific member's client.
+// The related issue is https://github.com/etcd-io/etcd/issues/13675.
+func TestV3LeaseTimeToLiveStressWithClusterClient(t *testing.T) {
+	testLeaseStress(t, stressLeaseTimeToLive, true)
+}
+
+func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient) error, useClusterClient bool) {
 	integration.BeforeTest(t)
 	clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
@@ -506,13 +521,23 @@ func testLeaseStress(t *testing.T, stresser func(context.Context, pb.LeaseClient
 	defer cancel()
 	errc := make(chan error)
 
-	for i := 0; i < 30; i++ {
-		for j := 0; j < 3; j++ {
-			go func(i int) { errc <- stresser(ctx, integration.ToGRPC(clus.Client(i)).Lease) }(j)
+	if useClusterClient {
+		for i := 0; i < 300; i++ {
+			clusterClient, err := clus.ClusterClient()
+			if err != nil {
+				t.Fatal(err)
+			}
+			go func(i int) { errc <- stresser(ctx, integration.ToGRPC(clusterClient).Lease) }(i)
+		}
+	} else {
+		for i := 0; i < 100; i++ {
+			for j := 0; j < 3; j++ {
+				go func(i int) { errc <- stresser(ctx, integration.ToGRPC(clus.Client(i)).Lease) }(j)
+			}
 		}
 	}
 
-	for i := 0; i < 90; i++ {
+	for i := 0; i < 300; i++ {
 		if err := <-errc; err != nil {
 			t.Fatal(err)
 		}
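Both branches launch exactly 300 goroutines: 300 x 1 with the shared cluster client, and 100 iterations x 3 members without it. Each goroutine sends exactly one result on errc, so the single drain loop of 300 receives collects every outcome either way. Note that clus.ClusterClient() is cached by the fixture (the "if c.clusterClient == nil" guard in the hunk above), so calling it inside the loop does not create 300 clients.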
@@ -543,7 +568,7 @@ func stressLeaseRenew(tctx context.Context, lc pb.LeaseClient) (reterr error) {
 			continue
 		}
 		if rresp.TTL == 0 {
-			return fmt.Errorf("TTL shouldn't be 0 so soon")
+			return errors.New("TTL shouldn't be 0 so soon")
 		}
 	}
 	return nil
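This last hunk is why the test file's import block gains "errors" above: the message contains no formatting verbs, so errors.New expresses the intent directly, and some linters flag verb-less fmt.Errorf calls for exactly this reason. The returned error text is identical; no behavior changes.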