diff --git a/CHANGELOG-3.5.md b/CHANGELOG-3.5.md
index cc38797ed..0e66b7d70 100644
--- a/CHANGELOG-3.5.md
+++ b/CHANGELOG-3.5.md
@@ -14,6 +14,8 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.1...v3.5.2) and
### etcd server
- Fix [exclude the same alarm type activated by multiple peers](https://github.com/etcd-io/etcd/pull/13476).
+- Add [`etcd --experimental-enable-lease-checkpoint-persist`](https://github.com/etcd-io/etcd/pull/13508) flag to enable checkpoint persisting.
+- Fix [Lease checkpoints don't prevent to reset ttl on leader change](https://github.com/etcd-io/etcd/pull/13508), requires enabling checkpoint persisting.
diff --git a/CHANGELOG-3.6.md b/CHANGELOG-3.6.md
index 3f6b28d5f..9f23560d7 100644
--- a/CHANGELOG-3.6.md
+++ b/CHANGELOG-3.6.md
@@ -33,9 +33,11 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
- Add [`etcd --log-format`](https://github.com/etcd-io/etcd/pull/13339) flag to support log format.
- Add [`etcd --experimental-max-learners`](https://github.com/etcd-io/etcd/pull/13377) flag to allow configuration of learner max membership.
+- Add [`etcd --experimental-enable-lease-checkpoint-persist`](https://github.com/etcd-io/etcd/pull/13508) flag to handle upgrade from v3.5.2 clusters with this feature enabled.
- Fix [non mutating requests pass through quotaKVServer when NOSPACE](https://github.com/etcd-io/etcd/pull/13435)
- Fix [exclude the same alarm type activated by multiple peers](https://github.com/etcd-io/etcd/pull/13467).
- Fix [Provide a better liveness probe for when etcd runs as a Kubernetes pod](https://github.com/etcd-io/etcd/pull/13399)
+- Fix [Lease checkpoints don't prevent to reset ttl on leader change](https://github.com/etcd-io/etcd/pull/13508).
### tools/benchmark
diff --git a/server/config/config.go b/server/config/config.go
index 74587efd6..43ecab7ec 100644
--- a/server/config/config.go
+++ b/server/config/config.go
@@ -149,10 +149,12 @@ type ServerConfig struct {
ForceNewCluster bool
- // EnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
+ // EnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
EnableLeaseCheckpoint bool
// LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints.
LeaseCheckpointInterval time.Duration
+ // LeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
+ LeaseCheckpointPersist bool
EnableGRPCGateway bool
diff --git a/server/embed/config.go b/server/embed/config.go
index ecec546a0..c168fe841 100644
--- a/server/embed/config.go
+++ b/server/embed/config.go
@@ -315,9 +315,14 @@ type Config struct {
// Deprecated in v3.5.
// TODO: Delete in v3.6 (https://github.com/etcd-io/etcd/issues/12913)
ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
- // ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
+ // ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
- ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
+ // ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
+ // Requires experimental-enable-lease-checkpoint to be enabled.
+ // Deprecated in v3.6.
+ // TODO: Delete in v3.7
+ ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
+ ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
// ExperimentalCompactionSleepInterval is the sleep interval between every etcd compaction loop.
ExperimentalCompactionSleepInterval time.Duration `json:"experimental-compaction-sleep-interval"`
ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
@@ -704,6 +709,14 @@ func (cfg *Config) Validate() error {
}
}
+ if !cfg.ExperimentalEnableLeaseCheckpointPersist && cfg.ExperimentalEnableLeaseCheckpoint {
+ cfg.logger.Warn("Detected that checkpointing is enabled without persistence. Consider enabling experimental-enable-lease-checkpoint-persist")
+ }
+
+ if cfg.ExperimentalEnableLeaseCheckpointPersist && !cfg.ExperimentalEnableLeaseCheckpoint {
+ return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint")
+ }
+
return nil
}
diff --git a/server/embed/config_test.go b/server/embed/config_test.go
index e7da274b2..bb5c08880 100644
--- a/server/embed/config_test.go
+++ b/server/embed/config_test.go
@@ -297,6 +297,56 @@ func TestPeerURLsMapAndTokenFromSRV(t *testing.T) {
}
}
+func TestLeaseCheckpointValidate(t *testing.T) {
+ tcs := []struct {
+ name string
+ configFunc func() Config
+ expectError bool
+ }{
+ {
+ name: "Default config should pass",
+ configFunc: func() Config {
+ return *NewConfig()
+ },
+ },
+ {
+ name: "Enabling checkpoint leases should pass",
+ configFunc: func() Config {
+ cfg := *NewConfig()
+ cfg.ExperimentalEnableLeaseCheckpoint = true
+ return cfg
+ },
+ },
+ {
+ name: "Enabling checkpoint leases and persist should pass",
+ configFunc: func() Config {
+ cfg := *NewConfig()
+ cfg.ExperimentalEnableLeaseCheckpoint = true
+ cfg.ExperimentalEnableLeaseCheckpointPersist = true
+ return cfg
+ },
+ },
+ {
+ name: "Enabling checkpoint leases persist without checkpointing itself should fail",
+ configFunc: func() Config {
+ cfg := *NewConfig()
+ cfg.ExperimentalEnableLeaseCheckpointPersist = true
+ return cfg
+ },
+ expectError: true,
+ },
+ }
+ for _, tc := range tcs {
+ t.Run(tc.name, func(t *testing.T) {
+ cfg := tc.configFunc()
+ err := cfg.Validate()
+ if (err != nil) != tc.expectError {
+ t.Errorf("config.Validate() = %q, expected error: %v", err, tc.expectError)
+ }
+ })
+ }
+}
+
func TestLogRotation(t *testing.T) {
tests := []struct {
name string
diff --git a/server/embed/etcd.go b/server/embed/etcd.go
index 5970437f0..a3ba75a56 100644
--- a/server/embed/etcd.go
+++ b/server/embed/etcd.go
@@ -210,6 +210,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing,
UnsafeNoFsync: cfg.UnsafeNoFsync,
EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
+ LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval,
WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go
index 52c99eb6b..eadc5d637 100644
--- a/server/etcdmain/config.go
+++ b/server/etcdmain/config.go
@@ -282,7 +282,9 @@ func newConfig() *config {
fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.")
fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
- fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
+ fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.")
+ // TODO: delete in v3.7
+ fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.")
fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalCompactionSleepInterval, "experimental-compaction-sleep-interval", cfg.ec.ExperimentalCompactionSleepInterval, "Sets the sleep interval between each compaction batch.")
fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index 2de2477ec..884caf411 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -344,9 +344,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
- srv.lessor = lease.NewLessor(srv.Logger(), srv.be, lease.LessorConfig{
+ srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
CheckpointInterval: cfg.LeaseCheckpointInterval,
+ CheckpointPersist: cfg.LeaseCheckpointPersist,
ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
})
diff --git a/server/lease/leasehttp/http_test.go b/server/lease/leasehttp/http_test.go
index 4fb8fd7eb..e614b4033 100644
--- a/server/lease/leasehttp/http_test.go
+++ b/server/lease/leasehttp/http_test.go
@@ -31,7 +31,7 @@ func TestRenewHTTP(t *testing.T) {
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, be)
- le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
+ le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
@@ -55,7 +55,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, be)
- le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
+ le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
@@ -96,7 +96,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, be)
- le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
+ le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
le.Promote(time.Second)
l, err := le.Grant(1, int64(5))
if err != nil {
diff --git a/server/lease/lessor.go b/server/lease/lessor.go
index 715b82079..0a77fd669 100644
--- a/server/lease/lessor.go
+++ b/server/lease/lessor.go
@@ -24,6 +24,7 @@ import (
"sync"
"time"
+ "github.com/coreos/go-semver/semver"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/server/v3/lease/leasepb"
"go.etcd.io/etcd/server/v3/storage/backend"
@@ -37,6 +38,8 @@ const NoLease = LeaseID(0)
// MaxLeaseTTL is the maximum lease TTL value
const MaxLeaseTTL = 9000000000
+var v3_6 = semver.Version{Major: 3, Minor: 6}
+
var (
forever = time.Time{}
@@ -180,19 +183,29 @@ type lessor struct {
checkpointInterval time.Duration
// the interval to check if the expired lease is revoked
expiredLeaseRetryInterval time.Duration
+ // whether lessor should always persist remaining TTL (always enabled in v3.6).
+ checkpointPersist bool
+ // cluster is used to adapt lessor logic based on cluster version
+ cluster cluster
+}
+
+type cluster interface {
+ // Version is the cluster-wide minimum major.minor version.
+ Version() *semver.Version
}
type LessorConfig struct {
MinLeaseTTL int64
CheckpointInterval time.Duration
ExpiredLeasesRetryInterval time.Duration
+ CheckpointPersist bool
}
-func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
- return newLessor(lg, b, cfg)
+func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
+ return newLessor(lg, b, cluster, cfg)
}
-func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
+func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor {
checkpointInterval := cfg.CheckpointInterval
expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
if checkpointInterval == 0 {
@@ -210,11 +223,13 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
minLeaseTTL: cfg.MinLeaseTTL,
checkpointInterval: checkpointInterval,
expiredLeaseRetryInterval: expiredLeaseRetryInterval,
+ checkpointPersist: cfg.CheckpointPersist,
// expiredC is a small buffered chan to avoid unnecessary blocking.
expiredC: make(chan []*Lease, 16),
stopC: make(chan struct{}),
doneC: make(chan struct{}),
lg: lg,
+ cluster: cluster,
}
l.initAndRecover()
@@ -351,6 +366,9 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
if l, ok := le.leaseMap[id]; ok {
// when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
l.remainingTTL = remainingTTL
+ if le.shouldPersistCheckpoints() {
+ l.persistTo(le.b)
+ }
if le.isPrimary() {
// schedule the next checkpoint as needed
le.scheduleCheckpointIfNeeded(l)
@@ -359,6 +377,15 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
return nil
}
+func (le *lessor) shouldPersistCheckpoints() bool {
+ cv := le.cluster.Version()
+ return le.checkpointPersist || (cv != nil && greaterOrEqual(*cv, v3_6))
+}
+
+func greaterOrEqual(first, second semver.Version) bool {
+ return !first.LessThan(second)
+}
+
// Renew renews an existing lease. If the given lease does not exist or
// has expired, an error will be returned.
func (le *lessor) Renew(id LeaseID) (int64, error) {
@@ -446,6 +473,7 @@ func (le *lessor) Promote(extend time.Duration) {
l.refresh(extend)
item := &LeaseWithTime{id: l.ID, time: l.expiry}
le.leaseExpiredNotifier.RegisterOrUpdate(item)
+ le.scheduleCheckpointIfNeeded(l)
}
if len(le.leaseMap) < leaseRevokeRate {
@@ -783,9 +811,10 @@ func (le *lessor) initAndRecover() {
ttl: lpb.TTL,
// itemSet will be filled in when recover key-value pairs
// set expiry to forever, refresh when promoted
- itemSet: make(map[LeaseItem]struct{}),
- expiry: forever,
- revokec: make(chan struct{}),
+ itemSet: make(map[LeaseItem]struct{}),
+ expiry: forever,
+ revokec: make(chan struct{}),
+ remainingTTL: lpb.RemainingTTL,
}
}
le.leaseExpiredNotifier.Init()
diff --git a/server/lease/lessor_bench_test.go b/server/lease/lessor_bench_test.go
index 570663deb..86f9d1bf5 100644
--- a/server/lease/lessor_bench_test.go
+++ b/server/lease/lessor_bench_test.go
@@ -68,7 +68,7 @@ func setUp(t testing.TB) (le *lessor, tearDown func()) {
be, _ := betesting.NewDefaultTmpBackend(t)
// MinLeaseTTL is negative, so we can grant expired lease in benchmark.
// ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease.
- le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
+ le = newLessor(lg, be, nil, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
le.SetRangeDeleter(func() TxnDelete {
ftd := &FakeTxnDelete{be.BatchTx()}
ftd.Lock()
diff --git a/server/lease/lessor_test.go b/server/lease/lessor_test.go
index e5fb16e0d..27ca86fd8 100644
--- a/server/lease/lessor_test.go
+++ b/server/lease/lessor_test.go
@@ -25,7 +25,9 @@ import (
"testing"
"time"
+ "github.com/coreos/go-semver/semver"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
+ "go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.uber.org/zap"
@@ -45,7 +47,7 @@ func TestLessorGrant(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
- le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+ le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.Promote(0)
@@ -108,7 +110,7 @@ func TestLeaseConcurrentKeys(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
- le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+ le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
@@ -157,7 +159,7 @@ func TestLessorRevoke(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
- le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+ le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
var fd *fakeDeleter
le.SetRangeDeleter(func() TxnDelete {
@@ -211,7 +213,7 @@ func TestLessorRenew(t *testing.T) {
defer be.Close()
defer os.RemoveAll(dir)
- le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+ le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.Promote(0)
@@ -244,7 +246,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
defer be.Close()
defer os.RemoveAll(dir)
- le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+ le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
for _, cp := range cp.GetCheckpoints() {
le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
@@ -293,7 +295,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
dir, be := NewTestBackend(t)
defer os.RemoveAll(dir)
- le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+ le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
ttl := int64(10)
for i := 1; i <= leaseRevokeRate*10; i++ {
if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
@@ -312,7 +314,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
bcfg.Path = filepath.Join(dir, "be")
be = backend.New(bcfg)
defer be.Close()
- le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+ le = newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
// extend after recovery should extend expiration on lease pile-up
@@ -342,7 +344,7 @@ func TestLessorDetach(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
- le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+ le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
@@ -383,7 +385,7 @@ func TestLessorRecover(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
- le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+ le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
l1, err1 := le.Grant(1, 10)
l2, err2 := le.Grant(2, 20)
@@ -392,7 +394,7 @@ func TestLessorRecover(t *testing.T) {
}
// Create a new lessor with the same backend
- nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+ nle := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer nle.Stop()
nl1 := nle.Lookup(l1.ID)
if nl1 == nil || nl1.ttl != l1.ttl {
@@ -413,7 +415,7 @@ func TestLessorExpire(t *testing.T) {
testMinTTL := int64(1)
- le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
+ le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: testMinTTL})
defer le.Stop()
le.Promote(1 * time.Second)
@@ -466,7 +468,7 @@ func TestLessorExpireAndDemote(t *testing.T) {
testMinTTL := int64(1)
- le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
+ le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: testMinTTL})
defer le.Stop()
le.Promote(1 * time.Second)
@@ -515,7 +517,7 @@ func TestLessorMaxTTL(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
- le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+ le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
_, err := le.Grant(1, MaxLeaseTTL+1)
@@ -531,7 +533,8 @@ func TestLessorCheckpointScheduling(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
- le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
+ le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
+ defer le.Stop()
le.minLeaseTTL = 1
checkpointedC := make(chan struct{})
le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
@@ -544,13 +547,11 @@ func TestLessorCheckpointScheduling(t *testing.T) {
t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
}
})
- defer le.Stop()
- le.Promote(0)
-
_, err := le.Grant(1, 2)
if err != nil {
t.Fatal(err)
}
+ le.Promote(0)
// TODO: Is there any way to avoid doing this wait? Lease TTL granularity is in seconds.
select {
@@ -566,7 +567,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
defer os.RemoveAll(dir)
defer be.Close()
- le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+ le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
defer le.Stop()
l, err := le.Grant(1, 10)
if err != nil {
@@ -580,6 +581,75 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
}
}
+func TestLessorCheckpointPersistenceAfterRestart(t *testing.T) {
+ const ttl int64 = 10
+ const checkpointTTL int64 = 5
+
+ tcs := []struct {
+ name string
+ cluster cluster
+ checkpointPersist bool
+ expectRemainingTTL int64
+ }{
+ {
+ name: "Etcd v3.6 and newer persist remainingTTL on checkpoint",
+ cluster: clusterLatest(),
+ expectRemainingTTL: checkpointTTL,
+ },
+ {
+ name: "Etcd v3.5 and older persist remainingTTL if CheckpointPersist is set",
+ cluster: clusterV3_5(),
+ checkpointPersist: true,
+ expectRemainingTTL: checkpointTTL,
+ },
+ {
+ name: "Etcd with version unknown persists remainingTTL if CheckpointPersist is set",
+ cluster: clusterNil(),
+ checkpointPersist: true,
+ expectRemainingTTL: checkpointTTL,
+ },
+ {
+ name: "Etcd v3.5 and older reset remainingTTL on checkpoint",
+ cluster: clusterV3_5(),
+ expectRemainingTTL: ttl,
+ },
+ {
+ name: "Etcd with version unknown fallbacks to v3.5 behavior",
+ cluster: clusterNil(),
+ expectRemainingTTL: ttl,
+ },
+ }
+ for _, tc := range tcs {
+ t.Run(tc.name, func(t *testing.T) {
+ lg := zap.NewNop()
+ dir, be := NewTestBackend(t)
+ defer os.RemoveAll(dir)
+ defer be.Close()
+
+ cfg := LessorConfig{MinLeaseTTL: minLeaseTTL}
+ cfg.CheckpointPersist = tc.checkpointPersist
+ le := newLessor(lg, be, tc.cluster, cfg)
+ l, err := le.Grant(2, ttl)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if l.RemainingTTL() != ttl {
+ t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), ttl)
+ }
+ le.Checkpoint(2, checkpointTTL)
+ if l.RemainingTTL() != checkpointTTL {
+ t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), checkpointTTL)
+ }
+ le.Stop()
+ le2 := newLessor(lg, be, clusterLatest(), cfg)
+ l = le2.Lookup(2)
+ if l.RemainingTTL() != tc.expectRemainingTTL {
+ t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), tc.expectRemainingTTL)
+ }
+ })
+ }
+}
+
type fakeDeleter struct {
deleted []string
tx backend.BatchTx
@@ -607,3 +677,23 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) {
bcfg.Path = filepath.Join(tmpPath, "be")
return tmpPath, backend.New(bcfg)
}
+
+func clusterLatest() cluster {
+ return fakeCluster{semver.New(version.Cluster(version.Version) + ".0")}
+}
+
+func clusterV3_5() cluster {
+ return fakeCluster{semver.New("3.5.0")}
+}
+
+func clusterNil() cluster {
+ return fakeCluster{}
+}
+
+type fakeCluster struct {
+ version *semver.Version
+}
+
+func (c fakeCluster) Version() *semver.Version {
+ return c.version
+}
diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go
index 631e00689..4021ff676 100644
--- a/tests/framework/integration/cluster.go
+++ b/tests/framework/integration/cluster.go
@@ -168,6 +168,7 @@ type ClusterConfig struct {
EnableLeaseCheckpoint bool
LeaseCheckpointInterval time.Duration
+ LeaseCheckpointPersist bool
WatchProgressNotifyInterval time.Duration
ExperimentalMaxLearners int
@@ -331,6 +332,7 @@ func (c *Cluster) mustNewMember(t testutil.TB, memberNumber int64) *Member {
UseTCP: c.Cfg.UseTCP,
EnableLeaseCheckpoint: c.Cfg.EnableLeaseCheckpoint,
LeaseCheckpointInterval: c.Cfg.LeaseCheckpointInterval,
+ LeaseCheckpointPersist: c.Cfg.LeaseCheckpointPersist,
WatchProgressNotifyInterval: c.Cfg.WatchProgressNotifyInterval,
ExperimentalMaxLearners: c.Cfg.ExperimentalMaxLearners,
})
@@ -634,6 +636,7 @@ type MemberConfig struct {
UseTCP bool
EnableLeaseCheckpoint bool
LeaseCheckpointInterval time.Duration
+ LeaseCheckpointPersist bool
WatchProgressNotifyInterval time.Duration
ExperimentalMaxLearners int
}
@@ -733,6 +736,7 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
m.UseTCP = mcfg.UseTCP
m.EnableLeaseCheckpoint = mcfg.EnableLeaseCheckpoint
m.LeaseCheckpointInterval = mcfg.LeaseCheckpointInterval
+ m.LeaseCheckpointPersist = mcfg.LeaseCheckpointPersist
m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval
diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go
index 40bced9c3..240e6bcba 100644
--- a/tests/integration/v3_lease_test.go
+++ b/tests/integration/v3_lease_test.go
@@ -230,56 +230,109 @@ func TestV3LeaseKeepAlive(t *testing.T) {
// TestV3LeaseCheckpoint ensures a lease checkpoint results in a remaining TTL being persisted
// across leader elections.
func TestV3LeaseCheckpoint(t *testing.T) {
- integration.BeforeTest(t)
-
- var ttl int64 = 300
- leaseInterval := 2 * time.Second
- clus := integration.NewClusterV3(t, &integration.ClusterConfig{
- Size: 3,
- EnableLeaseCheckpoint: true,
- LeaseCheckpointInterval: leaseInterval,
- UseBridge: true,
- })
- defer clus.Terminate(t)
-
- // create lease
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- c := integration.ToGRPC(clus.RandClient())
- lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: ttl})
- if err != nil {
- t.Fatal(err)
+ tcs := []struct {
+ name string
+ checkpointingEnabled bool
+ ttl time.Duration
+ checkpointingInterval time.Duration
+ leaderChanges int
+ clusterSize int
+ expectTTLIsGT time.Duration
+ expectTTLIsLT time.Duration
+ }{
+ {
+ name: "Checkpointing disabled, lease TTL is reset",
+ ttl: 300 * time.Second,
+ leaderChanges: 1,
+ clusterSize: 3,
+ expectTTLIsGT: 298 * time.Second,
+ },
+ {
+ name: "Checkpointing enabled 10s, lease TTL is preserved after leader change",
+ ttl: 300 * time.Second,
+ checkpointingEnabled: true,
+ checkpointingInterval: 10 * time.Second,
+ leaderChanges: 1,
+ clusterSize: 3,
+ expectTTLIsLT: 290 * time.Second,
+ },
+ {
+ name: "Checkpointing enabled 10s, lease TTL is preserved after cluster restart",
+ ttl: 300 * time.Second,
+ checkpointingEnabled: true,
+ checkpointingInterval: 10 * time.Second,
+ leaderChanges: 1,
+ clusterSize: 1,
+ expectTTLIsLT: 290 * time.Second,
+ },
+ {
+ // Checking if checkpointing continues after the first leader change.
+ name: "Checkpointing enabled 10s, lease TTL is preserved after 2 leader changes",
+ ttl: 300 * time.Second,
+ checkpointingEnabled: true,
+ checkpointingInterval: 10 * time.Second,
+ leaderChanges: 2,
+ clusterSize: 3,
+ expectTTLIsLT: 280 * time.Second,
+ },
}
+ for _, tc := range tcs {
+ t.Run(tc.name, func(t *testing.T) {
+ integration.BeforeTest(t)
+ config := &integration.ClusterConfig{
+ Size: tc.clusterSize,
+ EnableLeaseCheckpoint: tc.checkpointingEnabled,
+ LeaseCheckpointInterval: tc.checkpointingInterval,
+ }
+ clus := integration.NewClusterV3(t, config)
+ defer clus.Terminate(t)
- // wait for a checkpoint to occur
- time.Sleep(leaseInterval + 1*time.Second)
-
- // Force a leader election
- leaderId := clus.WaitLeader(t)
- leader := clus.Members[leaderId]
- leader.Stop(t)
- time.Sleep(time.Duration(3*integration.ElectionTicks) * integration.TickDuration)
- leader.Restart(t)
- newLeaderId := clus.WaitLeader(t)
- c2 := integration.ToGRPC(clus.Client(newLeaderId))
-
- time.Sleep(250 * time.Millisecond)
-
- // Check the TTL of the new leader
- var ttlresp *pb.LeaseTimeToLiveResponse
- for i := 0; i < 10; i++ {
- if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
- if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
- time.Sleep(time.Millisecond * 250)
- } else {
+ // create lease
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ c := integration.ToGRPC(clus.RandClient())
+ lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: int64(tc.ttl.Seconds())})
+ if err != nil {
t.Fatal(err)
}
- }
- }
- expectedTTL := ttl - int64(leaseInterval.Seconds())
- if ttlresp.TTL < expectedTTL-1 || ttlresp.TTL > expectedTTL {
- t.Fatalf("expected lease to be checkpointed after restart such that %d < TTL <%d, but got TTL=%d", expectedTTL-1, expectedTTL, ttlresp.TTL)
+ for i := 0; i < tc.leaderChanges; i++ {
+ // wait for a checkpoint to occur
+ time.Sleep(tc.checkpointingInterval + 1*time.Second)
+
+ // Force a leader election
+ leaderId := clus.WaitLeader(t)
+ leader := clus.Members[leaderId]
+ leader.Stop(t)
+ time.Sleep(time.Duration(3*integration.ElectionTicks) * integration.TickDuration)
+ leader.Restart(t)
+ }
+
+ newLeaderId := clus.WaitLeader(t)
+ c2 := integration.ToGRPC(clus.Client(newLeaderId))
+
+ time.Sleep(250 * time.Millisecond)
+
+ // Check the TTL of the new leader
+ var ttlresp *pb.LeaseTimeToLiveResponse
+ for i := 0; i < 10; i++ {
+ if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
+ if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
+ time.Sleep(time.Millisecond * 250)
+ } else {
+ t.Fatal(err)
+ }
+ }
+ }
+
+ if tc.expectTTLIsGT != 0 && time.Duration(ttlresp.TTL)*time.Second <= tc.expectTTLIsGT {
+ t.Errorf("Expected lease ttl (%v) to be greather than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsGT)
+ }
+
+ if tc.expectTTLIsLT != 0 && time.Duration(ttlresp.TTL)*time.Second > tc.expectTTLIsLT {
+ t.Errorf("Expected lease ttl (%v) to be lower than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsLT)
+ }
+ })
}
}