backport 3.5: #13676 load all leases from backend

This commit is contained in:
Chao Chen
2022-02-09 11:54:06 -08:00
parent fa191c64bd
commit f634b44046
5 changed files with 330 additions and 10 deletions

View File

@@ -19,6 +19,7 @@ import (
"context"
"encoding/binary"
"errors"
"fmt"
"math"
"sort"
"sync"
@@ -799,15 +800,9 @@ func (le *lessor) initAndRecover() {
tx.Lock()
tx.UnsafeCreateBucket(buckets.Lease)
_, vs := tx.UnsafeRange(buckets.Lease, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
// TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
for i := range vs {
var lpb leasepb.Lease
err := lpb.Unmarshal(vs[i])
if err != nil {
tx.Unlock()
panic("failed to unmarshal lease proto item")
}
lpbs := unsafeGetAllLeases(tx)
tx.Unlock()
for _, lpb := range lpbs {
ID := LeaseID(lpb.ID)
if lpb.TTL < le.minLeaseTTL {
lpb.TTL = le.minLeaseTTL
@@ -825,7 +820,6 @@ func (le *lessor) initAndRecover() {
}
le.leaseExpiredNotifier.Init()
heap.Init(&le.leaseCheckpointHeap)
tx.Unlock()
le.b.ForceCommit()
}
@@ -923,6 +917,30 @@ func int64ToBytes(n int64) []byte {
return bytes
}
func bytesToLeaseID(bytes []byte) int64 {
if len(bytes) != 8 {
panic(fmt.Errorf("lease ID must be 8-byte"))
}
return int64(binary.BigEndian.Uint64(bytes))
}
func unsafeGetAllLeases(tx backend.ReadTx) []*leasepb.Lease {
ls := make([]*leasepb.Lease, 0)
err := tx.UnsafeForEach(buckets.Lease, func(k, v []byte) error {
var lpb leasepb.Lease
err := lpb.Unmarshal(v)
if err != nil {
return fmt.Errorf("failed to Unmarshal lease proto item; lease ID=%016x", bytesToLeaseID(k))
}
ls = append(ls, &lpb)
return nil
})
if err != nil {
panic(err)
}
return ls
}
// FakeLessor is a fake implementation of Lessor interface.
// Used for testing only.
type FakeLessor struct{}

View File

@@ -18,6 +18,7 @@ import (
"context"
"fmt"
"io/ioutil"
"math"
"os"
"path/filepath"
"reflect"
@@ -27,9 +28,12 @@ import (
"time"
"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/assert"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/server/v3/lease/leasepb"
"go.etcd.io/etcd/server/v3/mvcc/backend"
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
"go.etcd.io/etcd/server/v3/mvcc/buckets"
"go.uber.org/zap"
)
@@ -649,6 +653,93 @@ func TestLessorCheckpointPersistenceAfterRestart(t *testing.T) {
}
}
func TestLeaseBackend(t *testing.T) {
tcs := []struct {
name string
setup func(tx backend.BatchTx)
want []*leasepb.Lease
}{
{
name: "Empty by default",
setup: func(tx backend.BatchTx) {},
want: []*leasepb.Lease{},
},
{
name: "Returns data put before",
setup: func(tx backend.BatchTx) {
mustUnsafePutLease(tx, &leasepb.Lease{
ID: -1,
TTL: 2,
})
},
want: []*leasepb.Lease{
{
ID: -1,
TTL: 2,
},
},
},
{
name: "Skips deleted",
setup: func(tx backend.BatchTx) {
mustUnsafePutLease(tx, &leasepb.Lease{
ID: -1,
TTL: 2,
})
mustUnsafePutLease(tx, &leasepb.Lease{
ID: math.MinInt64,
TTL: 2,
})
mustUnsafePutLease(tx, &leasepb.Lease{
ID: math.MaxInt64,
TTL: 3,
})
tx.UnsafeDelete(buckets.Lease, int64ToBytes(-1))
},
want: []*leasepb.Lease{
{
ID: math.MaxInt64,
TTL: 3,
},
{
ID: math.MinInt64, // bytes bigger than MaxInt64
TTL: 2,
},
},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
tx := be.BatchTx()
tx.Lock()
tx.UnsafeCreateBucket(buckets.Lease)
tc.setup(tx)
tx.Unlock()
be.ForceCommit()
be.Close()
be2 := backend.NewDefaultBackend(tmpPath)
defer be2.Close()
leases := unsafeGetAllLeases(be2.ReadTx())
assert.Equal(t, tc.want, leases)
})
}
}
func mustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) {
key := int64ToBytes(lpb.ID)
val, err := lpb.Marshal()
if err != nil {
panic("failed to marshal lease proto item")
}
tx.UnsafePut(buckets.Lease, key, val)
}
type fakeDeleter struct {
deleted []string
tx backend.BatchTx