From f634b44046eedf8fe3eeb22876d9ac347fc40a0e Mon Sep 17 00:00:00 2001 From: Chao Chen Date: Wed, 9 Feb 2022 11:54:06 -0800 Subject: [PATCH] backport 3.5: #13676 load all leases from backend --- server/lease/lessor.go | 38 ++++++--- server/lease/lessor_test.go | 91 ++++++++++++++++++++++ tests/integration/cluster.go | 6 ++ tests/integration/v3_alarm_test.go | 119 +++++++++++++++++++++++++++++ tests/integration/v3_lease_test.go | 86 +++++++++++++++++++++ 5 files changed, 330 insertions(+), 10 deletions(-) diff --git a/server/lease/lessor.go b/server/lease/lessor.go index ce7a7a89a..409fa5b2a 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -19,6 +19,7 @@ import ( "context" "encoding/binary" "errors" + "fmt" "math" "sort" "sync" @@ -799,15 +800,9 @@ func (le *lessor) initAndRecover() { tx.Lock() tx.UnsafeCreateBucket(buckets.Lease) - _, vs := tx.UnsafeRange(buckets.Lease, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0) - // TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue. - for i := range vs { - var lpb leasepb.Lease - err := lpb.Unmarshal(vs[i]) - if err != nil { - tx.Unlock() - panic("failed to unmarshal lease proto item") - } + lpbs := unsafeGetAllLeases(tx) + tx.Unlock() + for _, lpb := range lpbs { ID := LeaseID(lpb.ID) if lpb.TTL < le.minLeaseTTL { lpb.TTL = le.minLeaseTTL @@ -825,7 +820,6 @@ func (le *lessor) initAndRecover() { } le.leaseExpiredNotifier.Init() heap.Init(&le.leaseCheckpointHeap) - tx.Unlock() le.b.ForceCommit() } @@ -923,6 +917,30 @@ func int64ToBytes(n int64) []byte { return bytes } +func bytesToLeaseID(bytes []byte) int64 { + if len(bytes) != 8 { + panic(fmt.Errorf("lease ID must be 8-byte")) + } + return int64(binary.BigEndian.Uint64(bytes)) +} + +func unsafeGetAllLeases(tx backend.ReadTx) []*leasepb.Lease { + ls := make([]*leasepb.Lease, 0) + err := tx.UnsafeForEach(buckets.Lease, func(k, v []byte) error { + var lpb leasepb.Lease + err := lpb.Unmarshal(v) + if err != nil { + return fmt.Errorf("failed to Unmarshal lease proto item; lease ID=%016x", bytesToLeaseID(k)) + } + ls = append(ls, &lpb) + return nil + }) + if err != nil { + panic(err) + } + return ls +} + // FakeLessor is a fake implementation of Lessor interface. // Used for testing only. type FakeLessor struct{} diff --git a/server/lease/lessor_test.go b/server/lease/lessor_test.go index 65118ab3d..0edebdadd 100644 --- a/server/lease/lessor_test.go +++ b/server/lease/lessor_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "io/ioutil" + "math" "os" "path/filepath" "reflect" @@ -27,9 +28,12 @@ import ( "time" "github.com/coreos/go-semver/semver" + "github.com/stretchr/testify/assert" pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/version" + "go.etcd.io/etcd/server/v3/lease/leasepb" "go.etcd.io/etcd/server/v3/mvcc/backend" + betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap" ) @@ -649,6 +653,93 @@ func TestLessorCheckpointPersistenceAfterRestart(t *testing.T) { } } +func TestLeaseBackend(t *testing.T) { + tcs := []struct { + name string + setup func(tx backend.BatchTx) + want []*leasepb.Lease + }{ + { + name: "Empty by default", + setup: func(tx backend.BatchTx) {}, + want: []*leasepb.Lease{}, + }, + { + name: "Returns data put before", + setup: func(tx backend.BatchTx) { + mustUnsafePutLease(tx, &leasepb.Lease{ + ID: -1, + TTL: 2, + }) + }, + want: []*leasepb.Lease{ + { + ID: -1, + TTL: 2, + }, + }, + }, + { + name: "Skips deleted", + setup: func(tx backend.BatchTx) { + mustUnsafePutLease(tx, &leasepb.Lease{ + ID: -1, + TTL: 2, + }) + mustUnsafePutLease(tx, &leasepb.Lease{ + ID: math.MinInt64, + TTL: 2, + }) + mustUnsafePutLease(tx, &leasepb.Lease{ + ID: math.MaxInt64, + TTL: 3, + }) + tx.UnsafeDelete(buckets.Lease, int64ToBytes(-1)) + }, + want: []*leasepb.Lease{ + { + ID: math.MaxInt64, + TTL: 3, + }, + { + ID: math.MinInt64, // bytes bigger than MaxInt64 + TTL: 2, + }, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10) + tx := be.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket(buckets.Lease) + tc.setup(tx) + tx.Unlock() + + be.ForceCommit() + be.Close() + + be2 := backend.NewDefaultBackend(tmpPath) + defer be2.Close() + leases := unsafeGetAllLeases(be2.ReadTx()) + + assert.Equal(t, tc.want, leases) + }) + } +} + +func mustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) { + key := int64ToBytes(lpb.ID) + + val, err := lpb.Marshal() + if err != nil { + panic("failed to marshal lease proto item") + } + tx.UnsafePut(buckets.Lease, key, val) +} + type fakeDeleter struct { deleted []string tx backend.BatchTx diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go index b088df677..8b2cb53f3 100644 --- a/tests/integration/cluster.go +++ b/tests/integration/cluster.go @@ -170,6 +170,7 @@ type ClusterConfig struct { LeaseCheckpointPersist bool WatchProgressNotifyInterval time.Duration + CorruptCheckTime time.Duration } type cluster struct { @@ -332,6 +333,7 @@ func (c *cluster) mustNewMember(t testutil.TB, memberNumber int64) *member { leaseCheckpointPersist: c.cfg.LeaseCheckpointPersist, leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval, WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval, + CorruptCheckTime: c.cfg.CorruptCheckTime, }) m.DiscoveryURL = c.cfg.DiscoveryURL if c.cfg.UseGRPC { @@ -635,6 +637,7 @@ type memberConfig struct { leaseCheckpointInterval time.Duration leaseCheckpointPersist bool WatchProgressNotifyInterval time.Duration + CorruptCheckTime time.Duration } // mustNewMember return an inited member with the given name. If peerTLS is @@ -737,6 +740,9 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member { m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval m.InitialCorruptCheck = true + if mcfg.CorruptCheckTime > time.Duration(0) { + m.CorruptCheckTime = mcfg.CorruptCheckTime + } m.WarningApplyDuration = embed.DefaultWarningApplyDuration m.V2Deprecation = config.V2_DEPR_DEFAULT diff --git a/tests/integration/v3_alarm_test.go b/tests/integration/v3_alarm_test.go index f39bd48cc..3d66eb78e 100644 --- a/tests/integration/v3_alarm_test.go +++ b/tests/integration/v3_alarm_test.go @@ -16,6 +16,7 @@ package integration import ( "context" + "encoding/binary" "os" "path/filepath" "sync" @@ -25,8 +26,10 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/pkg/v3/traceutil" + "go.etcd.io/etcd/server/v3/lease/leasepb" "go.etcd.io/etcd/server/v3/mvcc" "go.etcd.io/etcd/server/v3/mvcc/backend" + "go.etcd.io/etcd/server/v3/mvcc/buckets" "go.uber.org/zap/zaptest" ) @@ -228,3 +231,119 @@ func TestV3CorruptAlarm(t *testing.T) { } t.Fatalf("expected error %v after %s", rpctypes.ErrCorrupt, 5*time.Second) } + +func TestV3CorruptAlarmWithLeaseCorrupted(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{ + CorruptCheckTime: time.Second, + Size: 3, + SnapshotCount: 10, + SnapshotCatchUpEntries: 5, + }) + defer clus.Terminate(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{ID: 1, TTL: 60}) + if err != nil { + t.Errorf("could not create lease 1 (%v)", err) + } + if lresp.ID != 1 { + t.Errorf("got id %v, wanted id %v", lresp.ID, 1) + } + + putr := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID} + // Trigger snapshot from the leader to new member + for i := 0; i < 15; i++ { + _, err := toGRPC(clus.RandClient()).KV.Put(ctx, putr) + if err != nil { + t.Errorf("#%d: couldn't put key (%v)", i, err) + } + } + + clus.RemoveMember(t, uint64(clus.Members[2].ID())) + oldMemberClient := clus.Client(2) + if err := oldMemberClient.Close(); err != nil { + t.Fatal(err) + } + + clus.AddMember(t) + // Wait for new member to catch up + newMemberClient, err := clus.NewClientV3(2) + if err != nil { + t.Fatal(err) + } + WaitClientV3(t, newMemberClient) + clus.clients[2] = newMemberClient + + // Corrupt member 2 by modifying backend lease bucket offline. + clus.Members[2].Stop(t) + fp := filepath.Join(clus.Members[2].DataDir, "member", "snap", "db") + bcfg := backend.DefaultBackendConfig() + bcfg.Path = fp + bcfg.Logger = zaptest.NewLogger(t) + be := backend.New(bcfg) + + tx := be.BatchTx() + tx.UnsafeDelete(buckets.Lease, leaseIdToBytes(1)) + lpb := leasepb.Lease{ID: int64(2), TTL: 60} + mustUnsafePutLease(tx, &lpb) + tx.Commit() + + if err := be.Close(); err != nil { + t.Fatal(err) + } + + if err := clus.Members[2].Restart(t); err != nil { + t.Fatal(err) + } + + clus.Members[1].WaitOK(t) + clus.Members[2].WaitOK(t) + + // Revoke lease should remove key except the member with corruption + _, err = toGRPC(clus.Client(0)).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp.ID}) + if err != nil { + t.Fatal(err) + } + resp0, err0 := clus.Client(1).KV.Get(context.TODO(), "foo") + if err0 != nil { + t.Fatal(err0) + } + resp1, err1 := clus.Client(2).KV.Get(context.TODO(), "foo") + if err1 != nil { + t.Fatal(err1) + } + + if resp0.Header.Revision == resp1.Header.Revision { + t.Fatalf("matching Revision values") + } + + // Wait for CorruptCheckTime + time.Sleep(time.Second) + presp, perr := clus.Client(0).Put(context.TODO(), "abc", "aaa") + if perr != nil { + if !eqErrGRPC(perr, rpctypes.ErrCorrupt) { + t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr) + } else { + return + } + } +} + +func leaseIdToBytes(n int64) []byte { + bytes := make([]byte, 8) + binary.BigEndian.PutUint64(bytes, uint64(n)) + return bytes +} + +func mustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) { + key := leaseIdToBytes(lpb.ID) + + val, err := lpb.Marshal() + if err != nil { + panic("failed to marshal lease proto item") + } + tx.UnsafePut(buckets.Lease, key, val) +} diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go index 04ecc0979..29b612e23 100644 --- a/tests/integration/v3_lease_test.go +++ b/tests/integration/v3_lease_test.go @@ -17,6 +17,7 @@ package integration import ( "context" "fmt" + "math" "testing" "time" @@ -141,6 +142,91 @@ func TestV3LeaseGrantByID(t *testing.T) { } } +// TestV3LeaseNegativeID ensures restarted member lessor can recover negative leaseID from backend. +// +// When the negative leaseID is used for lease revoke, all etcd nodes will remove the lease +// and delete associated keys to ensure kv store data consistency +// +// It ensures issue 12535 is fixed by PR 13676 +func TestV3LeaseNegativeID(t *testing.T) { + tcs := []struct { + leaseID int64 + k []byte + v []byte + }{ + { + leaseID: -1, // int64 -1 is 2^64 -1 in uint64 + k: []byte("foo"), + v: []byte("bar"), + }, + { + leaseID: math.MaxInt64, + k: []byte("bar"), + v: []byte("foo"), + }, + { + leaseID: math.MinInt64, + k: []byte("hello"), + v: []byte("world"), + }, + } + for _, tc := range tcs { + t.Run(fmt.Sprintf("test with lease ID %16x", tc.leaseID), func(t *testing.T) { + BeforeTest(t) + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cc := clus.RandClient() + lresp, err := toGRPC(cc).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{ID: tc.leaseID, TTL: 300}) + if err != nil { + t.Errorf("could not create lease %d (%v)", tc.leaseID, err) + } + if lresp.ID != tc.leaseID { + t.Errorf("got id %v, wanted id %v", lresp.ID, tc.leaseID) + } + putr := &pb.PutRequest{Key: tc.k, Value: tc.v, Lease: tc.leaseID} + _, err = toGRPC(cc).KV.Put(ctx, putr) + if err != nil { + t.Errorf("couldn't put key (%v)", err) + } + + // wait for backend Commit + time.Sleep(100 * time.Millisecond) + // restore lessor from db file + clus.Members[2].Stop(t) + if err := clus.Members[2].Restart(t); err != nil { + t.Fatal(err) + } + + // revoke lease should remove key + WaitClientV3(t, clus.Client(2)) + _, err = toGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: tc.leaseID}) + if err != nil { + t.Errorf("could not revoke lease %d (%v)", tc.leaseID, err) + } + var revision int64 + for i := range clus.Members { + getr := &pb.RangeRequest{Key: tc.k} + getresp, err := toGRPC(clus.Client(i)).KV.Range(ctx, getr) + if err != nil { + t.Fatal(err) + } + if revision == 0 { + revision = getresp.Header.Revision + } + if revision != getresp.Header.Revision { + t.Errorf("expect revision %d, but got %d", revision, getresp.Header.Revision) + } + if len(getresp.Kvs) != 0 { + t.Errorf("lease removed but key remains") + } + } + }) + } +} + // TestV3LeaseExpire ensures a key is deleted once a key expires. func TestV3LeaseExpire(t *testing.T) { BeforeTest(t)