Merge pull request #14253 from serathius/checkpoints-fix-3.4

[3.4] Checkpoints fix 3.4
Benjamin Wang 2022-07-22 16:56:17 +08:00 committed by GitHub
commit e2b36f8879
12 changed files with 290 additions and 80 deletions

View File

@@ -391,7 +391,7 @@ func (s *v3Manager) saveDB() error {
   be := backend.NewDefaultBackend(dbpath)
   // a lessor never times out leases
-  lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})
+  lessor := lease.NewLessor(s.lg, be, nil, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})
   mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
   txn := mvs.Write(traceutil.TODO())
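Note on the nil argument: lease.NewLessor now takes a cluster handle (see lessor.go below), and the snapshot-restore path passes nil. That appears safe only because this lessor is never promoted, so it never issues checkpoints and never consults the cluster version.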

View File

@@ -288,10 +288,15 @@ type Config struct {
   ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"`
   // ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses (array and map are supported types).
   ExperimentalBackendFreelistType string `json:"experimental-backend-bbolt-freelist-type"`
-  // ExperimentalEnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
-  ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
-  ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
-  ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
+  // ExperimentalEnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
+  ExperimentalEnableLeaseCheckpoint bool `json:"experimental-enable-lease-checkpoint"`
+  // ExperimentalEnableLeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
+  // Requires experimental-enable-lease-checkpoint to be enabled.
+  // Deprecated in v3.6.
+  // TODO: Delete in v3.7
+  ExperimentalEnableLeaseCheckpointPersist bool `json:"experimental-enable-lease-checkpoint-persist"`
+  ExperimentalCompactionBatchLimit int `json:"experimental-compaction-batch-limit"`
+  ExperimentalWatchProgressNotifyInterval time.Duration `json:"experimental-watch-progress-notify-interval"`
   // ExperimentalWarningApplyDuration is the time duration after which a warning is generated if applying request
   // takes more time than this value.
   ExperimentalWarningApplyDuration time.Duration `json:"experimental-warning-apply-duration"`
@@ -637,6 +642,14 @@ func (cfg *Config) Validate() error {
     return fmt.Errorf("unknown auto-compaction-mode %q", cfg.AutoCompactionMode)
   }
+  if !cfg.ExperimentalEnableLeaseCheckpointPersist && cfg.ExperimentalEnableLeaseCheckpoint {
+    cfg.logger.Warn("Detected that checkpointing is enabled without persistence. Consider enabling experimental-enable-lease-checkpoint-persist")
+  }
+  if cfg.ExperimentalEnableLeaseCheckpointPersist && !cfg.ExperimentalEnableLeaseCheckpoint {
+    return fmt.Errorf("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint")
+  }
   return nil
 }
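The two flags are deliberately asymmetric: checkpointing without persistence is allowed (with a warning), while persistence without checkpointing is rejected. A minimal standalone sketch of that rule; validateCheckpointFlags is an illustrative name, not part of the embed API:

package main

import (
	"errors"
	"fmt"
)

// validateCheckpointFlags mirrors the Validate() additions above:
// persist is an add-on to checkpointing and cannot be enabled alone.
func validateCheckpointFlags(enableCheckpoint, enablePersist bool) error {
	if enablePersist && !enableCheckpoint {
		return errors.New("setting experimental-enable-lease-checkpoint-persist requires experimental-enable-lease-checkpoint")
	}
	return nil
}

func main() {
	fmt.Println(validateCheckpointFlags(true, true))  // <nil>: supported combination
	fmt.Println(validateCheckpointFlags(true, false)) // <nil>, though etcd logs a warning for this combination
	fmt.Println(validateCheckpointFlags(false, true)) // error: persist requires checkpointing
}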

View File

@@ -208,6 +208,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
   EnableGRPCGateway: cfg.EnableGRPCGateway,
   UnsafeNoFsync: cfg.UnsafeNoFsync,
   EnableLeaseCheckpoint: cfg.ExperimentalEnableLeaseCheckpoint,
+  LeaseCheckpointPersist: cfg.ExperimentalEnableLeaseCheckpointPersist,
   CompactionBatchLimit: cfg.ExperimentalCompactionBatchLimit,
   WatchProgressNotifyInterval: cfg.ExperimentalWatchProgressNotifyInterval,
   WarningApplyDuration: cfg.ExperimentalWarningApplyDuration,

View File

@@ -257,7 +257,9 @@ func newConfig() *config {
   fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.")
   fs.StringVar(&cfg.ec.ExperimentalEnableV2V3, "experimental-enable-v2v3", cfg.ec.ExperimentalEnableV2V3, "v3 prefix for serving emulated v2 state.")
   fs.StringVar(&cfg.ec.ExperimentalBackendFreelistType, "experimental-backend-bbolt-freelist-type", cfg.ec.ExperimentalBackendFreelistType, "ExperimentalBackendFreelistType specifies the type of freelist that boltdb backend uses(array and map are supported types)")
-  fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable to persist lease remaining TTL to prevent indefinite auto-renewal of long lived leases.")
+  fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpoint, "experimental-enable-lease-checkpoint", false, "Enable leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.")
+  // TODO: delete in v3.7
+  fs.BoolVar(&cfg.ec.ExperimentalEnableLeaseCheckpointPersist, "experimental-enable-lease-checkpoint-persist", false, "Enable persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled. Requires experimental-enable-lease-checkpoint to be enabled.")
   fs.IntVar(&cfg.ec.ExperimentalCompactionBatchLimit, "experimental-compaction-batch-limit", cfg.ec.ExperimentalCompactionBatchLimit, "Sets the maximum revisions deleted in each compaction batch.")
   fs.DurationVar(&cfg.ec.ExperimentalWatchProgressNotifyInterval, "experimental-watch-progress-notify-interval", cfg.ec.ExperimentalWatchProgressNotifyInterval, "Duration of periodic watch progress notifications.")
   fs.DurationVar(&cfg.ec.ExperimentalWarningApplyDuration, "experimental-warning-apply-duration", cfg.ec.ExperimentalWarningApplyDuration, "Time duration after which a warning is generated if request takes more time.")

View File

@@ -158,10 +158,12 @@ type ServerConfig struct {
   ForceNewCluster bool
-  // EnableLeaseCheckpoint enables primary lessor to persist lease remainingTTL to prevent indefinite auto-renewal of long lived leases.
+  // EnableLeaseCheckpoint enables leader to send regular checkpoints to other members to prevent reset of remaining TTL on leader change.
   EnableLeaseCheckpoint bool
   // LeaseCheckpointInterval time.Duration is the wait duration between lease checkpoints.
   LeaseCheckpointInterval time.Duration
+  // LeaseCheckpointPersist enables persisting remainingTTL to prevent indefinite auto-renewal of long lived leases. Always enabled in v3.6. Should be used to ensure smooth upgrade from v3.5 clusters with this feature enabled.
+  LeaseCheckpointPersist bool
   EnableGRPCGateway bool

View File

@@ -555,9 +555,11 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
   srv.lessor = lease.NewLessor(
     srv.getLogger(),
     srv.be,
+    srv.cluster,
     lease.LessorConfig{
       MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
       CheckpointInterval: cfg.LeaseCheckpointInterval,
+      CheckpointPersist: cfg.LeaseCheckpointPersist,
       ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
     })
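srv.cluster can be passed straight through because the lessor only requires a Version() method. A compile-time sketch of that relationship, assuming v3.4's go.etcd.io/etcd/etcdserver/api/membership package, where *membership.RaftCluster exposes Version() *semver.Version:

package main

import (
	"github.com/coreos/go-semver/semver"

	"go.etcd.io/etcd/etcdserver/api/membership"
)

// cluster restates the small interface the lessor declares in this change.
type cluster interface {
	// Version is the cluster-wide minimum major.minor version.
	Version() *semver.Version
}

// Compile-time assertion: *membership.RaftCluster already implements
// Version() *semver.Version, so NewServer needs no adapter.
var _ cluster = (*membership.RaftCluster)(nil)

func main() {}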

View File

@@ -152,6 +152,7 @@ type ClusterConfig struct {
   EnableLeaseCheckpoint bool
   LeaseCheckpointInterval time.Duration
+  LeaseCheckpointPersist bool
   WatchProgressNotifyInterval time.Duration
   CorruptCheckTime time.Duration
@@ -298,6 +299,7 @@ func (c *cluster) mustNewMember(t testing.TB) *member {
   clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize,
   useIP: c.cfg.UseIP,
   enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint,
+  leaseCheckpointPersist: c.cfg.LeaseCheckpointPersist,
   leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
   WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
   CorruptCheckTime: c.cfg.CorruptCheckTime,
@@ -590,6 +592,7 @@ type memberConfig struct {
   useIP bool
   enableLeaseCheckpoint bool
   leaseCheckpointInterval time.Duration
+  leaseCheckpointPersist bool
   WatchProgressNotifyInterval time.Duration
   CorruptCheckTime time.Duration
 }
@@ -684,6 +687,7 @@ func mustNewMember(t testing.TB, mcfg memberConfig) *member {
   m.useIP = mcfg.useIP
   m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint
   m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval
+  m.LeaseCheckpointPersist = mcfg.leaseCheckpointPersist
   m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval

View File

@@ -313,54 +313,121 @@ func TestV3LeaseKeepAlive(t *testing.T) {
 // TestV3LeaseCheckpoint ensures a lease checkpoint results in a remaining TTL being persisted
 // across leader elections.
 func TestV3LeaseCheckpoint(t *testing.T) {
-  var ttl int64 = 300
-  leaseInterval := 2 * time.Second
-  defer testutil.AfterTest(t)
-  clus := NewClusterV3(t, &ClusterConfig{
-    Size: 3,
-    EnableLeaseCheckpoint: true,
-    LeaseCheckpointInterval: leaseInterval,
-  })
-  defer clus.Terminate(t)
-  // create lease
-  ctx, cancel := context.WithCancel(context.Background())
-  defer cancel()
-  c := toGRPC(clus.RandClient())
-  lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: ttl})
-  if err != nil {
-    t.Fatal(err)
-  }
-  // wait for a checkpoint to occur
-  time.Sleep(leaseInterval + 1*time.Second)
-  // Force a leader election
-  leaderId := clus.WaitLeader(t)
-  leader := clus.Members[leaderId]
-  leader.Stop(t)
-  time.Sleep(time.Duration(3*electionTicks) * tickDuration)
-  leader.Restart(t)
-  newLeaderId := clus.WaitLeader(t)
-  c2 := toGRPC(clus.Client(newLeaderId))
-  time.Sleep(250 * time.Millisecond)
-  // Check the TTL of the new leader
-  var ttlresp *pb.LeaseTimeToLiveResponse
-  for i := 0; i < 10; i++ {
-    if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
-      if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
-        time.Sleep(time.Millisecond * 250)
-      } else {
-        t.Fatal(err)
-      }
-    }
-  }
-  expectedTTL := ttl - int64(leaseInterval.Seconds())
-  if ttlresp.TTL < expectedTTL-1 || ttlresp.TTL > expectedTTL {
-    t.Fatalf("expected lease to be checkpointed after restart such that %d < TTL <%d, but got TTL=%d", expectedTTL-1, expectedTTL, ttlresp.TTL)
-  }
+  tcs := []struct {
+    name                  string
+    checkpointingEnabled  bool
+    ttl                   time.Duration
+    checkpointingInterval time.Duration
+    checkpointingPersist  bool
+    leaderChanges         int
+    clusterSize           int
+    expectTTLIsGT         time.Duration
+    expectTTLIsLT         time.Duration
+  }{
+    {
+      name:          "Checkpointing disabled, lease TTL is reset",
+      ttl:           300 * time.Second,
+      leaderChanges: 1,
+      clusterSize:   3,
+      expectTTLIsGT: 298 * time.Second,
+    },
+    {
+      name:                  "Checkpointing enabled 10s, lease TTL is preserved after leader change",
+      ttl:                   300 * time.Second,
+      checkpointingEnabled:  true,
+      checkpointingInterval: 10 * time.Second,
+      leaderChanges:         1,
+      clusterSize:           3,
+      expectTTLIsLT:         290 * time.Second,
+    },
+    {
+      name:                  "Checkpointing enabled 10s with persist, lease TTL is preserved after cluster restart",
+      ttl:                   300 * time.Second,
+      checkpointingEnabled:  true,
+      checkpointingInterval: 10 * time.Second,
+      checkpointingPersist:  true,
+      leaderChanges:         1,
+      clusterSize:           1,
+      expectTTLIsLT:         290 * time.Second,
+    },
+    {
+      name:                  "Checkpointing enabled 10s, lease TTL is reset after restart",
+      ttl:                   300 * time.Second,
+      checkpointingEnabled:  true,
+      checkpointingInterval: 10 * time.Second,
+      leaderChanges:         1,
+      clusterSize:           1,
+      expectTTLIsGT:         298 * time.Second,
+    },
+    {
+      // Checking if checkpointing continues after the first leader change.
+      name:                  "Checkpointing enabled 10s, lease TTL is preserved after 2 leader changes",
+      ttl:                   300 * time.Second,
+      checkpointingEnabled:  true,
+      checkpointingInterval: 10 * time.Second,
+      leaderChanges:         2,
+      clusterSize:           3,
+      expectTTLIsLT:         280 * time.Second,
+    },
+  }
+  for _, tc := range tcs {
+    t.Run(tc.name, func(t *testing.T) {
+      config := &ClusterConfig{
+        Size:                    tc.clusterSize,
+        EnableLeaseCheckpoint:   tc.checkpointingEnabled,
+        LeaseCheckpointInterval: tc.checkpointingInterval,
+        LeaseCheckpointPersist:  tc.checkpointingPersist,
+        SnapshotCount:           5,
+      }
+      clus := NewClusterV3(t, config)
+      defer clus.Terminate(t)
+      // create lease
+      ctx, cancel := context.WithCancel(context.Background())
+      defer cancel()
+      c := toGRPC(clus.RandClient())
+      lresp, err := c.Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{TTL: int64(tc.ttl.Seconds())})
+      if err != nil {
+        t.Fatal(err)
+      }
+      for i := 0; i < tc.leaderChanges; i++ {
+        // wait for a checkpoint to occur
+        time.Sleep(tc.checkpointingInterval + 1*time.Second)
+        // Force a leader election
+        leaderId := clus.WaitLeader(t)
+        leader := clus.Members[leaderId]
+        leader.Stop(t)
+        time.Sleep(time.Duration(3*electionTicks) * tickDuration)
+        leader.Restart(t)
+      }
+      newLeaderId := clus.WaitLeader(t)
+      c2 := toGRPC(clus.Client(newLeaderId))
+      time.Sleep(250 * time.Millisecond)
+      // Check the TTL of the new leader
+      var ttlresp *pb.LeaseTimeToLiveResponse
+      for i := 0; i < 10; i++ {
+        if ttlresp, err = c2.Lease.LeaseTimeToLive(ctx, &pb.LeaseTimeToLiveRequest{ID: lresp.ID}); err != nil {
+          if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
+            time.Sleep(time.Millisecond * 250)
+          } else {
+            t.Fatal(err)
+          }
+        }
+      }
+      if tc.expectTTLIsGT != 0 && time.Duration(ttlresp.TTL)*time.Second <= tc.expectTTLIsGT {
+        t.Errorf("Expected lease ttl (%v) to be greater than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsGT)
+      }
+      if tc.expectTTLIsLT != 0 && time.Duration(ttlresp.TTL)*time.Second > tc.expectTTLIsLT {
+        t.Errorf("Expected lease ttl (%v) to be lower than (%v)", time.Duration(ttlresp.TTL)*time.Second, tc.expectTTLIsLT)
+      }
+    })
+  }
 }
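The thresholds encode the checkpoint arithmetic: with a 300s TTL and a 10s checkpoint interval, a preserved lease should report less than 290s after one leader change (less than 280s after two), while a lease whose TTL was reset snaps back above 298s.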

View File

@@ -33,7 +33,7 @@ func TestRenewHTTP(t *testing.T) {
   defer os.Remove(tmpPath)
   defer be.Close()
-  le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
+  le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
   le.Promote(time.Second)
   l, err := le.Grant(1, int64(5))
   if err != nil {
@@ -58,7 +58,7 @@ func TestTimeToLiveHTTP(t *testing.T) {
   defer os.Remove(tmpPath)
   defer be.Close()
-  le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
+  le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
   le.Promote(time.Second)
   l, err := le.Grant(1, int64(5))
   if err != nil {
@@ -100,7 +100,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) {
   defer os.Remove(tmpPath)
   defer be.Close()
-  le := lease.NewLessor(lg, be, lease.LessorConfig{MinLeaseTTL: int64(5)})
+  le := lease.NewLessor(lg, be, nil, lease.LessorConfig{MinLeaseTTL: int64(5)})
   le.Promote(time.Second)
   l, err := le.Grant(1, int64(5))
   if err != nil {

View File

@@ -25,6 +25,7 @@ import (
   "sync"
   "time"
+  "github.com/coreos/go-semver/semver"
   pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
   "go.etcd.io/etcd/lease/leasepb"
   "go.etcd.io/etcd/mvcc/backend"
@@ -37,6 +38,8 @@ const NoLease = LeaseID(0)
 // MaxLeaseTTL is the maximum lease TTL value
 const MaxLeaseTTL = 9000000000
+var v3_6 = semver.Version{Major: 3, Minor: 6}
 var (
   forever = time.Time{}
@@ -182,19 +185,29 @@ type lessor struct {
   checkpointInterval time.Duration
   // the interval to check if the expired lease is revoked
   expiredLeaseRetryInterval time.Duration
+  // whether lessor should always persist remaining TTL (always enabled in v3.6).
+  checkpointPersist bool
+  // cluster is used to adapt lessor logic based on cluster version
+  cluster cluster
 }
+type cluster interface {
+  // Version is the cluster-wide minimum major.minor version.
+  Version() *semver.Version
+}
 type LessorConfig struct {
   MinLeaseTTL int64
   CheckpointInterval time.Duration
   ExpiredLeasesRetryInterval time.Duration
+  CheckpointPersist bool
 }
-func NewLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) Lessor {
-  return newLessor(lg, b, cfg)
+func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
+  return newLessor(lg, b, cluster, cfg)
 }
-func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
+func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor {
   checkpointInterval := cfg.CheckpointInterval
   expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
   if checkpointInterval == 0 {
@@ -212,11 +225,13 @@ func newLessor(lg *zap.Logger, b backend.Backend, cfg LessorConfig) *lessor {
     minLeaseTTL: cfg.MinLeaseTTL,
     checkpointInterval: checkpointInterval,
     expiredLeaseRetryInterval: expiredLeaseRetryInterval,
+    checkpointPersist: cfg.CheckpointPersist,
     // expiredC is a small buffered chan to avoid unnecessary blocking.
     expiredC: make(chan []*Lease, 16),
     stopC: make(chan struct{}),
     doneC: make(chan struct{}),
     lg: lg,
+    cluster: cluster,
   }
   l.initAndRecover()
@@ -353,6 +368,9 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
   if l, ok := le.leaseMap[id]; ok {
     // when checkpointing, we only update the remainingTTL, Promote is responsible for applying this to lease expiry
     l.remainingTTL = remainingTTL
+    if le.shouldPersistCheckpoints() {
+      l.persistTo(le.b)
+    }
     if le.isPrimary() {
       // schedule the next checkpoint as needed
       le.scheduleCheckpointIfNeeded(l)
@@ -361,6 +379,15 @@ func (le *lessor) Checkpoint(id LeaseID, remainingTTL int64) error {
   return nil
 }
+func (le *lessor) shouldPersistCheckpoints() bool {
+  cv := le.cluster.Version()
+  return le.checkpointPersist || (cv != nil && greaterOrEqual(*cv, v3_6))
+}
+func greaterOrEqual(first, second semver.Version) bool {
+  return !first.LessThan(second)
+}
 // Renew renews an existing lease. If the given lease does not exist or
 // has expired, an error will be returned.
 func (le *lessor) Renew(id LeaseID) (int64, error) {
@@ -448,6 +475,7 @@ func (le *lessor) Promote(extend time.Duration) {
     l.refresh(extend)
     item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
     le.leaseExpiredNotifier.RegisterOrUpdate(item)
+    le.scheduleCheckpointIfNeeded(l)
   }
   if len(le.leaseMap) < leaseRevokeRate {
@@ -785,9 +813,10 @@ func (le *lessor) initAndRecover() {
     ttl: lpb.TTL,
     // itemSet will be filled in when recover key-value pairs
     // set expiry to forever, refresh when promoted
-    itemSet: make(map[LeaseItem]struct{}),
-    expiry: forever,
-    revokec: make(chan struct{}),
+    itemSet: make(map[LeaseItem]struct{}),
+    expiry: forever,
+    revokec: make(chan struct{}),
+    remainingTTL: lpb.RemainingTTL,
   }
  }
  le.leaseExpiredNotifier.Init()
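Two behavioral changes in the lessor are worth calling out. First, Promote() now reschedules checkpoints for recovered leases, so checkpointing keeps running after a leader change instead of silently stopping. Second, whether a checkpoint is also persisted to the backend is gated on the opt-in flag or the cluster version. A runnable sketch restating shouldPersistCheckpoints outside the package (shouldPersist is an illustrative name):

package main

import (
	"fmt"

	"github.com/coreos/go-semver/semver"
)

var v3_6 = semver.Version{Major: 3, Minor: 6}

func greaterOrEqual(first, second semver.Version) bool {
	return !first.LessThan(second)
}

// shouldPersist mirrors (*lessor).shouldPersistCheckpoints: persist when the
// operator opted in via CheckpointPersist, or once the whole cluster runs
// v3.6+, where persistence is unconditional.
func shouldPersist(checkpointPersist bool, cv *semver.Version) bool {
	return checkpointPersist || (cv != nil && greaterOrEqual(*cv, v3_6))
}

func main() {
	fmt.Println(shouldPersist(false, semver.New("3.5.0"))) // false: old default behavior
	fmt.Println(shouldPersist(true, semver.New("3.5.0")))  // true: opt-in flag wins
	fmt.Println(shouldPersist(false, semver.New("3.6.0"))) // true: v3.6+ always persists
	fmt.Println(shouldPersist(false, nil))                 // false: unknown version falls back
}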

View File

@@ -69,7 +69,7 @@ func setUp() (le *lessor, tearDown func()) {
   be, tmpPath := backend.NewDefaultTmpBackend()
   // MinLeaseTTL is negative, so we can grant expired leases in the benchmark.
   // ExpiredLeasesRetryInterval should be small, so the findExpired benchmark will recheck expired leases.
-  le = newLessor(lg, be, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
+  le = newLessor(lg, be, nil, LessorConfig{MinLeaseTTL: -1000, ExpiredLeasesRetryInterval: 10 * time.Microsecond})
   le.SetRangeDeleter(func() TxnDelete {
     ftd := &FakeTxnDelete{be.BatchTx()}
     ftd.Lock()

View File

@@ -27,10 +27,12 @@ import (
   "testing"
   "time"
-  "github.com/stretchr/testify/assert"
+  "github.com/coreos/go-semver/semver"
   pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
   "go.etcd.io/etcd/lease/leasepb"
   "go.etcd.io/etcd/mvcc/backend"
   "go.etcd.io/etcd/pkg/testutil"
+  "go.etcd.io/etcd/version"
   "go.uber.org/zap"
 )
@@ -48,7 +50,7 @@ func TestLessorGrant(t *testing.T) {
   defer os.RemoveAll(dir)
   defer be.Close()
-  le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+  le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   defer le.Stop()
   le.Promote(0)
@@ -110,7 +112,7 @@ func TestLeaseConcurrentKeys(t *testing.T) {
   defer os.RemoveAll(dir)
   defer be.Close()
-  le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+  le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   defer le.Stop()
   le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
@@ -159,7 +161,7 @@ func TestLessorRevoke(t *testing.T) {
   defer os.RemoveAll(dir)
   defer be.Close()
-  le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+  le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   defer le.Stop()
   var fd *fakeDeleter
   le.SetRangeDeleter(func() TxnDelete {
@@ -212,7 +214,7 @@ func TestLessorRenew(t *testing.T) {
   defer be.Close()
   defer os.RemoveAll(dir)
-  le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+  le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   defer le.Stop()
   le.Promote(0)
@@ -245,7 +247,7 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
   defer be.Close()
   defer os.RemoveAll(dir)
-  le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+  le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   fakerCheckerpointer := func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
     for _, cp := range cp.GetCheckpoints() {
       le.Checkpoint(LeaseID(cp.GetID()), cp.GetRemaining_TTL())
@@ -294,7 +296,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
   dir, be := NewTestBackend(t)
   defer os.RemoveAll(dir)
-  le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+  le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   ttl := int64(10)
   for i := 1; i <= leaseRevokeRate*10; i++ {
     if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
@@ -313,7 +315,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
   bcfg.Path = filepath.Join(dir, "be")
   be = backend.New(bcfg)
   defer be.Close()
-  le = newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+  le = newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   defer le.Stop()
   // extend after recovery should extend expiration on lease pile-up
@@ -343,7 +345,7 @@ func TestLessorDetach(t *testing.T) {
   defer os.RemoveAll(dir)
   defer be.Close()
-  le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+  le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   defer le.Stop()
   le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) })
@@ -384,7 +386,7 @@ func TestLessorRecover(t *testing.T) {
   defer os.RemoveAll(dir)
   defer be.Close()
-  le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+  le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   defer le.Stop()
   l1, err1 := le.Grant(1, 10)
   l2, err2 := le.Grant(2, 20)
@@ -393,7 +395,7 @@ func TestLessorRecover(t *testing.T) {
   }
   // Create a new lessor with the same backend
-  nle := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+  nle := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   defer nle.Stop()
   nl1 := nle.Lookup(l1.ID)
   if nl1 == nil || nl1.ttl != l1.ttl {
@@ -414,7 +416,7 @@ func TestLessorExpire(t *testing.T) {
   testMinTTL := int64(1)
-  le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
+  le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL})
   defer le.Stop()
   le.Promote(1 * time.Second)
@@ -467,7 +469,7 @@ func TestLessorExpireAndDemote(t *testing.T) {
   testMinTTL := int64(1)
-  le := newLessor(lg, be, LessorConfig{MinLeaseTTL: testMinTTL})
+  le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: testMinTTL})
   defer le.Stop()
   le.Promote(1 * time.Second)
@@ -516,7 +518,7 @@ func TestLessorMaxTTL(t *testing.T) {
   defer os.RemoveAll(dir)
   defer be.Close()
-  le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+  le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   defer le.Stop()
   _, err := le.Grant(1, MaxLeaseTTL+1)
@@ -532,7 +534,8 @@ func TestLessorCheckpointScheduling(t *testing.T) {
   defer os.RemoveAll(dir)
   defer be.Close()
-  le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
+  le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL, CheckpointInterval: 1 * time.Second})
+  defer le.Stop()
   le.minLeaseTTL = 1
   checkpointedC := make(chan struct{})
   le.SetCheckpointer(func(ctx context.Context, lc *pb.LeaseCheckpointRequest) {
@@ -545,13 +548,11 @@ func TestLessorCheckpointScheduling(t *testing.T) {
       t.Errorf("expected checkpoint to be called with Remaining_TTL=%d but got %d", 1, c.Remaining_TTL)
     }
   })
-  defer le.Stop()
-  le.Promote(0)
   _, err := le.Grant(1, 2)
   if err != nil {
     t.Fatal(err)
   }
+  le.Promote(0)
   // TODO: Is there any way to avoid doing this wait? Lease TTL granularity is in seconds.
   select {
@@ -567,7 +568,7 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
   defer os.RemoveAll(dir)
   defer be.Close()
-  le := newLessor(lg, be, LessorConfig{MinLeaseTTL: minLeaseTTL})
+  le := newLessor(lg, be, clusterV3_6(), LessorConfig{MinLeaseTTL: minLeaseTTL})
   defer le.Stop()
   l, err := le.Grant(1, 10)
   if err != nil {
@@ -653,7 +654,76 @@ func TestLeaseBackend(t *testing.T) {
     defer be2.Close()
     leases := unsafeGetAllLeases(be2.ReadTx())
-    assert.Equal(t, tc.want, leases)
+    testutil.AssertEqual(t, tc.want, leases)
   })
  }
 }
+func TestLessorCheckpointPersistenceAfterRestart(t *testing.T) {
+  const ttl int64 = 10
+  const checkpointTTL int64 = 5
+  tcs := []struct {
+    name               string
+    cluster            cluster
+    checkpointPersist  bool
+    expectRemainingTTL int64
+  }{
+    {
+      name:               "Etcd v3.6 and newer persist remainingTTL on checkpoint",
+      cluster:            clusterV3_6(),
+      expectRemainingTTL: checkpointTTL,
+    },
+    {
+      name:               "Etcd v3.5 and older persist remainingTTL if CheckpointPersist is set",
+      cluster:            clusterLatest(),
+      checkpointPersist:  true,
+      expectRemainingTTL: checkpointTTL,
+    },
+    {
+      name:               "Etcd with version unknown persists remainingTTL if CheckpointPersist is set",
+      cluster:            clusterNil(),
+      checkpointPersist:  true,
+      expectRemainingTTL: checkpointTTL,
+    },
+    {
+      name:               "Etcd v3.5 and older reset remainingTTL on checkpoint",
+      cluster:            clusterLatest(),
+      expectRemainingTTL: ttl,
+    },
+    {
+      name:               "Etcd with version unknown falls back to v3.5 behavior",
+      cluster:            clusterNil(),
+      expectRemainingTTL: ttl,
+    },
+  }
+  for _, tc := range tcs {
+    t.Run(tc.name, func(t *testing.T) {
+      lg := zap.NewNop()
+      dir, be := NewTestBackend(t)
+      defer os.RemoveAll(dir)
+      defer be.Close()
+      cfg := LessorConfig{MinLeaseTTL: minLeaseTTL}
+      cfg.CheckpointPersist = tc.checkpointPersist
+      le := newLessor(lg, be, tc.cluster, cfg)
+      l, err := le.Grant(2, ttl)
+      if err != nil {
+        t.Fatal(err)
+      }
+      if l.RemainingTTL() != ttl {
+        t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), ttl)
+      }
+      le.Checkpoint(2, checkpointTTL)
+      if l.RemainingTTL() != checkpointTTL {
+        t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), checkpointTTL)
+      }
+      le.Stop()
+      le2 := newLessor(lg, be, clusterV3_6(), cfg)
+      l = le2.Lookup(2)
+      if l.RemainingTTL() != tc.expectRemainingTTL {
+        t.Errorf("remainingTTL() = %d, expected: %d", l.RemainingTTL(), tc.expectRemainingTTL)
+      }
+    })
+  }
+}
@@ -695,3 +765,23 @@ func NewTestBackend(t *testing.T) (string, backend.Backend) {
   bcfg.Path = filepath.Join(tmpPath, "be")
   return tmpPath, backend.New(bcfg)
 }
+func clusterV3_6() cluster {
+  return fakeCluster{semver.New("3.6.0")}
+}
+func clusterLatest() cluster {
+  return fakeCluster{semver.New(version.Cluster(version.Version) + ".0")}
+}
+func clusterNil() cluster {
+  return fakeCluster{}
+}
+type fakeCluster struct {
+  version *semver.Version
+}
+func (c fakeCluster) Version() *semver.Version {
+  return c.version
+}