Merge pull request #13508 from serathius/checkpoints-fix
Lease Checkpoints fix
commit 170d9b9d73
@@ -14,6 +14,8 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.1...v3.5.2) and

### etcd server

- Fix [exclude the same alarm type activated by multiple peers](https://github.com/etcd-io/etcd/pull/13476).
- Add [`etcd --experimental-enable-lease-checkpoint-persist`](https://github.com/etcd-io/etcd/pull/13508) flag to enable checkpoint persisting.
- Fix [lease checkpoints not preventing TTL reset on leader change](https://github.com/etcd-io/etcd/pull/13508); requires checkpoint persisting to be enabled.

<hr>
@@ -33,9 +33,11 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).

- Add [`etcd --log-format`](https://github.com/etcd-io/etcd/pull/13339) flag to support log format.
- Add [`etcd --experimental-max-learners`](https://github.com/etcd-io/etcd/pull/13377) flag to allow configuration of learner max membership.
- Add [`etcd --experimental-enable-lease-checkpoint-persist`](https://github.com/etcd-io/etcd/pull/13508) flag to handle upgrades from v3.5.2 clusters with this feature enabled.
- Fix [non-mutating requests passing through quotaKVServer when NOSPACE](https://github.com/etcd-io/etcd/pull/13435).
- Fix [exclude the same alarm type activated by multiple peers](https://github.com/etcd-io/etcd/pull/13467).
- Fix [provide a better liveness probe for when etcd runs as a Kubernetes pod](https://github.com/etcd-io/etcd/pull/13399).
- Fix [lease checkpoints not preventing TTL reset on leader change](https://github.com/etcd-io/etcd/pull/13508).

### tools/benchmark
@@ -149,10 +149,12 @@ type ServerConfig struct {

    ForceNewCluster bool

    // EnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
    // EnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
    EnableLeaseCheckpoint bool
    // LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints.
    LeaseCheckpointInterval time.Duration
    // LeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
    LeaseCheckpointPersist bool

    EnableGRPCGateway bool
@@ -315,9 +315,14 @@ type Config struct {
    // Deprecated in v3.5.
    // TODO: Delete in v3.6 (https://github.com/etcd-io/etcd/issues/12913)
    ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
    // ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
    // ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
    ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
    ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
    // ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
    // Requires experimental-enable-lease-checkpoint to be enabled.
    // Deprecated in v3.6.
    // TODO: Delete in v3.7
    ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
    ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
    // ExperimentalCompactionSleepInterval is the sleep interval between every etcd compaction loop.
    ExperimentalCompactionSleepInterval time.Duration `json:"experimental-compaction-sleep-interval"`
    ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
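For illustration, here is a minimal sketch of enabling both options through the embed package (field names as introduced above; data directory, logging, and error handling are simplified assumptions):

```go
package main

import (
	"log"

	"go.etcd.io/etcd/server/v3/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd"
	// Ask the leader to checkpoint remaining lease TTLs to other members ...
	cfg.ExperimentalEnableLeaseCheckpoint = true
	// ... and persist checkpointed TTLs to the backend so they survive restarts
	// (the upgrade path for clusters coming from v3.5.2 with this feature enabled).
	cfg.ExperimentalEnableLeaseCheckpointPersist = true

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
	<-e.Server.ReadyNotify()
	log.Println("etcd is ready with lease checkpoint persistence enabled")
}
```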
@@ -704,6 +709,14 @@ func (cfg *Config) Validate() error {
        }
    }

    if !cfg.ExperimentalEnableLeaseCheckpointPersist && cfg.ExperimentalEnableLeaseCheckpoint {
        cfg.logger.Warn("Detected that checkpointing is enabled without persistence. Consider enabling experimental-enable-lease-checkpoint-persist")
    }

    if cfg.ExperimentalEnableLeaseCheckpointPersist && !cfg.ExperimentalEnableLeaseCheckpoint {
        return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint")
    }

    return nil
}
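A hedged usage sketch of the new validation rule, using the exported Validate shown above (the printed messages are illustrative):

```go
package main

import (
	"fmt"

	"go.etcd.io/etcd/server/v3/embed"
)

func main() {
	cfg := embed.NewConfig()

	// Persist on its own is rejected: it only has an effect when checkpointing is enabled.
	cfg.ExperimentalEnableLeaseCheckpointPersist = true
	if err := cfg.Validate(); err != nil {
		fmt.Println("rejected:", err)
	}

	// Enabling both options passes validation.
	cfg.ExperimentalEnableLeaseCheckpoint = true
	if err := cfg.Validate(); err != nil {
		fmt.Println("unexpected error:", err)
	} else {
		fmt.Println("checkpointing with persistence: valid")
	}
}
```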
@@ -297,6 +297,56 @@ func TestPeerURLsMapAndTokenFromSRV(t *testing.T) {
    }
}

func TestLeaseCheckpointValidate(t *testing.T) {
    tcs := []struct {
        name        string
        configFunc  func() Config
        expectError bool
    }{
        {
            name: "Default config should pass",
            configFunc: func() Config {
                return *NewConfig()
            },
        },
        {
            name: "Enabling checkpoint leases should pass",
            configFunc: func() Config {
                cfg := *NewConfig()
                cfg.ExperimentalEnableLeaseCheckpoint = true
                return cfg
            },
        },
        {
            name: "Enabling checkpoint leases and persist should pass",
            configFunc: func() Config {
                cfg := *NewConfig()
                cfg.ExperimentalEnableLeaseCheckpoint = true
                cfg.ExperimentalEnableLeaseCheckpointPersist = true
                return cfg
            },
        },
        {
            name: "Enabling checkpoint leases persist without checkpointing itself should fail",
            configFunc: func() Config {
                cfg := *NewConfig()
                cfg.ExperimentalEnableLeaseCheckpointPersist = true
                return cfg
            },
            expectError: true,
        },
    }
    for _, tc := range tcs {
        t.Run(tc.name, func(t *testing.T) {
            cfg := tc.configFunc()
            err := cfg.Validate()
            if (err != nil) != tc.expectError {
                t.Errorf("config.Validate() = %q, expected error: %v", err, tc.expectError)
            }
        })
    }
}

func TestLogRotation(t *testing.T) {
    tests := []struct {
        name string
@@ -210,6 +210,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
        ExperimentalEnableDistributedTracing: cfg.ExperimentalEnableDistributedTracing,
        UnsafeNoFsync: cfg.UnsafeNoFsync,
        EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
        LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
        CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
        CompactionSleepInterval: cfg.ExperimentalCompactionSleepInterval,
        WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
@@ -282,7 +282,9 @@ func newConfig() *config {
    fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.")
    fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")

    fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
    fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.")
    // TODO: delete in v3.7
    fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.")
    fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
    fs.DurationVar(&cfg.ec.ExperimentalCompactionSleepInterval, "experimental-compaction-sleep-interval", cfg.ec.ExperimentalCompactionSleepInterval, "Sets the sleep interval between each compaction batch.")
    fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
@@ -344,9 +344,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {

    // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
    // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
    srv.lessor = lease.NewLessor(srv.Logger(), srv.be, lease.LessorConfig{
    srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
        MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
        CheckpointInterval: cfg.LeaseCheckpointInterval,
        CheckpointPersist: cfg.LeaseCheckpointPersist,
        ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
    })
@@ -31,7 +31,7 @@ func TestRenewHTTP(t *testing.T) {
    be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
    defer betesting.Close(t, be)

    le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
    le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
    le.Promote(time.Second)
    l, err := le.Grant(1, int64(5))
    if err != nil {
@@ -55,7 +55,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
    be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
    defer betesting.Close(t, be)

    le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
    le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
    le.Promote(time.Second)
    l, err := le.Grant(1, int64(5))
    if err != nil {
@@ -96,7 +96,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
    be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
    defer betesting.Close(t, be)

    le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
    le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
    le.Promote(time.Second)
    l, err := le.Grant(1, int64(5))
    if err != nil {
@@ -24,6 +24,7 @@ import (
    "sync"
    "time"

    "github.com/coreos/go-semver/semver"
    pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    "go.etcd.io/etcd/server/v3/lease/leasepb"
    "go.etcd.io/etcd/server/v3/storage/backend"
@@ -37,6 +38,8 @@ const NoLease = LeaseID(0)
// MaxLeaseTTL is the maximum lease TTL value
const MaxLeaseTTL = 9000000000

var v3_6 = semver.Version{Major: 3, Minor: 6}

var (
    forever = time.Time{}
@@ -180,19 +183,29 @@ type lessor struct {
    checkpointInterval time.Duration
    // the interval to check if the expired lease is revoked
    expiredLeaseRetryInterval time.Duration
    // whether lessor should always persist remaining TTL (always enabled in v3.6).
    checkpointPersist bool
    // cluster is used to adapt lessor logic based on cluster version
    cluster cluster
}

type cluster interface {
    // Version is the cluster-wide minimum major.minor version.
    Version() *semver.Version
}

type LessorConfig struct {
    MinLeaseTTL int64
    CheckpointInterval time.Duration
    ExpiredLeasesRetryInterval time.Duration
    CheckpointPersist bool
}

func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
    return newLessor(lg, b, cfg)
func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
    return newLessor(lg, b, cluster, cfg)
}

func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor {
    checkpointInterval := cfg.CheckpointInterval
    expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
    if checkpointInterval == 0 {
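As a rough sketch of the new constructor signature from outside the package: anything with an exported Version() *semver.Version method satisfies the cluster dependency (staticCluster below is hypothetical; etcdserver passes srv.cluster, and the tests in this PR pass nil or a fakeCluster). Import paths follow the ones used elsewhere in this diff and may differ:

```go
package lease_test

import (
	"testing"
	"time"

	"github.com/coreos/go-semver/semver"
	"go.uber.org/zap"

	"go.etcd.io/etcd/server/v3/lease"
	betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
)

// staticCluster reports a fixed cluster version; any value with a
// Version() *semver.Version method satisfies the lessor's cluster dependency.
type staticCluster struct{ v *semver.Version }

func (c staticCluster) Version() *semver.Version { return c.v }

func TestNewLessorWithClusterVersion(t *testing.T) {
	be, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
	defer betesting.Close(t, be)

	// On a 3.6 cluster the lessor persists checkpointed TTLs even without
	// CheckpointPersist being set explicitly.
	le := lease.NewLessor(zap.NewNop(), be, staticCluster{semver.New("3.6.0")}, lease.LessorConfig{
		MinLeaseTTL:        1,
		CheckpointInterval: 5 * time.Minute,
	})
	defer le.Stop()

	if _, err := le.Grant(1, 60); err != nil {
		t.Fatal(err)
	}
}
```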
@@ -210,11 +223,13 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
        minLeaseTTL: cfg.MinLeaseTTL,
        checkpointInterval: checkpointInterval,
        expiredLeaseRetryInterval: expiredLeaseRetryInterval,
        checkpointPersist: cfg.CheckpointPersist,
        // expiredC is a small buffered chan to avoid unnecessary blocking.
        expiredC: make(chan []*Lease, 16),
        stopC: make(chan struct{}),
        doneC: make(chan struct{}),
        lg: lg,
        cluster: cluster,
    }
    l.initAndRecover()
@@ -351,6 +366,9 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
    if l, ok := le.leaseMap[id]; ok {
        // when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
        l.remainingTTL = remainingTTL
        if le.shouldPersistCheckpoints() {
            l.persistTo(le.b)
        }
        if le.isPrimary() {
            // schedule the next checkpoint as needed
            le.scheduleCheckpointIfNeeded(l)
@@ -359,6 +377,15 @@
    return nil
}

func (le *lessor) shouldPersistCheckpoints() bool {
    cv := le.cluster.Version()
    return le.checkpointPersist || (cv != nil && greaterOrEqual(*cv, v3_6))
}

func greaterOrEqual(first, second semver.Version) bool {
    return !first.LessThan(second)
}

// Renew renews an existing lease. If the given lease does not exist or
// has expired, an error will be returned.
func (le *lessor) Renew(id LeaseID) (int64, error) {
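For reference, a standalone sketch of the version gate above (shouldPersist is a hypothetical helper mirroring shouldPersistCheckpoints; it uses the same go-semver comparison as greaterOrEqual):

```go
package main

import (
	"fmt"

	"github.com/coreos/go-semver/semver"
)

// shouldPersist mirrors lessor.shouldPersistCheckpoints: persist the remaining TTL
// when the operator opted in explicitly, or once the whole cluster is at 3.6+.
func shouldPersist(checkpointPersist bool, clusterVersion *semver.Version) bool {
	v3_6 := semver.Version{Major: 3, Minor: 6}
	return checkpointPersist || (clusterVersion != nil && !clusterVersion.LessThan(v3_6))
}

func main() {
	fmt.Println(shouldPersist(false, semver.New("3.5.0"))) // false: pre-3.6 cluster, no opt-in
	fmt.Println(shouldPersist(true, semver.New("3.5.0")))  // true: explicit opt-in via checkpoint-persist
	fmt.Println(shouldPersist(false, semver.New("3.6.0"))) // true: 3.6+ always persists
	fmt.Println(shouldPersist(false, nil))                 // false: unknown version falls back to 3.5 behavior
}
```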
@@ -446,6 +473,7 @@ func (le *lessor) Promote(extend time.Duration) {
        l.refresh(extend)
        item := &LeaseWithTime{id: l.ID, time: l.expiry}
        le.leaseExpiredNotifier.RegisterOrUpdate(item)
        le.scheduleCheckpointIfNeeded(l)
    }

    if len(le.leaseMap) < leaseRevokeRate {
@@ -783,9 +811,10 @@ func (le *lessor) initAndRecover() {
            ttl: lpb.TTL,
            // itemSet will be filled in when recover key-value pairs
            // set expiry to forever, refresh when promoted
            itemSet: make(map[LeaseItem]struct{}),
            expiry: forever,
            revokec: make(chan struct{}),
            itemSet: make(map[LeaseItem]struct{}),
            expiry: forever,
            revokec: make(chan struct{}),
            remainingTTL: lpb.RemainingTTL,
        }
    }
    le.leaseExpiredNotifier.Init()
@@ -68,7 +68,7 @@ func setUp(t testing.TB) (le *lessor, tearDown func()) {
    be, _ := betesting.NewDefaultTmpBackend(t)
    // MinLeaseTTL is negative, so we can grant expired lease in benchmark.
    // ExpiredLeasesRetryInterval should be small, so benchmark of findExpired will recheck expired lease.
    le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
    le = newLessor(lg, be, nil, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
    le.SetRangeDeleter(func() TxnDelete {
        ftd := &FakeTxnDelete{be.BatchTx()}
        ftd.Lock()
@@ -25,7 +25,9 @@ import (
    "testing"
    "time"

    "github.com/coreos/go-semver/semver"
    pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    "go.etcd.io/etcd/api/v3/version"
    "go.etcd.io/etcd/server/v3/storage/backend"
    "go.etcd.io/etcd/server/v3/storage/schema"
    "go.uber.org/zap"
@@ -45,7 +47,7 @@ func TestLessorGrant(t *testing.T) {
    defer os.RemoveAll(dir)
    defer be.Close()

    le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
    le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
    defer le.Stop()
    le.Promote(0)
@@ -108,7 +110,7 @@ func TestLeaseConcurrentKeys(t *testing.T) {
    defer os.RemoveAll(dir)
    defer be.Close()

    le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
    le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
    defer le.Stop()
    le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
@@ -157,7 +159,7 @@ func TestLessorRevoke(t *testing.T) {
    defer os.RemoveAll(dir)
    defer be.Close()

    le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
    le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
    defer le.Stop()
    var fd *fakeDeleter
    le.SetRangeDeleter(func() TxnDelete {
@@ -211,7 +213,7 @@ func TestLessorRenew(t *testing.T) {
    defer be.Close()
    defer os.RemoveAll(dir)

    le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
    le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
    defer le.Stop()
    le.Promote(0)
@@ -244,7 +246,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
    defer be.Close()
    defer os.RemoveAll(dir)

    le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
    le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
    fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
        for _, cp := range cp.GetCheckpoints() {
            le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
@@ -293,7 +295,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
    dir, be := NewTestBackend(t)
    defer os.RemoveAll(dir)

    le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
    le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
    ttl := int64(10)
    for i := 1; i <= leaseRevokeRate*10; i++ {
        if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
@@ -312,7 +314,7 @@
    bcfg.Path = filepath.Join(dir, "be")
    be = backend.New(bcfg)
    defer be.Close()
    le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
    le = newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
    defer le.Stop()

    // extend after recovery should extend expiration on lease pile-up
@@ -342,7 +344,7 @@ func TestLessorDetach(t *testing.T) {
    defer os.RemoveAll(dir)
    defer be.Close()

    le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
    le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
    defer le.Stop()
    le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
@@ -383,7 +385,7 @@ func TestLessorRecover(t *testing.T) {
    defer os.RemoveAll(dir)
    defer be.Close()

    le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
    le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
    defer le.Stop()
    l1, err1 := le.Grant(1, 10)
    l2, err2 := le.Grant(2, 20)
@@ -392,7 +394,7 @@
    }

    // Create a new lessor with the same backend
    nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
    nle := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
    defer nle.Stop()
    nl1 := nle.Lookup(l1.ID)
    if nl1 == nil || nl1.ttl != l1.ttl {
@@ -413,7 +415,7 @@ func TestLessorExpire(t *testing.T) {

    testMinTTL := int64(1)

    le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
    le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: testMinTTL})
    defer le.Stop()

    le.Promote(1 * time.Second)
@@ -466,7 +468,7 @@ func TestLessorExpireAndDemote(t *testing.T) {

    testMinTTL := int64(1)

    le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
    le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: testMinTTL})
    defer le.Stop()

    le.Promote(1 * time.Second)
@@ -515,7 +517,7 @@ func TestLessorMaxTTL(t *testing.T) {
    defer os.RemoveAll(dir)
    defer be.Close()

    le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
    le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
    defer le.Stop()

    _, err := le.Grant(1, MaxLeaseTTL+1)
@@ -531,7 +533,8 @@ func TestLessorCheckpointScheduling(t *testing.T) {
    defer os.RemoveAll(dir)
    defer be.Close()

    le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
    le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
    defer le.Stop()
    le.minLeaseTTL = 1
    checkpointedC := make(chan struct{})
    le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
@@ -544,13 +547,11 @@
            t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
        }
    })
    defer le.Stop()
    le.Promote(0)

    _, err := le.Grant(1, 2)
    if err != nil {
        t.Fatal(err)
    }
    le.Promote(0)

    // TODO: Is there any way to avoid doing this wait? Lease TTL granularity is in seconds.
    select {
@@ -566,7 +567,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
    defer os.RemoveAll(dir)
    defer be.Close()

    le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
    le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL})
    defer le.Stop()
    l, err := le.Grant(1, 10)
    if err != nil {
@@ -580,6 +581,75 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
    }
}

func TestLessorCheckpointPersistenceAfterRestart(t *testing.T) {
    const ttl int64 = 10
    const checkpointTTL int64 = 5

    tcs := []struct {
        name string
        cluster cluster
        checkpointPersist bool
        expectRemainingTTL int64
    }{
        {
            name: "Etcd v3.6 and newer persist remainingTTL on checkpoint",
            cluster: clusterLatest(),
            expectRemainingTTL: checkpointTTL,
        },
        {
            name: "Etcd v3.5 and older persist remainingTTL if CheckpointPersist is set",
            cluster: clusterV3_5(),
            checkpointPersist: true,
            expectRemainingTTL: checkpointTTL,
        },
        {
            name: "Etcd with version unknown persists remainingTTL if CheckpointPersist is set",
            cluster: clusterNil(),
            checkpointPersist: true,
            expectRemainingTTL: checkpointTTL,
        },
        {
            name: "Etcd v3.5 and older reset remainingTTL on checkpoint",
            cluster: clusterV3_5(),
            expectRemainingTTL: ttl,
        },
        {
            name: "Etcd with version unknown falls back to v3.5 behavior",
            cluster: clusterNil(),
            expectRemainingTTL: ttl,
        },
    }
    for _, tc := range tcs {
        t.Run(tc.name, func(t *testing.T) {
            lg := zap.NewNop()
            dir, be := NewTestBackend(t)
            defer os.RemoveAll(dir)
            defer be.Close()

            cfg := LessorConfig{MinLeaseTTL: minLeaseTTL}
            cfg.CheckpointPersist = tc.checkpointPersist
            le := newLessor(lg, be, tc.cluster, cfg)
            l, err := le.Grant(2, ttl)
            if err != nil {
                t.Fatal(err)
            }
            if l.RemainingTTL() != ttl {
                t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), ttl)
            }
            le.Checkpoint(2, checkpointTTL)
            if l.RemainingTTL() != checkpointTTL {
                t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), checkpointTTL)
            }
            le.Stop()
            le2 := newLessor(lg, be, clusterLatest(), cfg)
            l = le2.Lookup(2)
            if l.RemainingTTL() != tc.expectRemainingTTL {
                t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), tc.expectRemainingTTL)
            }
        })
    }
}

type fakeDeleter struct {
    deleted []string
    tx backend.BatchTx

@@ -607,3 +677,23 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) {
    bcfg.Path = filepath.Join(tmpPath, "be")
    return tmpPath, backend.New(bcfg)
}

func clusterLatest() cluster {
    return fakeCluster{semver.New(version.Cluster(version.Version) + ".0")}
}

func clusterV3_5() cluster {
    return fakeCluster{semver.New("3.5.0")}
}

func clusterNil() cluster {
    return fakeCluster{}
}

type fakeCluster struct {
    version *semver.Version
}

func (c fakeCluster) Version() *semver.Version {
    return c.version
}
@@ -168,6 +168,7 @@ type ClusterConfig struct {

    EnableLeaseCheckpoint bool
    LeaseCheckpointInterval time.Duration
    LeaseCheckpointPersist bool

    WatchProgressNotifyInterval time.Duration
    ExperimentalMaxLearners int

@@ -331,6 +332,7 @@ func (c *Cluster) mustNewMember(t testutil.TB, memberNumber int64) *Member {
        UseTCP: c.Cfg.UseTCP,
        EnableLeaseCheckpoint: c.Cfg.EnableLeaseCheckpoint,
        LeaseCheckpointInterval: c.Cfg.LeaseCheckpointInterval,
        LeaseCheckpointPersist: c.Cfg.LeaseCheckpointPersist,
        WatchProgressNotifyInterval: c.Cfg.WatchProgressNotifyInterval,
        ExperimentalMaxLearners: c.Cfg.ExperimentalMaxLearners,
    })

@@ -634,6 +636,7 @@ type MemberConfig struct {
    UseTCP bool
    EnableLeaseCheckpoint bool
    LeaseCheckpointInterval time.Duration
    LeaseCheckpointPersist bool
    WatchProgressNotifyInterval time.Duration
    ExperimentalMaxLearners int
}

@@ -733,6 +736,7 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
    m.UseTCP = mcfg.UseTCP
    m.EnableLeaseCheckpoint = mcfg.EnableLeaseCheckpoint
    m.LeaseCheckpointInterval = mcfg.LeaseCheckpointInterval
    m.LeaseCheckpointPersist = mcfg.LeaseCheckpointPersist

    m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval
@@ -230,56 +230,109 @@ func TestV3LeaseKeepAlive(t *testing.T) {
// TestV3LeaseCheckpoint ensures a lease checkpoint results in a remaining TTL being persisted
// across leader elections.
func TestV3LeaseCheckpoint(t *testing.T) {
    integration.BeforeTest(t)

    var ttl int64 = 300
    leaseInterval := 2 * time.Second
    clus := integration.NewClusterV3(t, &integration.ClusterConfig{
        Size: 3,
        EnableLeaseCheckpoint: true,
        LeaseCheckpointInterval: leaseInterval,
        UseBridge: true,
    })
    defer clus.Terminate(t)

    // create lease
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    c := integration.ToGRPC(clus.RandClient())
    lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: ttl})
    if err != nil {
        t.Fatal(err)
    tcs := []struct {
        name string
        checkpointingEnabled bool
        ttl time.Duration
        checkpointingInterval time.Duration
        leaderChanges int
        clusterSize int
        expectTTLIsGT time.Duration
        expectTTLIsLT time.Duration
    }{
        {
            name: "Checkpointing disabled, lease TTL is reset",
            ttl: 300 * time.Second,
            leaderChanges: 1,
            clusterSize: 3,
            expectTTLIsGT: 298 * time.Second,
        },
        {
            name: "Checkpointing enabled 10s, lease TTL is preserved after leader change",
            ttl: 300 * time.Second,
            checkpointingEnabled: true,
            checkpointingInterval: 10 * time.Second,
            leaderChanges: 1,
            clusterSize: 3,
            expectTTLIsLT: 290 * time.Second,
        },
        {
            name: "Checkpointing enabled 10s, lease TTL is preserved after cluster restart",
            ttl: 300 * time.Second,
            checkpointingEnabled: true,
            checkpointingInterval: 10 * time.Second,
            leaderChanges: 1,
            clusterSize: 1,
            expectTTLIsLT: 290 * time.Second,
        },
        {
            // Checking if checkpointing continues after the first leader change.
            name: "Checkpointing enabled 10s, lease TTL is preserved after 2 leader changes",
            ttl: 300 * time.Second,
            checkpointingEnabled: true,
            checkpointingInterval: 10 * time.Second,
            leaderChanges: 2,
            clusterSize: 3,
            expectTTLIsLT: 280 * time.Second,
        },
    }
    for _, tc := range tcs {
        t.Run(tc.name, func(t *testing.T) {
            integration.BeforeTest(t)
            config := &integration.ClusterConfig{
                Size: tc.clusterSize,
                EnableLeaseCheckpoint: tc.checkpointingEnabled,
                LeaseCheckpointInterval: tc.checkpointingInterval,
            }
            clus := integration.NewClusterV3(t, config)
            defer clus.Terminate(t)

    // wait for a checkpoint to occur
    time.Sleep(leaseInterval + 1*time.Second)

    // Force a leader election
    leaderId := clus.WaitLeader(t)
    leader := clus.Members[leaderId]
    leader.Stop(t)
    time.Sleep(time.Duration(3*integration.ElectionTicks) * integration.TickDuration)
    leader.Restart(t)
    newLeaderId := clus.WaitLeader(t)
    c2 := integration.ToGRPC(clus.Client(newLeaderId))

    time.Sleep(250 * time.Millisecond)

    // Check the TTL of the new leader
    var ttlresp *pb.LeaseTimeToLiveResponse
    for i := 0; i < 10; i++ {
        if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
            if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
                time.Sleep(time.Millisecond * 250)
            } else {
            // create lease
            ctx, cancel := context.WithCancel(context.Background())
            defer cancel()
            c := integration.ToGRPC(clus.RandClient())
            lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: int64(tc.ttl.Seconds())})
            if err != nil {
                t.Fatal(err)
            }
        }
    }

    expectedTTL := ttl - int64(leaseInterval.Seconds())
    if ttlresp.TTL < expectedTTL-1 || ttlresp.TTL > expectedTTL {
        t.Fatalf("expected lease to be checkpointed after restart such that %d < TTL < %d, but got TTL=%d", expectedTTL-1, expectedTTL, ttlresp.TTL)
            for i := 0; i < tc.leaderChanges; i++ {
                // wait for a checkpoint to occur
                time.Sleep(tc.checkpointingInterval + 1*time.Second)

                // Force a leader election
                leaderId := clus.WaitLeader(t)
                leader := clus.Members[leaderId]
                leader.Stop(t)
                time.Sleep(time.Duration(3*integration.ElectionTicks) * integration.TickDuration)
                leader.Restart(t)
            }

            newLeaderId := clus.WaitLeader(t)
            c2 := integration.ToGRPC(clus.Client(newLeaderId))

            time.Sleep(250 * time.Millisecond)

            // Check the TTL of the new leader
            var ttlresp *pb.LeaseTimeToLiveResponse
            for i := 0; i < 10; i++ {
                if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
                    if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
                        time.Sleep(time.Millisecond * 250)
                    } else {
                        t.Fatal(err)
                    }
                }
            }

            if tc.expectTTLIsGT != 0 && time.Duration(ttlresp.TTL)*time.Second <= tc.expectTTLIsGT {
                t.Errorf("Expected lease ttl (%v) to be greater than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsGT)
            }

            if tc.expectTTLIsLT != 0 && time.Duration(ttlresp.TTL)*time.Second > tc.expectTTLIsLT {
                t.Errorf("Expected lease ttl (%v) to be lower than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsLT)
            }
        })
    }
}
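To see the user-facing effect of the fix, a hedged clientv3 sketch (the endpoint address is an assumption): grant a long lease, trigger a leader change out of band, and check that the remaining TTL keeps decreasing instead of jumping back to the granted value:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx := context.Background()
	lease, err := cli.Grant(ctx, 300) // 300s, long enough to span a leader election
	if err != nil {
		log.Fatal(err)
	}

	// With checkpointing (and persistence) enabled, the remaining TTL reported here
	// should stay close to the elapsed time even after the leader changes.
	resp, err := cli.TimeToLive(ctx, lease.ID)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("lease %x: remaining %ds of granted %ds\n", lease.ID, resp.TTL, resp.GrantedTTL)
}
```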