server: Require either cluster version v3.6 or --experimental-enable-lease-checkpoint-persist to persist lease remainingTTL
To avoid inconsistent behavior during cluster upgrade, we are feature-gating persistence behind the cluster version. This should ensure that all cluster members are upgraded to v3.6 before the behavior changes. To allow backporting this fix to v3.5, we are also introducing the flag --experimental-enable-lease-checkpoint-persist, which allows a smooth upgrade for v3.5 clusters that have this feature enabled.

Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
commit 8f4735dfd4
parent 8d83691d53
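
In short: a checkpoint's remainingTTL is now written to the backend only when the cluster-wide version is at least v3.6, or when the new flag opts in explicitly. A condensed sketch of the gating logic this commit adds to the lessor (the full change is in the diff below):

	// Persist the checkpointed remainingTTL only if explicitly requested
	// via config (the v3.5 backport path) or once the cluster-wide
	// minimum version has reached v3.6.
	func (le *lessor) shouldPersistCheckpoints() bool {
		cv := le.cluster.Version()
		return le.checkpointPersist || (cv != nil && greaterOrEqual(*cv, v3_6))
	}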

@@ -391,7 +391,7 @@ func (s *v3Manager) saveDB() error {
 	be := backend.NewDefaultBackend(dbpath)

 	// a lessor never timeouts leases
-	lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})
+	lessor := lease.NewLessor(s.lg, be, nil, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})
 	mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
 	txn := mvs.Write(traceutil.TODO())

@@ -288,8 +288,13 @@ type Config struct {
 	ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
 	// ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses (array and map are supported types).
 	ExperimentalBackendFreelistType string `json:"experimental-backend-bbolt-freelist-type"`
-	// ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
+	// ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
 	ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
+	// ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
+	// Requires experimental-enable-lease-checkpoint to be enabled.
+	// Deprecated in v3.6.
+	// TODO: Delete in v3.7
+	ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
 	ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
 	ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
 	// ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request

@@ -637,6 +642,14 @@ func (cfg *Config) Validate() error {
 		return fmt.Errorf("unknown auto-compaction-mode %q", cfg.AutoCompactionMode)
 	}

+	if !cfg.ExperimentalEnableLeaseCheckpointPersist && cfg.ExperimentalEnableLeaseCheckpoint {
+		cfg.logger.Warn("Detected that checkpointing is enabled without persistence. Consider enabling experimental-enable-lease-checkpoint-persist")
+	}
+
+	if cfg.ExperimentalEnableLeaseCheckpointPersist && !cfg.ExperimentalEnableLeaseCheckpoint {
+		return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint")
+	}
+
 	return nil
 }

@@ -208,6 +208,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
 		EnableGRPCGateway:           cfg.EnableGRPCGateway,
 		UnsafeNoFsync:               cfg.UnsafeNoFsync,
 		EnableLeaseCheckpoint:       cfg.ExperimentalEnableLeaseCheckpoint,
+		LeaseCheckpointPersist:      cfg.ExperimentalEnableLeaseCheckpointPersist,
 		CompactionBatchLimit:        cfg.ExperimentalCompactionBatchLimit,
 		WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
 		WarningApplyDuration:        cfg.ExperimentalWarningApplyDuration,

@@ -257,7 +257,9 @@ func newConfig() *config {
 	fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
 	fs.StringVar(&cfg.ec.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ec.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state.")
 	fs.StringVar(&cfg.ec.ExperimentalBackendFreelistType, "experimental-backend-bbolt-freelist-type", cfg.ec.ExperimentalBackendFreelistType, "ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses(array and map are supported types)")
-	fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
+	fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.")
+	// TODO: delete in v3.7
+	fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.")
 	fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
 	fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
 	fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")

@@ -158,10 +158,12 @@ type ServerConfig struct {

 	ForceNewCluster bool

-	// EnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
+	// EnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
 	EnableLeaseCheckpoint bool
 	// LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints.
 	LeaseCheckpointInterval time.Duration
+	// LeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
+	LeaseCheckpointPersist bool

 	EnableGRPCGateway bool

@@ -543,9 +543,11 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
 	srv.lessor = lease.NewLessor(
 		srv.getLogger(),
 		srv.be,
+		srv.cluster,
 		lease.LessorConfig{
 			MinLeaseTTL:                int64(math.Ceil(minTTL.Seconds())),
 			CheckpointInterval:         cfg.LeaseCheckpointInterval,
+			CheckpointPersist:          cfg.LeaseCheckpointPersist,
 			ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
 		})

@@ -152,6 +152,7 @@ type ClusterConfig struct {

 	EnableLeaseCheckpoint   bool
 	LeaseCheckpointInterval time.Duration
+	LeaseCheckpointPersist  bool

 	WatchProgressNotifyInterval time.Duration
 	CorruptCheckTime            time.Duration

@@ -298,6 +299,7 @@ func (c *cluster) mustNewMember(t testing.TB) *member {
 		clientMaxCallRecvMsgSize:    c.cfg.ClientMaxCallRecvMsgSize,
 		useIP:                       c.cfg.UseIP,
 		enableLeaseCheckpoint:       c.cfg.EnableLeaseCheckpoint,
+		leaseCheckpointPersist:      c.cfg.LeaseCheckpointPersist,
 		leaseCheckpointInterval:     c.cfg.LeaseCheckpointInterval,
 		WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
 		CorruptCheckTime:            c.cfg.CorruptCheckTime,

@@ -590,6 +592,7 @@ type memberConfig struct {
 	useIP                       bool
 	enableLeaseCheckpoint       bool
 	leaseCheckpointInterval     time.Duration
+	leaseCheckpointPersist      bool
 	WatchProgressNotifyInterval time.Duration
 	CorruptCheckTime            time.Duration
 }

@@ -684,6 +687,7 @@ func mustNewMember(t testing.TB, mcfg memberConfig) *member {
 	m.useIP = mcfg.useIP
 	m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint
 	m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval
+	m.LeaseCheckpointPersist = mcfg.leaseCheckpointPersist

 	m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval

@@ -318,6 +318,7 @@ func TestV3LeaseCheckpoint(t *testing.T) {
 		checkpointingEnabled  bool
 		ttl                   time.Duration
 		checkpointingInterval time.Duration
+		checkpointingPersist  bool
 		leaderChanges         int
 		clusterSize           int
 		expectTTLIsGT         time.Duration

@@ -340,13 +341,23 @@ func TestV3LeaseCheckpoint(t *testing.T) {
 			expectTTLIsLT: 290 * time.Second,
 		},
 		{
-			name:                  "Checkpointing enabled 10s, lease TTL is preserved after cluster restart",
+			name:                  "Checkpointing enabled 10s with persist, lease TTL is preserved after cluster restart",
+			ttl:                   300 * time.Second,
+			checkpointingEnabled:  true,
+			checkpointingInterval: 10 * time.Second,
+			checkpointingPersist:  true,
+			leaderChanges:         1,
+			clusterSize:           1,
+			expectTTLIsLT:         290 * time.Second,
+		},
+		{
+			name:                  "Checkpointing enabled 10s, lease TTL is reset after restart",
 			ttl:                   300 * time.Second,
 			checkpointingEnabled:  true,
 			checkpointingInterval: 10 * time.Second,
 			leaderChanges:         1,
 			clusterSize:           1,
-			expectTTLIsLT:         290 * time.Second,
+			expectTTLIsGT:         298 * time.Second,
 		},
 		{
 			// Checking if checkpointing continues after the first leader change.

@@ -365,6 +376,7 @@ func TestV3LeaseCheckpoint(t *testing.T) {
 				Size:                    tc.clusterSize,
 				EnableLeaseCheckpoint:   tc.checkpointingEnabled,
 				LeaseCheckpointInterval: tc.checkpointingInterval,
+				LeaseCheckpointPersist:  tc.checkpointingPersist,
 			}
 			clus := NewClusterV3(t, config)
 			defer clus.Terminate(t)

@@ -33,7 +33,7 @@ func TestRenewHTTP(t *testing.T) {
 	defer os.Remove(tmpPath)
 	defer be.Close()

-	le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
+	le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
 	le.Promote(time.Second)
 	l, err := le.Grant(1, int64(5))
 	if err != nil {

@@ -58,7 +58,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
 	defer os.Remove(tmpPath)
 	defer be.Close()

-	le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
+	le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
 	le.Promote(time.Second)
 	l, err := le.Grant(1, int64(5))
 	if err != nil {

@@ -100,7 +100,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
 	defer os.Remove(tmpPath)
 	defer be.Close()

-	le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
+	le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
 	le.Promote(time.Second)
 	l, err := le.Grant(1, int64(5))
 	if err != nil {

@@ -25,6 +25,7 @@ import (
 	"sync"
 	"time"

+	"github.com/coreos/go-semver/semver"
 	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
 	"go.etcd.io/etcd/lease/leasepb"
 	"go.etcd.io/etcd/mvcc/backend"

@@ -37,6 +38,8 @@ const NoLease = LeaseID(0)
 // MaxLeaseTTL is the maximum lease TTL value
 const MaxLeaseTTL = 9000000000

+var v3_6 = semver.Version{Major: 3, Minor: 6}
+
 var (
 	forever = time.Time{}

@@ -182,19 +185,29 @@ type lessor struct {
 	checkpointInterval time.Duration
 	// the interval to check if the expired lease is revoked
 	expiredLeaseRetryInterval time.Duration
+	// whether lessor should always persist remaining TTL (always enabled in v3.6).
+	checkpointPersist bool
+	// cluster is used to adapt lessor logic based on cluster version
+	cluster cluster
+}
+
+type cluster interface {
+	// Version is the cluster-wide minimum major.minor version.
+	Version() *semver.Version
 }

 type LessorConfig struct {
 	MinLeaseTTL                int64
 	CheckpointInterval         time.Duration
 	ExpiredLeasesRetryInterval time.Duration
+	CheckpointPersist          bool
 }

-func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
-	return newLessor(lg, b, cfg)
+func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
+	return newLessor(lg, b, cluster, cfg)
 }

-func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
+func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor {
 	checkpointInterval := cfg.CheckpointInterval
 	expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
 	if checkpointInterval == 0 {

@@ -212,11 +225,13 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
 		minLeaseTTL:               cfg.MinLeaseTTL,
 		checkpointInterval:        checkpointInterval,
 		expiredLeaseRetryInterval: expiredLeaseRetryInterval,
+		checkpointPersist:         cfg.CheckpointPersist,
 		// expiredC is a small buffered chan to avoid unnecessary blocking.
 		expiredC: make(chan []*Lease, 16),
 		stopC:    make(chan struct{}),
 		doneC:    make(chan struct{}),
 		lg:       lg,
+		cluster:  cluster,
 	}
 	l.initAndRecover()

@@ -353,7 +368,9 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
 	if l, ok := le.leaseMap[id]; ok {
 		// when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
 		l.remainingTTL = remainingTTL
+		if le.shouldPersistCheckpoints() {
 			l.persistTo(le.b)
+		}
 		if le.isPrimary() {
 			// schedule the next checkpoint as needed
 			le.scheduleCheckpointIfNeeded(l)

@@ -362,6 +379,15 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
 	return nil
 }

+func (le *lessor) shouldPersistCheckpoints() bool {
+	cv := le.cluster.Version()
+	return le.checkpointPersist || (cv != nil && greaterOrEqual(*cv, v3_6))
+}
+
+func greaterOrEqual(first, second semver.Version) bool {
+	return !first.LessThan(second)
+}
+
 // Renew renews an existing lease. If the given lease does not exist or
 // has expired, an error will be returned.
 func (le *lessor) Renew(id LeaseID) (int64, error) {

@@ -69,7 +69,7 @@ func setUp() (le *lessor, tearDown func()) {
 	be, tmpPath := backend.NewDefaultTmpBackend()
 	// MinLeaseTTL is negative, so we can grant expired lease in benchmark.
 	// ExpiredLeasesRetryInterval should small, so benchmark of findExpired will recheck expired lease.
-	le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
+	le = newLessor(lg, be, nil, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
 	le.SetRangeDeleter(func() TxnDelete {
 		ftd := &FakeTxnDelete{be.BatchTx()}
 		ftd.Lock()

@@ -27,10 +27,12 @@ import (
 	"testing"
 	"time"

-	"github.com/stretchr/testify/assert"
+	"github.com/coreos/go-semver/semver"
 	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
 	"go.etcd.io/etcd/lease/leasepb"
 	"go.etcd.io/etcd/mvcc/backend"
+	"go.etcd.io/etcd/pkg/testutil"
+	"go.etcd.io/etcd/version"
 	"go.uber.org/zap"
 )

@@ -48,7 +50,7 @@ func TestLessorGrant(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	le.Promote(0)

@@ -110,7 +112,7 @@ func TestLeaseConcurrentKeys(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })

@@ -159,7 +161,7 @@ func TestLessorRevoke(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	var fd *fakeDeleter
 	le.SetRangeDeleter(func() TxnDelete {

@@ -212,7 +214,7 @@ func TestLessorRenew(t *testing.T) {
 	defer be.Close()
 	defer os.RemoveAll(dir)

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	le.Promote(0)

@@ -245,7 +247,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
 	defer be.Close()
 	defer os.RemoveAll(dir)

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
 		for _, cp := range cp.GetCheckpoints() {
 			le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())

@@ -294,7 +296,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
 	dir, be := NewTestBackend(t)
 	defer os.RemoveAll(dir)

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	ttl := int64(10)
 	for i := 1; i <= leaseRevokeRate*10; i++ {
 		if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {

@@ -313,7 +315,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
 	bcfg.Path = filepath.Join(dir, "be")
 	be = backend.New(bcfg)
 	defer be.Close()
-	le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le = newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()

 	// extend after recovery should extend expiration on lease pile-up

@@ -343,7 +345,7 @@ func TestLessorDetach(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })

@@ -384,7 +386,7 @@ func TestLessorRecover(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	l1, err1 := le.Grant(1, 10)
 	l2, err2 := le.Grant(2, 20)

@@ -393,7 +395,7 @@ func TestLessorRecover(t *testing.T) {
 	}

 	// Create a new lessor with the same backend
-	nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	nle := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer nle.Stop()
 	nl1 := nle.Lookup(l1.ID)
 	if nl1 == nil || nl1.ttl != l1.ttl {

@@ -414,7 +416,7 @@ func TestLessorExpire(t *testing.T) {

 	testMinTTL := int64(1)

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL})
 	defer le.Stop()

 	le.Promote(1 * time.Second)

@@ -467,7 +469,7 @@ func TestLessorExpireAndDemote(t *testing.T) {

 	testMinTTL := int64(1)

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL})
 	defer le.Stop()

 	le.Promote(1 * time.Second)

@@ -516,7 +518,7 @@ func TestLessorMaxTTL(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()

 	_, err := le.Grant(1, MaxLeaseTTL+1)

@@ -532,7 +534,7 @@ func TestLessorCheckpointScheduling(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
 	defer le.Stop()
 	le.minLeaseTTL = 1
 	checkpointedC := make(chan struct{})

@@ -566,7 +568,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
 	defer os.RemoveAll(dir)
 	defer be.Close()

-	le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+	le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
 	defer le.Stop()
 	l, err := le.Grant(1, 10)
 	if err != nil {

@@ -652,7 +654,76 @@ func TestLeaseBackend(t *testing.T) {
 			defer be2.Close()
 			leases := unsafeGetAllLeases(be2.ReadTx())

-			assert.Equal(t, tc.want, leases)
+			testutil.AssertEqual(t, tc.want, leases)
+		})
+	}
+}
+
+func TestLessorCheckpointPersistenceAfterRestart(t *testing.T) {
+	const ttl int64 = 10
+	const checkpointTTL int64 = 5
+
+	tcs := []struct {
+		name               string
+		cluster            cluster
+		checkpointPersist  bool
+		expectRemainingTTL int64
+	}{
+		{
+			name:               "Etcd v3.6 and newer persist remainingTTL on checkpoint",
+			cluster:            clusterV3_6(),
+			expectRemainingTTL: checkpointTTL,
+		},
+		{
+			name:               "Etcd v3.5 and older persist remainingTTL if CheckpointPersist is set",
+			cluster:            clusterLatest(),
+			checkpointPersist:  true,
+			expectRemainingTTL: checkpointTTL,
+		},
+		{
+			name:               "Etcd with version unknown persists remainingTTL if CheckpointPersist is set",
+			cluster:            clusterNil(),
+			checkpointPersist:  true,
+			expectRemainingTTL: checkpointTTL,
+		},
+		{
+			name:               "Etcd v3.5 and older reset remainingTTL on checkpoint",
+			cluster:            clusterLatest(),
+			expectRemainingTTL: ttl,
+		},
+		{
+			name:               "Etcd with version unknown fallbacks to v3.5 behavior",
+			cluster:            clusterNil(),
+			expectRemainingTTL: ttl,
+		},
+	}
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			lg := zap.NewNop()
+			dir, be := NewTestBackend(t)
+			defer os.RemoveAll(dir)
+			defer be.Close()

+			cfg := LessorConfig{MinLeaseTTL: minLeaseTTL}
+			cfg.CheckpointPersist = tc.checkpointPersist
+			le := newLessor(lg, be, tc.cluster, cfg)
+			l, err := le.Grant(2, ttl)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if l.RemainingTTL() != ttl {
+				t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), ttl)
+			}
+			le.Checkpoint(2, checkpointTTL)
+			if l.RemainingTTL() != checkpointTTL {
+				t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), checkpointTTL)
+			}
+			le.Stop()
+			le2 := newLessor(lg, be, clusterV3_6(), cfg)
+			l = le2.Lookup(2)
+			if l.RemainingTTL() != tc.expectRemainingTTL {
+				t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), tc.expectRemainingTTL)
+			}
 		})
 	}
 }

@@ -694,3 +765,23 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) {
 	bcfg.Path = filepath.Join(tmpPath, "be")
 	return tmpPath, backend.New(bcfg)
 }
+
+func clusterV3_6() cluster {
+	return fakeCluster{semver.New("3.6.0")}
+}
+
+func clusterLatest() cluster {
+	return fakeCluster{semver.New(version.Cluster(version.Version) + ".0")}
+}
+
+func clusterNil() cluster {
+	return fakeCluster{}
+}
+
+type fakeCluster struct {
+	version *semver.Version
+}
+
+func (c fakeCluster) Version() *semver.Version {
+	return c.version
+}
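
Operator note: on a v3.5 cluster, both flags must be set on every member before the persisted remainingTTL takes effect, for example (illustrative invocation):

	etcd --experimental-enable-lease-checkpoint \
	     --experimental-enable-lease-checkpoint-persist

Once every member reports cluster version v3.6 or later, persistence is enabled unconditionally and the persist flag is deprecated (slated for deletion in v3.7, per the TODO above).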