From 6f271d8bf1628cd3fd9f2956b8cb59c60deedeac Mon Sep 17 00:00:00 2001 From: micah Date: Fri, 9 Mar 2018 21:14:14 +0000 Subject: [PATCH] lease: Add a heap to optimize lease expiration checks This adds a heap acting as a priority queue to keep track of lease exiprations. Previously the whole lease map had to be iterated through each time. The queue allows us to check only those leases which might be expired. When the expiration changes, we add an additional entry. If we check an entry that isn't expired, it means that the lease got extended. If we find a entry in the heap that doesn't have a corresponding entry in the map, we know that the lease has already been expired or revoked. --- lease/lease_queue.go | 51 ++++++++++++++++++++++++++++++++++++++++++++ lease/lessor.go | 38 +++++++++++++++++++++++++++------ 2 files changed, 83 insertions(+), 6 deletions(-) create mode 100644 lease/lease_queue.go diff --git a/lease/lease_queue.go b/lease/lease_queue.go new file mode 100644 index 000000000..a7c3cf553 --- /dev/null +++ b/lease/lease_queue.go @@ -0,0 +1,51 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lease + +type LeaseWithTime struct { + leaseId LeaseID + expiration int64 + index int +} + +type LeaseQueue []*LeaseWithTime + +func (pq LeaseQueue) Len() int { return len(pq) } + +func (pq LeaseQueue) Less(i, j int) bool { + return pq[i].expiration < pq[j].expiration +} + +func (pq LeaseQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *LeaseQueue) Push(x interface{}) { + n := len(*pq) + item := x.(*LeaseWithTime) + item.index = n + *pq = append(*pq, item) +} + +func (pq *LeaseQueue) Pop() interface{} { + old := *pq + n := len(old) + item := old[n-1] + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} diff --git a/lease/lessor.go b/lease/lessor.go index 31f645fa3..8609e47b6 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -15,6 +15,7 @@ package lease import ( + "container/heap" "encoding/binary" "errors" "math" @@ -128,9 +129,9 @@ type lessor struct { // We want to make Grant, Revoke, and findExpiredLeases all O(logN) and // Renew O(1). // findExpiredLeases and Renew should be the most frequent operations. - leaseMap map[LeaseID]*Lease - - itemMap map[LeaseItem]LeaseID + leaseMap map[LeaseID]*Lease + leaseHeap LeaseQueue + itemMap map[LeaseItem]LeaseID // When a lease expires, the lessor will delete the // leased range (or key) by the RangeDeleter. @@ -159,6 +160,7 @@ func newLessor(b backend.Backend, minLeaseTTL int64) *lessor { l := &lessor{ leaseMap: make(map[LeaseID]*Lease), itemMap: make(map[LeaseItem]LeaseID), + leaseHeap: make(LeaseQueue, 0), b: b, minLeaseTTL: minLeaseTTL, // expiredC is a small buffered chan to avoid unnecessary blocking. @@ -233,6 +235,8 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { } le.leaseMap[id] = l + item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()} + heap.Push(&le.leaseHeap, item) l.persistTo(le.b) return l, nil @@ -315,6 +319,8 @@ func (le *lessor) Renew(id LeaseID) (int64, error) { } l.refresh(0) + item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()} + heap.Push(&le.leaseHeap, item) return l.ttl, nil } @@ -349,6 +355,8 @@ func (le *lessor) Promote(extend time.Duration) { // refresh the expiries of all leases. for _, l := range le.leaseMap { l.refresh(extend) + item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()} + heap.Push(&le.leaseHeap, item) } if len(le.leaseMap) < leaseRevokeRate { @@ -384,6 +392,8 @@ func (le *lessor) Promote(extend time.Duration) { delay := time.Duration(rateDelay) nextWindow = baseWindow + delay l.refresh(delay + extend) + item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()} + heap.Push(&le.leaseHeap, item) } } @@ -516,9 +526,24 @@ func (le *lessor) runLoop() { func (le *lessor) findExpiredLeases(limit int) []*Lease { leases := make([]*Lease, 0, 16) - for _, l := range le.leaseMap { - // TODO: probably should change to <= 100-500 millisecond to - // make up committing latency. + for { + if le.leaseHeap.Len() == 0 { + break + } + + item := heap.Pop(&le.leaseHeap).(*LeaseWithTime) + l := le.leaseMap[item.leaseId] + if l == nil { + // lease has expired or been revoked, continue + continue + } + if time.Now().UnixNano() < item.expiration { + // Candidate expirations are caught up, reinsert this item + heap.Push(&le.leaseHeap, item) + break + } + // if the lease is actually expired, add to the removal list. If it is not expired, we can ignore it because another entry will have been inserted into the heap + if l.expired() { leases = append(leases, l) @@ -560,6 +585,7 @@ func (le *lessor) initAndRecover() { revokec: make(chan struct{}), } } + heap.Init(&le.leaseHeap) tx.Unlock() le.b.ForceCommit()