Merge pull request #13165 from serathius/lease

etcdserver: Move get/put/delete on Lease bucket to bucket package
This commit is contained in:
Piotr Tabor 2021-07-03 11:31:04 +02:00 committed by GitHub
commit f7ad896691
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 96 additions and 33 deletions

View File

@ -336,7 +336,7 @@ func (le *lessor) Revoke(id LeaseID) error {
// lease deletion needs to be in the same backend transaction with the
// kv deletion. Or we might end up with not executing the revoke or not
// deleting the keys if etcdserver fails in between.
le.b.BatchTx().UnsafeDelete(buckets.Lease, int64ToBytes(int64(l.ID)))
buckets.UnsafeDeleteLease(le.b.BatchTx(), &leasepb.Lease{ID: int64(l.ID)})
txn.End()
@ -768,18 +768,12 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh
func (le *lessor) initAndRecover() {
tx := le.b.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(buckets.Lease)
_, vs := tx.UnsafeRange(buckets.Lease, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
// TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
for i := range vs {
var lpb leasepb.Lease
err := lpb.Unmarshal(vs[i])
if err != nil {
tx.Unlock()
panic("failed to unmarshal lease proto item")
}
tx.Lock()
buckets.UnsafeCreateLeaseBucket(tx)
lpbs := buckets.MustUnsafeGetAllLeases(tx)
tx.Unlock()
for _, lpb := range lpbs {
ID := LeaseID(lpb.ID)
if lpb.TTL < le.minLeaseTTL {
lpb.TTL = le.minLeaseTTL
@ -796,7 +790,6 @@ func (le *lessor) initAndRecover() {
}
le.leaseExpiredNotifier.Init()
heap.Init(&le.leaseCheckpointHeap)
tx.Unlock()
le.b.ForceCommit()
}
@ -821,17 +814,11 @@ func (l *Lease) expired() bool {
}
func (l *Lease) persistTo(b backend.Backend) {
key := int64ToBytes(int64(l.ID))
lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
val, err := lpb.Marshal()
if err != nil {
panic("failed to marshal lease proto item")
}
b.BatchTx().Lock()
b.BatchTx().UnsafePut(buckets.Lease, key, val)
b.BatchTx().Unlock()
tx := b.BatchTx()
tx.Lock()
defer tx.Unlock()
buckets.MustUnsafePutLease(tx, &lpb)
}
// TTL returns the TTL of the Lease.

View File

@ -92,12 +92,13 @@ func TestLessorGrant(t *testing.T) {
}
}
be.BatchTx().Lock()
_, vs := be.BatchTx().UnsafeRange(buckets.Lease, int64ToBytes(int64(l.ID)), nil, 0)
if len(vs) != 1 {
t.Errorf("len(vs) = %d, want 1", len(vs))
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
lpb := buckets.MustUnsafeGetLease(tx, int64(l.ID))
if lpb == nil {
t.Errorf("lpb = %d, want not nil", lpb)
}
be.BatchTx().Unlock()
}
// TestLeaseConcurrentKeys ensures Lease.Keys method calls are guarded
@ -195,12 +196,13 @@ func TestLessorRevoke(t *testing.T) {
t.Errorf("deleted= %v, want %v", fd.deleted, wdeleted)
}
be.BatchTx().Lock()
_, vs := be.BatchTx().UnsafeRange(buckets.Lease, int64ToBytes(int64(l.ID)), nil, 0)
if len(vs) != 0 {
t.Errorf("len(vs) = %d, want 0", len(vs))
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
lpb := buckets.MustUnsafeGetLease(tx, int64(l.ID))
if lpb != nil {
t.Errorf("lpb = %d, want nil", lpb)
}
be.BatchTx().Unlock()
}
// TestLessorRenew ensures Lessor can renew an existing lease.

View File

@ -0,0 +1,74 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package buckets
import (
"encoding/binary"
"math"
"go.etcd.io/etcd/server/v3/lease/leasepb"
"go.etcd.io/etcd/server/v3/mvcc/backend"
)
func UnsafeCreateLeaseBucket(tx backend.BatchTx) {
tx.UnsafeCreateBucket(Lease)
}
func MustUnsafeGetAllLeases(tx backend.ReadTx) []*leasepb.Lease {
_, vs := tx.UnsafeRange(Lease, leaseIdToBytes(0), leaseIdToBytes(math.MaxInt64), 0)
ls := make([]*leasepb.Lease, 0, len(vs))
for i := range vs {
var lpb leasepb.Lease
err := lpb.Unmarshal(vs[i])
if err != nil {
panic("failed to unmarshal lease proto item")
}
ls = append(ls, &lpb)
}
return ls
}
func MustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) {
key := leaseIdToBytes(lpb.ID)
val, err := lpb.Marshal()
if err != nil {
panic("failed to marshal lease proto item")
}
tx.UnsafePut(Lease, key, val)
}
func UnsafeDeleteLease(tx backend.BatchTx, lpb *leasepb.Lease) {
tx.UnsafeDelete(Lease, leaseIdToBytes(lpb.ID))
}
func MustUnsafeGetLease(tx backend.BatchTx, leaseID int64) *leasepb.Lease {
_, vs := tx.UnsafeRange(Lease, leaseIdToBytes(leaseID), nil, 0)
if len(vs) != 1 {
return nil
}
var lpb leasepb.Lease
err := lpb.Unmarshal(vs[0])
if err != nil {
panic("failed to unmarshal lease proto item")
}
return &lpb
}
func leaseIdToBytes(n int64) []byte {
bytes := make([]byte, 8)
binary.BigEndian.PutUint64(bytes, uint64(n))
return bytes
}