diff --git a/lease/lease_queue.go b/lease/lease_queue.go new file mode 100644 index 000000000..a7c3cf553 --- /dev/null +++ b/lease/lease_queue.go @@ -0,0 +1,51 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lease + +type LeaseWithTime struct { + leaseId LeaseID + expiration int64 + index int +} + +type LeaseQueue []*LeaseWithTime + +func (pq LeaseQueue) Len() int { return len(pq) } + +func (pq LeaseQueue) Less(i, j int) bool { + return pq[i].expiration < pq[j].expiration +} + +func (pq LeaseQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *LeaseQueue) Push(x interface{}) { + n := len(*pq) + item := x.(*LeaseWithTime) + item.index = n + *pq = append(*pq, item) +} + +func (pq *LeaseQueue) Pop() interface{} { + old := *pq + n := len(old) + item := old[n-1] + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} diff --git a/lease/lessor.go b/lease/lessor.go index 31f645fa3..8609e47b6 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -15,6 +15,7 @@ package lease import ( + "container/heap" "encoding/binary" "errors" "math" @@ -128,9 +129,9 @@ type lessor struct { // We want to make Grant, Revoke, and findExpiredLeases all O(logN) and // Renew O(1). // findExpiredLeases and Renew should be the most frequent operations. - leaseMap map[LeaseID]*Lease - - itemMap map[LeaseItem]LeaseID + leaseMap map[LeaseID]*Lease + leaseHeap LeaseQueue + itemMap map[LeaseItem]LeaseID // When a lease expires, the lessor will delete the // leased range (or key) by the RangeDeleter. @@ -159,6 +160,7 @@ func newLessor(b backend.Backend, minLeaseTTL int64) *lessor { l := &lessor{ leaseMap: make(map[LeaseID]*Lease), itemMap: make(map[LeaseItem]LeaseID), + leaseHeap: make(LeaseQueue, 0), b: b, minLeaseTTL: minLeaseTTL, // expiredC is a small buffered chan to avoid unnecessary blocking. @@ -233,6 +235,8 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { } le.leaseMap[id] = l + item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()} + heap.Push(&le.leaseHeap, item) l.persistTo(le.b) return l, nil @@ -315,6 +319,8 @@ func (le *lessor) Renew(id LeaseID) (int64, error) { } l.refresh(0) + item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()} + heap.Push(&le.leaseHeap, item) return l.ttl, nil } @@ -349,6 +355,8 @@ func (le *lessor) Promote(extend time.Duration) { // refresh the expiries of all leases. for _, l := range le.leaseMap { l.refresh(extend) + item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()} + heap.Push(&le.leaseHeap, item) } if len(le.leaseMap) < leaseRevokeRate { @@ -384,6 +392,8 @@ func (le *lessor) Promote(extend time.Duration) { delay := time.Duration(rateDelay) nextWindow = baseWindow + delay l.refresh(delay + extend) + item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()} + heap.Push(&le.leaseHeap, item) } } @@ -516,9 +526,24 @@ func (le *lessor) runLoop() { func (le *lessor) findExpiredLeases(limit int) []*Lease { leases := make([]*Lease, 0, 16) - for _, l := range le.leaseMap { - // TODO: probably should change to <= 100-500 millisecond to - // make up committing latency. + for { + if le.leaseHeap.Len() == 0 { + break + } + + item := heap.Pop(&le.leaseHeap).(*LeaseWithTime) + l := le.leaseMap[item.leaseId] + if l == nil { + // lease has expired or been revoked, continue + continue + } + if time.Now().UnixNano() < item.expiration { + // Candidate expirations are caught up, reinsert this item + heap.Push(&le.leaseHeap, item) + break + } + // if the lease is actually expired, add to the removal list. If it is not expired, we can ignore it because another entry will have been inserted into the heap + if l.expired() { leases = append(leases, l) @@ -560,6 +585,7 @@ func (le *lessor) initAndRecover() { revokec: make(chan struct{}), } } + heap.Init(&le.leaseHeap) tx.Unlock() le.b.ForceCommit()