mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
lease: fix lease expiry bases on wall clock (#12292)
fix https://github.com/etcd-io/etcd/issues/12291
This commit is contained in:
parent
8c192d99df
commit
4136df7933
@ -14,15 +14,17 @@
|
||||
|
||||
package lease
|
||||
|
||||
import "container/heap"
|
||||
import (
|
||||
"container/heap"
|
||||
"time"
|
||||
)
|
||||
|
||||
// LeaseWithTime contains lease object with a time.
|
||||
// For the lessor's lease heap, time identifies the lease expiration time.
|
||||
// For the lessor's lease checkpoint heap, the time identifies the next lease checkpoint time.
|
||||
type LeaseWithTime struct {
|
||||
id LeaseID
|
||||
// Unix nanos timestamp.
|
||||
time int64
|
||||
id LeaseID
|
||||
time time.Time
|
||||
index int
|
||||
}
|
||||
|
||||
@ -31,7 +33,7 @@ type LeaseQueue []*LeaseWithTime
|
||||
func (pq LeaseQueue) Len() int { return len(pq) }
|
||||
|
||||
func (pq LeaseQueue) Less(i, j int) bool {
|
||||
return pq[i].time < pq[j].time
|
||||
return pq[i].time.Before(pq[j].time)
|
||||
}
|
||||
|
||||
func (pq LeaseQueue) Swap(i, j int) {
|
||||
|
@ -30,9 +30,10 @@ func TestLeaseQueue(t *testing.T) {
|
||||
|
||||
// insert in reverse order of expiration time
|
||||
for i := 50; i >= 1; i-- {
|
||||
exp := time.Now().Add(time.Hour).UnixNano()
|
||||
now := time.Now()
|
||||
exp := now.Add(time.Hour)
|
||||
if i == 1 {
|
||||
exp = time.Now().UnixNano()
|
||||
exp = now
|
||||
}
|
||||
le.leaseMap[LeaseID(i)] = &Lease{ID: LeaseID(i)}
|
||||
le.leaseExpiredNotifier.RegisterOrUpdate(&LeaseWithTime{id: LeaseID(i), time: exp})
|
||||
|
@ -300,7 +300,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
|
||||
leaseGranted.Inc()
|
||||
|
||||
if le.isPrimary() {
|
||||
item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
|
||||
item := &LeaseWithTime{id: l.ID, time: l.expiry}
|
||||
le.leaseExpiredNotifier.RegisterOrUpdate(item)
|
||||
le.scheduleCheckpointIfNeeded(l)
|
||||
}
|
||||
@ -413,7 +413,7 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
|
||||
|
||||
le.mu.Lock()
|
||||
l.refresh(0)
|
||||
item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
|
||||
item := &LeaseWithTime{id: l.ID, time: l.expiry}
|
||||
le.leaseExpiredNotifier.RegisterOrUpdate(item)
|
||||
le.mu.Unlock()
|
||||
|
||||
@ -452,7 +452,7 @@ func (le *lessor) Promote(extend time.Duration) {
|
||||
// refresh the expiries of all leases.
|
||||
for _, l := range le.leaseMap {
|
||||
l.refresh(extend)
|
||||
item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
|
||||
item := &LeaseWithTime{id: l.ID, time: l.expiry}
|
||||
le.leaseExpiredNotifier.RegisterOrUpdate(item)
|
||||
}
|
||||
|
||||
@ -490,7 +490,7 @@ func (le *lessor) Promote(extend time.Duration) {
|
||||
delay := time.Duration(rateDelay)
|
||||
nextWindow = baseWindow + delay
|
||||
l.refresh(delay + extend)
|
||||
item := &LeaseWithTime{id: l.ID, time: l.expiry.UnixNano()}
|
||||
item := &LeaseWithTime{id: l.ID, time: l.expiry}
|
||||
le.leaseExpiredNotifier.RegisterOrUpdate(item)
|
||||
le.scheduleCheckpointIfNeeded(l)
|
||||
}
|
||||
@ -677,14 +677,14 @@ func (le *lessor) expireExists() (l *Lease, ok bool, next bool) {
|
||||
return nil, false, true
|
||||
}
|
||||
now := time.Now()
|
||||
if now.UnixNano() < item.time /* expiration time */ {
|
||||
if now.Before(item.time) /* item.time: expiration time */ {
|
||||
// Candidate expirations are caught up, reinsert this item
|
||||
// and no need to revoke (nothing is expiry)
|
||||
return l, false, false
|
||||
}
|
||||
|
||||
// recheck if revoke is complete after retry interval
|
||||
item.time = now.Add(le.expiredLeaseRetryInterval).UnixNano()
|
||||
item.time = now.Add(le.expiredLeaseRetryInterval)
|
||||
le.leaseExpiredNotifier.RegisterOrUpdate(item)
|
||||
return l, true, false
|
||||
}
|
||||
@ -733,7 +733,7 @@ func (le *lessor) scheduleCheckpointIfNeeded(lease *Lease) {
|
||||
}
|
||||
heap.Push(&le.leaseCheckpointHeap, &LeaseWithTime{
|
||||
id: lease.ID,
|
||||
time: time.Now().Add(le.checkpointInterval).UnixNano(),
|
||||
time: time.Now().Add(le.checkpointInterval),
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -747,7 +747,7 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh
|
||||
cps := []*pb.LeaseCheckpoint{}
|
||||
for le.leaseCheckpointHeap.Len() > 0 && len(cps) < checkpointLimit {
|
||||
lt := le.leaseCheckpointHeap[0]
|
||||
if lt.time /* next checkpoint time */ > now.UnixNano() {
|
||||
if lt.time.After(now) /* lt.time: next checkpoint time */ {
|
||||
return cps
|
||||
}
|
||||
heap.Pop(&le.leaseCheckpointHeap)
|
||||
|
Loading…
x
Reference in New Issue
Block a user