mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
etcdserver: Move get/put/delete on Lease bucket to bucket package
This commit is contained in:
parent
a6317392f4
commit
9e511665c5
@ -336,7 +336,7 @@ func (le *lessor) Revoke(id LeaseID) error {
|
|||||||
// lease deletion needs to be in the same backend transaction with the
|
// lease deletion needs to be in the same backend transaction with the
|
||||||
// kv deletion. Or we might end up with not executing the revoke or not
|
// kv deletion. Or we might end up with not executing the revoke or not
|
||||||
// deleting the keys if etcdserver fails in between.
|
// deleting the keys if etcdserver fails in between.
|
||||||
le.b.BatchTx().UnsafeDelete(buckets.Lease, int64ToBytes(int64(l.ID)))
|
buckets.UnsafeDeleteLease(le.b.BatchTx(), &leasepb.Lease{ID: int64(l.ID)})
|
||||||
|
|
||||||
txn.End()
|
txn.End()
|
||||||
|
|
||||||
@ -768,18 +768,12 @@ func (le *lessor) findDueScheduledCheckpoints(checkpointLimit int) []*pb.LeaseCh
|
|||||||
|
|
||||||
func (le *lessor) initAndRecover() {
|
func (le *lessor) initAndRecover() {
|
||||||
tx := le.b.BatchTx()
|
tx := le.b.BatchTx()
|
||||||
tx.Lock()
|
|
||||||
|
|
||||||
tx.UnsafeCreateBucket(buckets.Lease)
|
tx.Lock()
|
||||||
_, vs := tx.UnsafeRange(buckets.Lease, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
|
buckets.UnsafeCreateLeaseBucket(tx)
|
||||||
// TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
|
lpbs := buckets.MustUnsafeGetAllLeases(tx)
|
||||||
for i := range vs {
|
tx.Unlock()
|
||||||
var lpb leasepb.Lease
|
for _, lpb := range lpbs {
|
||||||
err := lpb.Unmarshal(vs[i])
|
|
||||||
if err != nil {
|
|
||||||
tx.Unlock()
|
|
||||||
panic("failed to unmarshal lease proto item")
|
|
||||||
}
|
|
||||||
ID := LeaseID(lpb.ID)
|
ID := LeaseID(lpb.ID)
|
||||||
if lpb.TTL < le.minLeaseTTL {
|
if lpb.TTL < le.minLeaseTTL {
|
||||||
lpb.TTL = le.minLeaseTTL
|
lpb.TTL = le.minLeaseTTL
|
||||||
@ -796,7 +790,6 @@ func (le *lessor) initAndRecover() {
|
|||||||
}
|
}
|
||||||
le.leaseExpiredNotifier.Init()
|
le.leaseExpiredNotifier.Init()
|
||||||
heap.Init(&le.leaseCheckpointHeap)
|
heap.Init(&le.leaseCheckpointHeap)
|
||||||
tx.Unlock()
|
|
||||||
|
|
||||||
le.b.ForceCommit()
|
le.b.ForceCommit()
|
||||||
}
|
}
|
||||||
@ -821,17 +814,11 @@ func (l *Lease) expired() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *Lease) persistTo(b backend.Backend) {
|
func (l *Lease) persistTo(b backend.Backend) {
|
||||||
key := int64ToBytes(int64(l.ID))
|
|
||||||
|
|
||||||
lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
|
lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
|
||||||
val, err := lpb.Marshal()
|
tx := b.BatchTx()
|
||||||
if err != nil {
|
tx.Lock()
|
||||||
panic("failed to marshal lease proto item")
|
defer tx.Unlock()
|
||||||
}
|
buckets.MustUnsafePutLease(tx, &lpb)
|
||||||
|
|
||||||
b.BatchTx().Lock()
|
|
||||||
b.BatchTx().UnsafePut(buckets.Lease, key, val)
|
|
||||||
b.BatchTx().Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TTL returns the TTL of the Lease.
|
// TTL returns the TTL of the Lease.
|
||||||
|
@ -92,12 +92,13 @@ func TestLessorGrant(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
be.BatchTx().Lock()
|
tx := be.BatchTx()
|
||||||
_, vs := be.BatchTx().UnsafeRange(buckets.Lease, int64ToBytes(int64(l.ID)), nil, 0)
|
tx.Lock()
|
||||||
if len(vs) != 1 {
|
defer tx.Unlock()
|
||||||
t.Errorf("len(vs) = %d, want 1", len(vs))
|
lpb := buckets.MustUnsafeGetLease(tx, int64(l.ID))
|
||||||
|
if lpb == nil {
|
||||||
|
t.Errorf("lpb = %d, want not nil", lpb)
|
||||||
}
|
}
|
||||||
be.BatchTx().Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestLeaseConcurrentKeys ensures Lease.Keys method calls are guarded
|
// TestLeaseConcurrentKeys ensures Lease.Keys method calls are guarded
|
||||||
@ -195,12 +196,13 @@ func TestLessorRevoke(t *testing.T) {
|
|||||||
t.Errorf("deleted= %v, want %v", fd.deleted, wdeleted)
|
t.Errorf("deleted= %v, want %v", fd.deleted, wdeleted)
|
||||||
}
|
}
|
||||||
|
|
||||||
be.BatchTx().Lock()
|
tx := be.BatchTx()
|
||||||
_, vs := be.BatchTx().UnsafeRange(buckets.Lease, int64ToBytes(int64(l.ID)), nil, 0)
|
tx.Lock()
|
||||||
if len(vs) != 0 {
|
defer tx.Unlock()
|
||||||
t.Errorf("len(vs) = %d, want 0", len(vs))
|
lpb := buckets.MustUnsafeGetLease(tx, int64(l.ID))
|
||||||
|
if lpb != nil {
|
||||||
|
t.Errorf("lpb = %d, want nil", lpb)
|
||||||
}
|
}
|
||||||
be.BatchTx().Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestLessorRenew ensures Lessor can renew an existing lease.
|
// TestLessorRenew ensures Lessor can renew an existing lease.
|
||||||
|
74
server/mvcc/buckets/lease.go
Normal file
74
server/mvcc/buckets/lease.go
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
// Copyright 2021 The etcd Authors
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package buckets
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/binary"
|
||||||
|
"math"
|
||||||
|
|
||||||
|
"go.etcd.io/etcd/server/v3/lease/leasepb"
|
||||||
|
"go.etcd.io/etcd/server/v3/mvcc/backend"
|
||||||
|
)
|
||||||
|
|
||||||
|
func UnsafeCreateLeaseBucket(tx backend.BatchTx) {
|
||||||
|
tx.UnsafeCreateBucket(Lease)
|
||||||
|
}
|
||||||
|
|
||||||
|
func MustUnsafeGetAllLeases(tx backend.ReadTx) []*leasepb.Lease {
|
||||||
|
_, vs := tx.UnsafeRange(Lease, leaseIdToBytes(0), leaseIdToBytes(math.MaxInt64), 0)
|
||||||
|
ls := make([]*leasepb.Lease, 0, len(vs))
|
||||||
|
for i := range vs {
|
||||||
|
var lpb leasepb.Lease
|
||||||
|
err := lpb.Unmarshal(vs[i])
|
||||||
|
if err != nil {
|
||||||
|
panic("failed to unmarshal lease proto item")
|
||||||
|
}
|
||||||
|
ls = append(ls, &lpb)
|
||||||
|
}
|
||||||
|
return ls
|
||||||
|
}
|
||||||
|
|
||||||
|
func MustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) {
|
||||||
|
key := leaseIdToBytes(lpb.ID)
|
||||||
|
|
||||||
|
val, err := lpb.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
panic("failed to marshal lease proto item")
|
||||||
|
}
|
||||||
|
tx.UnsafePut(Lease, key, val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func UnsafeDeleteLease(tx backend.BatchTx, lpb *leasepb.Lease) {
|
||||||
|
tx.UnsafeDelete(Lease, leaseIdToBytes(lpb.ID))
|
||||||
|
}
|
||||||
|
|
||||||
|
func MustUnsafeGetLease(tx backend.BatchTx, leaseID int64) *leasepb.Lease {
|
||||||
|
_, vs := tx.UnsafeRange(Lease, leaseIdToBytes(leaseID), nil, 0)
|
||||||
|
if len(vs) != 1 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var lpb leasepb.Lease
|
||||||
|
err := lpb.Unmarshal(vs[0])
|
||||||
|
if err != nil {
|
||||||
|
panic("failed to unmarshal lease proto item")
|
||||||
|
}
|
||||||
|
return &lpb
|
||||||
|
}
|
||||||
|
|
||||||
|
func leaseIdToBytes(n int64) []byte {
|
||||||
|
bytes := make([]byte, 8)
|
||||||
|
binary.BigEndian.PutUint64(bytes, uint64(n))
|
||||||
|
return bytes
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user