Fix races in TestLessorRenewExtendPileup

Signed-off-by: Lucas Rodriguez <lucas.rodriguez9616@gmail.com>
This commit is contained in:
Lucas Rodriguez 2024-09-09 10:39:44 -05:00
parent f89dfed2dc
commit a042354947
2 changed files with 23 additions and 15 deletions

View File

@ -42,8 +42,8 @@ const MaxLeaseTTL = 9000000000
var ( var (
forever = time.Time{} forever = time.Time{}
// maximum number of leases to revoke per second; configurable for tests // default number of leases to revoke per second; configurable for tests
leaseRevokeRate = 1000 defaultLeaseRevokeRate = 1000
// maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests // maximum number of lease checkpoints recorded to the consensus log per second; configurable for tests
leaseCheckpointRate = 1000 leaseCheckpointRate = 1000
@ -170,6 +170,9 @@ type lessor struct {
// requests for shorter TTLs are extended to the minimum TTL. // requests for shorter TTLs are extended to the minimum TTL.
minLeaseTTL int64 minLeaseTTL int64
// maximum number of leases to revoke per second
leaseRevokeRate int
expiredC chan []*Lease expiredC chan []*Lease
// stopC is a channel whose closure indicates that the lessor should be stopped. // stopC is a channel whose closure indicates that the lessor should be stopped.
stopC chan struct{} stopC chan struct{}
@ -198,6 +201,8 @@ type LessorConfig struct {
CheckpointInterval time.Duration CheckpointInterval time.Duration
ExpiredLeasesRetryInterval time.Duration ExpiredLeasesRetryInterval time.Duration
CheckpointPersist bool CheckpointPersist bool
leaseRevokeRate int
} }
func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor { func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
@ -207,12 +212,16 @@ func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorCon
func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor { func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor {
checkpointInterval := cfg.CheckpointInterval checkpointInterval := cfg.CheckpointInterval
expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval expiredLeaseRetryInterval := cfg.ExpiredLeasesRetryInterval
leaseRevokeRate := cfg.leaseRevokeRate
if checkpointInterval == 0 { if checkpointInterval == 0 {
checkpointInterval = defaultLeaseCheckpointInterval checkpointInterval = defaultLeaseCheckpointInterval
} }
if expiredLeaseRetryInterval == 0 { if expiredLeaseRetryInterval == 0 {
expiredLeaseRetryInterval = defaultExpiredleaseRetryInterval expiredLeaseRetryInterval = defaultExpiredleaseRetryInterval
} }
if leaseRevokeRate == 0 {
leaseRevokeRate = defaultLeaseRevokeRate
}
l := &lessor{ l := &lessor{
leaseMap: make(map[LeaseID]*Lease), leaseMap: make(map[LeaseID]*Lease),
itemMap: make(map[LeaseItem]LeaseID), itemMap: make(map[LeaseItem]LeaseID),
@ -220,6 +229,7 @@ func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorCon
leaseCheckpointHeap: make(LeaseQueue, 0), leaseCheckpointHeap: make(LeaseQueue, 0),
b: b, b: b,
minLeaseTTL: cfg.MinLeaseTTL, minLeaseTTL: cfg.MinLeaseTTL,
leaseRevokeRate: leaseRevokeRate,
checkpointInterval: checkpointInterval, checkpointInterval: checkpointInterval,
expiredLeaseRetryInterval: expiredLeaseRetryInterval, expiredLeaseRetryInterval: expiredLeaseRetryInterval,
checkpointPersist: cfg.CheckpointPersist, checkpointPersist: cfg.CheckpointPersist,
@ -473,7 +483,7 @@ func (le *lessor) Promote(extend time.Duration) {
le.scheduleCheckpointIfNeeded(l) le.scheduleCheckpointIfNeeded(l)
} }
if len(le.leaseMap) < leaseRevokeRate { if len(le.leaseMap) < le.leaseRevokeRate {
// no possibility of lease pile-up // no possibility of lease pile-up
return return
} }
@ -487,7 +497,7 @@ func (le *lessor) Promote(extend time.Duration) {
expires := 0 expires := 0
// have fewer expires than the total revoke rate so piled up leases // have fewer expires than the total revoke rate so piled up leases
// don't consume the entire revoke limit // don't consume the entire revoke limit
targetExpiresPerSecond := (3 * leaseRevokeRate) / 4 targetExpiresPerSecond := (3 * le.leaseRevokeRate) / 4
for _, l := range leases { for _, l := range leases {
remaining := l.Remaining() remaining := l.Remaining()
if remaining > nextWindow { if remaining > nextWindow {
@ -623,7 +633,7 @@ func (le *lessor) revokeExpiredLeases() {
var ls []*Lease var ls []*Lease
// rate limit // rate limit
revokeLimit := leaseRevokeRate / 2 revokeLimit := le.leaseRevokeRate / 2
le.mu.RLock() le.mu.RLock()
if le.isPrimary() { if le.isPrimary() {

View File

@ -307,17 +307,15 @@ func TestLessorRenewWithCheckpointer(t *testing.T) {
// TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many // TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many
// expire at the same time. // expire at the same time.
func TestLessorRenewExtendPileup(t *testing.T) { func TestLessorRenewExtendPileup(t *testing.T) {
oldRevokeRate := leaseRevokeRate leaseRevokeRate := 10
defer func() { leaseRevokeRate = oldRevokeRate }()
lg := zap.NewNop() lg := zap.NewNop()
leaseRevokeRate = 10
dir, be := NewTestBackend(t) dir, be := NewTestBackend(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL}) le := newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL, leaseRevokeRate: leaseRevokeRate})
ttl := int64(10) ttl := int64(10)
for i := 1; i <= leaseRevokeRate*10; i++ { for i := 1; i <= le.leaseRevokeRate*10; i++ {
if _, err := le.Grant(LeaseID(2*i), ttl); err != nil { if _, err := le.Grant(LeaseID(2*i), ttl); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -334,7 +332,7 @@ func TestLessorRenewExtendPileup(t *testing.T) {
bcfg.Path = filepath.Join(dir, "be") bcfg.Path = filepath.Join(dir, "be")
be = backend.New(bcfg) be = backend.New(bcfg)
defer be.Close() defer be.Close()
le = newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL}) le = newLessor(lg, be, clusterLatest(), LessorConfig{MinLeaseTTL: minLeaseTTL, leaseRevokeRate: leaseRevokeRate})
defer le.Stop() defer le.Stop()
// extend after recovery should extend expiration on lease pile-up // extend after recovery should extend expiration on lease pile-up
@ -349,11 +347,11 @@ func TestLessorRenewExtendPileup(t *testing.T) {
for i := ttl; i < ttl+20; i++ { for i := ttl; i < ttl+20; i++ {
c := windowCounts[i] c := windowCounts[i]
if c > leaseRevokeRate { if c > le.leaseRevokeRate {
t.Errorf("expected at most %d expiring at %ds, got %d", leaseRevokeRate, i, c) t.Errorf("expected at most %d expiring at %ds, got %d", le.leaseRevokeRate, i, c)
} }
if c < leaseRevokeRate/2 { if c < le.leaseRevokeRate/2 {
t.Errorf("expected at least %d expiring at %ds, got %d", leaseRevokeRate/2, i, c) t.Errorf("expected at least %d expiring at %ds, got %d", le.leaseRevokeRate/2, i, c)
} }
} }
} }