mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #13676 from chaochn47/fix_12535
Load all leases from backend
This commit is contained in:
commit
09e35a44ec
@ -45,6 +45,7 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
|
||||
- Fix [A client can panic etcd by passing invalid utf-8 in the client-api-version header](https://github.com/etcd-io/etcd/pull/13560)
|
||||
- Fix [etcd gateway doesn't format the endpoint of IPv6 address correctly](https://github.com/etcd-io/etcd/pull/13551)
|
||||
- Fix [A client can cause a nil dereference in etcd by passing an invalid SortTarget](https://github.com/etcd-io/etcd/pull/13555)
|
||||
- Fix [Grant lease with negative ID can possibly cause db out of sync](https://github.com/etcd-io/etcd/pull/13676)
|
||||
|
||||
### tools/benchmark
|
||||
|
||||
|
@ -16,7 +16,7 @@ package schema
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"math"
|
||||
"fmt"
|
||||
|
||||
"go.etcd.io/etcd/server/v3/lease/leasepb"
|
||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||
@ -27,15 +27,18 @@ func UnsafeCreateLeaseBucket(tx backend.BatchTx) {
|
||||
}
|
||||
|
||||
func MustUnsafeGetAllLeases(tx backend.ReadTx) []*leasepb.Lease {
|
||||
_, vs := tx.UnsafeRange(Lease, leaseIdToBytes(0), leaseIdToBytes(math.MaxInt64), 0)
|
||||
ls := make([]*leasepb.Lease, 0, len(vs))
|
||||
for i := range vs {
|
||||
ls := make([]*leasepb.Lease, 0)
|
||||
err := tx.UnsafeForEach(Lease, func(k, v []byte) error {
|
||||
var lpb leasepb.Lease
|
||||
err := lpb.Unmarshal(vs[i])
|
||||
err := lpb.Unmarshal(v)
|
||||
if err != nil {
|
||||
panic("failed to unmarshal lease proto item")
|
||||
return fmt.Errorf("failed to Unmarshal lease proto item; lease ID=%016x", bytesToLeaseID(k))
|
||||
}
|
||||
ls = append(ls, &lpb)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return ls
|
||||
}
|
||||
@ -72,3 +75,10 @@ func leaseIdToBytes(n int64) []byte {
|
||||
binary.BigEndian.PutUint64(bytes, uint64(n))
|
||||
return bytes
|
||||
}
|
||||
|
||||
// bytesToLeaseID decodes a big-endian 8-byte bucket key into a lease ID.
// It panics when the key is not exactly 8 bytes long.
func bytesToLeaseID(bytes []byte) int64 {
	if len(bytes) == 8 {
		return int64(binary.BigEndian.Uint64(bytes))
	}
	panic(fmt.Errorf("lease ID must be 8-byte"))
}
|
||||
|
106
server/storage/schema/lease_test.go
Normal file
106
server/storage/schema/lease_test.go
Normal file
@ -0,0 +1,106 @@
|
||||
// Copyright 2022 The etcd Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package schema
|
||||
|
||||
import (
|
||||
"math"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.etcd.io/etcd/server/v3/lease/leasepb"
|
||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
|
||||
)
|
||||
|
||||
func TestLeaseBackend(t *testing.T) {
|
||||
tcs := []struct {
|
||||
name string
|
||||
setup func(tx backend.BatchTx)
|
||||
want []*leasepb.Lease
|
||||
}{
|
||||
{
|
||||
name: "Empty by default",
|
||||
setup: func(tx backend.BatchTx) {},
|
||||
want: []*leasepb.Lease{},
|
||||
},
|
||||
{
|
||||
name: "Returns data put before",
|
||||
setup: func(tx backend.BatchTx) {
|
||||
MustUnsafePutLease(tx, &leasepb.Lease{
|
||||
ID: -1,
|
||||
TTL: 2,
|
||||
})
|
||||
},
|
||||
want: []*leasepb.Lease{
|
||||
{
|
||||
ID: -1,
|
||||
TTL: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Skips deleted",
|
||||
setup: func(tx backend.BatchTx) {
|
||||
MustUnsafePutLease(tx, &leasepb.Lease{
|
||||
ID: -1,
|
||||
TTL: 2,
|
||||
})
|
||||
MustUnsafePutLease(tx, &leasepb.Lease{
|
||||
ID: math.MinInt64,
|
||||
TTL: 2,
|
||||
})
|
||||
MustUnsafePutLease(tx, &leasepb.Lease{
|
||||
ID: math.MaxInt64,
|
||||
TTL: 3,
|
||||
})
|
||||
UnsafeDeleteLease(tx, &leasepb.Lease{
|
||||
ID: -1,
|
||||
TTL: 2,
|
||||
})
|
||||
},
|
||||
want: []*leasepb.Lease{
|
||||
{
|
||||
ID: math.MaxInt64,
|
||||
TTL: 3,
|
||||
},
|
||||
{
|
||||
ID: math.MinInt64, // bytes bigger than MaxInt64
|
||||
TTL: 2,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tcs {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10)
|
||||
tx := be.BatchTx()
|
||||
tx.Lock()
|
||||
UnsafeCreateLeaseBucket(tx)
|
||||
tc.setup(tx)
|
||||
tx.Unlock()
|
||||
|
||||
be.ForceCommit()
|
||||
be.Close()
|
||||
|
||||
be2 := backend.NewDefaultBackend(tmpPath)
|
||||
defer be2.Close()
|
||||
leases := MustUnsafeGetAllLeases(be2.ReadTx())
|
||||
|
||||
assert.Equal(t, tc.want, leases)
|
||||
})
|
||||
}
|
||||
}
|
@ -170,6 +170,7 @@ type ClusterConfig struct {
|
||||
WatchProgressNotifyInterval time.Duration
|
||||
ExperimentalMaxLearners int
|
||||
StrictReconfigCheck bool
|
||||
CorruptCheckTime time.Duration
|
||||
}
|
||||
|
||||
type Cluster struct {
|
||||
@ -282,6 +283,7 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member {
|
||||
WatchProgressNotifyInterval: c.Cfg.WatchProgressNotifyInterval,
|
||||
ExperimentalMaxLearners: c.Cfg.ExperimentalMaxLearners,
|
||||
StrictReconfigCheck: c.Cfg.StrictReconfigCheck,
|
||||
CorruptCheckTime: c.Cfg.CorruptCheckTime,
|
||||
})
|
||||
m.DiscoveryURL = c.Cfg.DiscoveryURL
|
||||
return m
|
||||
@ -571,6 +573,7 @@ type MemberConfig struct {
|
||||
WatchProgressNotifyInterval time.Duration
|
||||
ExperimentalMaxLearners int
|
||||
StrictReconfigCheck bool
|
||||
CorruptCheckTime time.Duration
|
||||
}
|
||||
|
||||
// MustNewMember returns an initialized member with the given name. If peerTLS is
|
||||
@ -673,6 +676,9 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
|
||||
m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval
|
||||
|
||||
m.InitialCorruptCheck = true
|
||||
if mcfg.CorruptCheckTime > time.Duration(0) {
|
||||
m.CorruptCheckTime = mcfg.CorruptCheckTime
|
||||
}
|
||||
m.WarningApplyDuration = embed.DefaultWarningApplyDuration
|
||||
m.WarningUnaryRequestDuration = embed.DefaultWarningUnaryRequestDuration
|
||||
m.ExperimentalMaxLearners = membership.DefaultMaxLearners
|
||||
|
@ -25,8 +25,10 @@ import (
|
||||
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
|
||||
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
|
||||
"go.etcd.io/etcd/pkg/v3/traceutil"
|
||||
"go.etcd.io/etcd/server/v3/lease/leasepb"
|
||||
"go.etcd.io/etcd/server/v3/storage/backend"
|
||||
"go.etcd.io/etcd/server/v3/storage/mvcc"
|
||||
"go.etcd.io/etcd/server/v3/storage/schema"
|
||||
"go.etcd.io/etcd/tests/v3/framework/integration"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
@ -253,3 +255,99 @@ func TestV3CorruptAlarm(t *testing.T) {
|
||||
}
|
||||
t.Fatalf("expected error %v after %s", rpctypes.ErrCorrupt, 5*time.Second)
|
||||
}
|
||||
|
||||
func TestV3CorruptAlarmWithLeaseCorrupted(t *testing.T) {
|
||||
integration.BeforeTest(t)
|
||||
clus := integration.NewCluster(t, &integration.ClusterConfig{
|
||||
CorruptCheckTime: time.Second,
|
||||
Size: 3,
|
||||
SnapshotCount: 10,
|
||||
SnapshotCatchUpEntries: 5,
|
||||
})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
lresp, err := integration.ToGRPC(clus.RandClient()).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{ID: 1, TTL: 60})
|
||||
if err != nil {
|
||||
t.Errorf("could not create lease 1 (%v)", err)
|
||||
}
|
||||
if lresp.ID != 1 {
|
||||
t.Errorf("got id %v, wanted id %v", lresp.ID, 1)
|
||||
}
|
||||
|
||||
putr := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID}
|
||||
// Trigger snapshot from the leader to new member
|
||||
for i := 0; i < 15; i++ {
|
||||
_, err := integration.ToGRPC(clus.RandClient()).KV.Put(ctx, putr)
|
||||
if err != nil {
|
||||
t.Errorf("#%d: couldn't put key (%v)", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := clus.RemoveMember(t, clus.Client(1), uint64(clus.Members[2].ID())); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
clus.WaitMembersForLeader(t, clus.Members)
|
||||
|
||||
clus.AddMember(t)
|
||||
clus.WaitMembersForLeader(t, clus.Members)
|
||||
// Wait for new member to catch up
|
||||
integration.WaitClientV3(t, clus.Members[2].Client)
|
||||
|
||||
// Corrupt member 2 by modifying backend lease bucket offline.
|
||||
clus.Members[2].Stop(t)
|
||||
fp := filepath.Join(clus.Members[2].DataDir, "member", "snap", "db")
|
||||
bcfg := backend.DefaultBackendConfig()
|
||||
bcfg.Path = fp
|
||||
bcfg.Logger = zaptest.NewLogger(t)
|
||||
be := backend.New(bcfg)
|
||||
|
||||
olpb := leasepb.Lease{ID: int64(1), TTL: 60}
|
||||
tx := be.BatchTx()
|
||||
schema.UnsafeDeleteLease(tx, &olpb)
|
||||
lpb := leasepb.Lease{ID: int64(2), TTL: 60}
|
||||
schema.MustUnsafePutLease(tx, &lpb)
|
||||
tx.Commit()
|
||||
|
||||
if err := be.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := clus.Members[2].Restart(t); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
clus.Members[1].WaitOK(t)
|
||||
clus.Members[2].WaitOK(t)
|
||||
|
||||
// Revoke lease should remove key except the member with corruption
|
||||
_, err = integration.ToGRPC(clus.Members[0].Client).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp.ID})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
resp0, err0 := clus.Members[1].Client.KV.Get(context.TODO(), "foo")
|
||||
if err0 != nil {
|
||||
t.Fatal(err0)
|
||||
}
|
||||
resp1, err1 := clus.Members[2].Client.KV.Get(context.TODO(), "foo")
|
||||
if err1 != nil {
|
||||
t.Fatal(err1)
|
||||
}
|
||||
|
||||
if resp0.Header.Revision == resp1.Header.Revision {
|
||||
t.Fatalf("matching Revision values")
|
||||
}
|
||||
|
||||
// Wait for CorruptCheckTime
|
||||
time.Sleep(time.Second)
|
||||
presp, perr := clus.Client(0).Put(context.TODO(), "abc", "aaa")
|
||||
if perr != nil {
|
||||
if !eqErrGRPC(perr, rpctypes.ErrCorrupt) {
|
||||
t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr)
|
||||
} else {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ package integration
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -142,6 +143,91 @@ func TestV3LeaseGrantByID(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestV3LeaseNegativeID ensures restarted member lessor can recover negative leaseID from backend.
|
||||
//
|
||||
// When the negative leaseID is used for lease revoke, all etcd nodes will remove the lease
|
||||
// and delete associated keys to ensure kv store data consistency
|
||||
//
|
||||
// It ensures issue 12535 is fixed by PR 13676
|
||||
func TestV3LeaseNegativeID(t *testing.T) {
|
||||
tcs := []struct {
|
||||
leaseID int64
|
||||
k []byte
|
||||
v []byte
|
||||
}{
|
||||
{
|
||||
leaseID: -1, // int64 -1 is 2^64 -1 in uint64
|
||||
k: []byte("foo"),
|
||||
v: []byte("bar"),
|
||||
},
|
||||
{
|
||||
leaseID: math.MaxInt64,
|
||||
k: []byte("bar"),
|
||||
v: []byte("foo"),
|
||||
},
|
||||
{
|
||||
leaseID: math.MinInt64,
|
||||
k: []byte("hello"),
|
||||
v: []byte("world"),
|
||||
},
|
||||
}
|
||||
for _, tc := range tcs {
|
||||
t.Run(fmt.Sprintf("test with lease ID %16x", tc.leaseID), func(t *testing.T) {
|
||||
integration.BeforeTest(t)
|
||||
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3})
|
||||
defer clus.Terminate(t)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
cc := clus.RandClient()
|
||||
lresp, err := integration.ToGRPC(cc).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{ID: tc.leaseID, TTL: 300})
|
||||
if err != nil {
|
||||
t.Errorf("could not create lease %d (%v)", tc.leaseID, err)
|
||||
}
|
||||
if lresp.ID != tc.leaseID {
|
||||
t.Errorf("got id %v, wanted id %v", lresp.ID, tc.leaseID)
|
||||
}
|
||||
putr := &pb.PutRequest{Key: tc.k, Value: tc.v, Lease: tc.leaseID}
|
||||
_, err = integration.ToGRPC(cc).KV.Put(ctx, putr)
|
||||
if err != nil {
|
||||
t.Errorf("couldn't put key (%v)", err)
|
||||
}
|
||||
|
||||
// wait for backend Commit
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
// restore lessor from db file
|
||||
clus.Members[2].Stop(t)
|
||||
if err := clus.Members[2].Restart(t); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// revoke lease should remove key
|
||||
integration.WaitClientV3(t, clus.Members[2].Client)
|
||||
_, err = integration.ToGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: tc.leaseID})
|
||||
if err != nil {
|
||||
t.Errorf("could not revoke lease %d (%v)", tc.leaseID, err)
|
||||
}
|
||||
var revision int64
|
||||
for _, m := range clus.Members {
|
||||
getr := &pb.RangeRequest{Key: tc.k}
|
||||
getresp, err := integration.ToGRPC(m.Client).KV.Range(ctx, getr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if revision == 0 {
|
||||
revision = getresp.Header.Revision
|
||||
}
|
||||
if revision != getresp.Header.Revision {
|
||||
t.Errorf("expect revision %d, but got %d", revision, getresp.Header.Revision)
|
||||
}
|
||||
if len(getresp.Kvs) != 0 {
|
||||
t.Errorf("lease removed but key remains")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestV3LeaseExpire ensures a key is deleted once its lease expires.
|
||||
func TestV3LeaseExpire(t *testing.T) {
|
||||
integration.BeforeTest(t)
|
||||
|
Loading…
x
Reference in New Issue
Block a user