From fd51434b542d376bc567812f5685579f393d8993 Mon Sep 17 00:00:00 2001
From: Chao Chen
Date: Tue, 19 Jul 2022 15:49:13 -0700
Subject: [PATCH] backport 3.5: #13676 load all leases from backend

Signed-off-by: Chao Chen
---
 go.mod                       |   1 +
 integration/cluster.go       |   7 +-
 integration/v3_alarm_test.go | 122 +++++++++++++++++++++++++++++++++++
 integration/v3_lease_test.go |  86 ++++++++++++++++++++++++
 lease/lessor.go              |  39 ++++++++---
 lease/lessor_test.go         |  90 ++++++++++++++++++++++++
 6 files changed, 334 insertions(+), 11 deletions(-)

diff --git a/go.mod b/go.mod
index 843d3ad0a..3d68270ee 100644
--- a/go.mod
+++ b/go.mod
@@ -35,6 +35,7 @@ require (
 	github.com/soheilhy/cmux v0.1.4
 	github.com/spf13/cobra v0.0.3
 	github.com/spf13/pflag v1.0.1
+	github.com/stretchr/testify v1.3.0 // indirect
 	github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966
 	github.com/urfave/cli v1.20.0
 	github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2
diff --git a/integration/cluster.go b/integration/cluster.go
index d01520cef..121414e14 100644
--- a/integration/cluster.go
+++ b/integration/cluster.go
@@ -154,6 +154,7 @@ type ClusterConfig struct {
 	LeaseCheckpointInterval time.Duration
 
 	WatchProgressNotifyInterval time.Duration
+	CorruptCheckTime            time.Duration
 }
 
 type cluster struct {
@@ -299,6 +300,7 @@ func (c *cluster) mustNewMember(t testing.TB) *member {
 			enableLeaseCheckpoint:       c.cfg.EnableLeaseCheckpoint,
 			leaseCheckpointInterval:     c.cfg.LeaseCheckpointInterval,
 			WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
+			CorruptCheckTime:            c.cfg.CorruptCheckTime,
 		})
 	m.DiscoveryURL = c.cfg.DiscoveryURL
 	if c.cfg.UseGRPC {
@@ -589,6 +591,7 @@ type memberConfig struct {
 	enableLeaseCheckpoint       bool
 	leaseCheckpointInterval     time.Duration
 	WatchProgressNotifyInterval time.Duration
+	CorruptCheckTime            time.Duration
 }
 
 // mustNewMember return an inited member with the given name. If peerTLS is
@@ -685,7 +688,9 @@ func mustNewMember(t testing.TB, mcfg memberConfig) *member {
 	m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval
 
 	m.InitialCorruptCheck = true
-
+	if mcfg.CorruptCheckTime > time.Duration(0) {
+		m.CorruptCheckTime = mcfg.CorruptCheckTime
+	}
 	lcfg := logutil.DefaultZapLoggerConfig
 	m.LoggerConfig = &lcfg
 	m.LoggerConfig.OutputPaths = []string{"/dev/null"}
diff --git a/integration/v3_alarm_test.go b/integration/v3_alarm_test.go
index 0b2dd05ce..9a71f87f2 100644
--- a/integration/v3_alarm_test.go
+++ b/integration/v3_alarm_test.go
@@ -16,6 +16,7 @@ package integration
 
 import (
 	"context"
+	"encoding/binary"
 	"os"
 	"path/filepath"
 	"sync"
@@ -24,10 +25,12 @@
 
 	"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
 	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+	"go.etcd.io/etcd/lease/leasepb"
 	"go.etcd.io/etcd/mvcc"
 	"go.etcd.io/etcd/mvcc/backend"
 	"go.etcd.io/etcd/pkg/testutil"
 	"go.etcd.io/etcd/pkg/traceutil"
+	"go.uber.org/zap/zaptest"
 
 	"go.uber.org/zap"
 )
@@ -232,3 +235,122 @@ func TestV3CorruptAlarm(t *testing.T) {
 	}
 	t.Fatalf("expected error %v after %s", rpctypes.ErrCorrupt, 5*time.Second)
 }
+
+var leaseBucketName = []byte("lease")
+
+func TestV3CorruptAlarmWithLeaseCorrupted(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := NewClusterV3(t, &ClusterConfig{
+		CorruptCheckTime:       time.Second,
+		Size:                   3,
+		SnapshotCount:          10,
+		SnapshotCatchUpEntries: 5,
+	})
+	defer clus.Terminate(t)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{ID: 1, TTL: 60})
+	if err != nil {
+		t.Errorf("could not create lease 1 (%v)", err)
+	}
+	if lresp.ID != 1 {
+		t.Errorf("got id %v, wanted id %v", lresp.ID, 1)
+	}
+
+	putr := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID}
+	// Trigger snapshot from the leader to new member
+	for i := 0; i < 15; i++ {
+		_, err := toGRPC(clus.RandClient()).KV.Put(ctx, putr)
+		if err != nil {
+			t.Errorf("#%d: couldn't put key (%v)", i, err)
+		}
+	}
+
+	clus.RemoveMember(t, uint64(clus.Members[2].ID()))
+	oldMemberClient := clus.Client(2)
+	if err := oldMemberClient.Close(); err != nil {
+		t.Fatal(err)
+	}
+
+	clus.AddMember(t)
+	// Wait for new member to catch up
+	clus.Members[2].WaitStarted(t)
+	newMemberClient, err := NewClientV3(clus.Members[2])
+	if err != nil {
+		t.Fatal(err)
+	}
+	WaitClientV3(t, newMemberClient)
+	clus.clients[2] = newMemberClient
+
+	// Corrupt member 2 by modifying backend lease bucket offline.
+	clus.Members[2].Stop(t)
+	fp := filepath.Join(clus.Members[2].DataDir, "member", "snap", "db")
+	bcfg := backend.DefaultBackendConfig()
+	bcfg.Path = fp
+	bcfg.Logger = zaptest.NewLogger(t)
+	be := backend.New(bcfg)
+
+	tx := be.BatchTx()
+	tx.UnsafeDelete(leaseBucketName, leaseIdToBytes(1))
+	lpb := leasepb.Lease{ID: int64(2), TTL: 60}
+	mustUnsafePutLease(tx, &lpb)
+	tx.Commit()
+
+	if err := be.Close(); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := clus.Members[2].Restart(t); err != nil {
+		t.Fatal(err)
+	}
+
+	clus.Members[1].WaitOK(t)
+	clus.Members[2].WaitOK(t)
+
+	// Revoke lease should remove key except the member with corruption
+	_, err = toGRPC(clus.Client(0)).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp.ID})
+	if err != nil {
+		t.Fatal(err)
+	}
+	resp0, err0 := clus.Client(1).KV.Get(context.TODO(), "foo")
+	if err0 != nil {
+		t.Fatal(err0)
+	}
+	resp1, err1 := clus.Client(2).KV.Get(context.TODO(), "foo")
+	if err1 != nil {
+		t.Fatal(err1)
+	}
+
+	if resp0.Header.Revision == resp1.Header.Revision {
+		t.Fatalf("matching Revision values")
+	}
+
+	// Wait for CorruptCheckTime
+	time.Sleep(time.Second)
+	presp, perr := clus.Client(0).Put(context.TODO(), "abc", "aaa")
+	if perr != nil {
+		if !eqErrGRPC(perr, rpctypes.ErrCorrupt) {
+			t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr)
+		} else {
+			return
+		}
+	}
+}
+
+func leaseIdToBytes(n int64) []byte {
+	bytes := make([]byte, 8)
+	binary.BigEndian.PutUint64(bytes, uint64(n))
+	return bytes
+}
+
+func mustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) {
+	key := leaseIdToBytes(lpb.ID)
+
+	val, err := lpb.Marshal()
+	if err != nil {
+		panic("failed to marshal lease proto item")
+	}
+	tx.UnsafePut(leaseBucketName, key, val)
+}
diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go
index 659ebd924..26fa25a94 100644
--- a/integration/v3_lease_test.go
+++ b/integration/v3_lease_test.go
@@ -17,6 +17,7 @@ package integration
 import (
 	"context"
 	"fmt"
+	"math"
 	"testing"
 	"time"
@@ -139,6 +140,91 @@ func TestV3LeaseGrantByID(t *testing.T) {
 	}
 }
 
+// TestV3LeaseNegativeID ensures restarted member lessor can recover negative leaseID from backend.
+//
+// When the negative leaseID is used for lease revoke, all etcd nodes will remove the lease
+// and delete associated keys to ensure kv store data consistency
+//
+// It ensures issue 12535 is fixed by PR 13676
+func TestV3LeaseNegativeID(t *testing.T) {
+	tcs := []struct {
+		leaseID int64
+		k       []byte
+		v       []byte
+	}{
+		{
+			leaseID: -1, // int64 -1 is 2^64 -1 in uint64
+			k:       []byte("foo"),
+			v:       []byte("bar"),
+		},
+		{
+			leaseID: math.MaxInt64,
+			k:       []byte("bar"),
+			v:       []byte("foo"),
+		},
+		{
+			leaseID: math.MinInt64,
+			k:       []byte("hello"),
+			v:       []byte("world"),
+		},
+	}
+	for _, tc := range tcs {
+		t.Run(fmt.Sprintf("test with lease ID %16x", tc.leaseID), func(t *testing.T) {
+			defer testutil.AfterTest(t)
+			clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+			defer clus.Terminate(t)
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			cc := clus.RandClient()
+			lresp, err := toGRPC(cc).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{ID: tc.leaseID, TTL: 300})
+			if err != nil {
+				t.Errorf("could not create lease %d (%v)", tc.leaseID, err)
+			}
+			if lresp.ID != tc.leaseID {
+				t.Errorf("got id %v, wanted id %v", lresp.ID, tc.leaseID)
+			}
+			putr := &pb.PutRequest{Key: tc.k, Value: tc.v, Lease: tc.leaseID}
+			_, err = toGRPC(cc).KV.Put(ctx, putr)
+			if err != nil {
+				t.Errorf("couldn't put key (%v)", err)
+			}
+
+			// wait for backend Commit
+			time.Sleep(100 * time.Millisecond)
+			// restore lessor from db file
+			clus.Members[2].Stop(t)
+			if err := clus.Members[2].Restart(t); err != nil {
+				t.Fatal(err)
+			}
+
+			// revoke lease should remove key
+			WaitClientV3(t, clus.Client(2))
+			_, err = toGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: tc.leaseID})
+			if err != nil {
+				t.Errorf("could not revoke lease %d (%v)", tc.leaseID, err)
+			}
+			var revision int64
+			for i := range clus.Members {
+				getr := &pb.RangeRequest{Key: tc.k}
+				getresp, err := toGRPC(clus.Client(i)).KV.Range(ctx, getr)
+				if err != nil {
+					t.Fatal(err)
+				}
+				if revision == 0 {
+					revision = getresp.Header.Revision
+				}
+				if revision != getresp.Header.Revision {
+					t.Errorf("expect revision %d, but got %d", revision, getresp.Header.Revision)
+				}
+				if len(getresp.Kvs) != 0 {
+					t.Errorf("lease removed but key remains")
+				}
+			}
+		})
+	}
+}
+
 // TestV3LeaseExpire ensures a key is deleted once a key expires.
 func TestV3LeaseExpire(t *testing.T) {
 	defer testutil.AfterTest(t)
diff --git a/lease/lessor.go b/lease/lessor.go
index 9612ed88f..ffbd15e14 100644
--- a/lease/lessor.go
+++ b/lease/lessor.go
@@ -19,6 +19,7 @@ import (
 	"context"
 	"encoding/binary"
 	"errors"
+	"fmt"
 	"math"
 	"sort"
 	"sync"
@@ -771,15 +772,10 @@ func (le *lessor) initAndRecover() {
 	tx.Lock()
 
 	tx.UnsafeCreateBucket(leaseBucketName)
-	_, vs := tx.UnsafeRange(leaseBucketName, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
-	// TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
-	for i := range vs {
-		var lpb leasepb.Lease
-		err := lpb.Unmarshal(vs[i])
-		if err != nil {
-			tx.Unlock()
-			panic("failed to unmarshal lease proto item")
-		}
+	lpbs := unsafeGetAllLeases(tx)
+	tx.Unlock()
+
+	for _, lpb := range lpbs {
 		ID := LeaseID(lpb.ID)
 		if lpb.TTL < le.minLeaseTTL {
 			lpb.TTL = le.minLeaseTTL
@@ -796,7 +792,6 @@ func (le *lessor) initAndRecover() {
 	}
 	le.leaseExpiredNotifier.Init()
 	heap.Init(&le.leaseCheckpointHeap)
-	tx.Unlock()
 
 	le.b.ForceCommit()
 }
@@ -894,6 +889,30 @@ func int64ToBytes(n int64) []byte {
 	return bytes
 }
 
+func bytesToLeaseID(bytes []byte) int64 {
+	if len(bytes) != 8 {
+		panic(fmt.Errorf("lease ID must be 8-byte"))
+	}
+	return int64(binary.BigEndian.Uint64(bytes))
+}
+
+func unsafeGetAllLeases(tx backend.ReadTx) []*leasepb.Lease {
+	ls := make([]*leasepb.Lease, 0)
+	err := tx.UnsafeForEach(leaseBucketName, func(k, v []byte) error {
+		var lpb leasepb.Lease
+		err := lpb.Unmarshal(v)
+		if err != nil {
+			return fmt.Errorf("failed to Unmarshal lease proto item; lease ID=%016x", bytesToLeaseID(k))
+		}
+		ls = append(ls, &lpb)
+		return nil
+	})
+	if err != nil {
+		panic(err)
+	}
+	return ls
+}
+
 // FakeLessor is a fake implementation of Lessor interface.
 // Used for testing only.
 type FakeLessor struct{}
diff --git a/lease/lessor_test.go b/lease/lessor_test.go
index c79c32e7c..bf8b58460 100644
--- a/lease/lessor_test.go
+++ b/lease/lessor_test.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"fmt"
 	"io/ioutil"
+	"math"
 	"os"
 	"path/filepath"
 	"reflect"
@@ -26,7 +27,9 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
+	"go.etcd.io/etcd/lease/leasepb"
 	"go.etcd.io/etcd/mvcc/backend"
 	"go.uber.org/zap"
 )
@@ -578,6 +581,93 @@ func TestLessorCheckpointsRestoredOnPromote(t *testing.T) {
 	}
 }
 
+func TestLeaseBackend(t *testing.T) {
+	tcs := []struct {
+		name  string
+		setup func(tx backend.BatchTx)
+		want  []*leasepb.Lease
+	}{
+		{
+			name:  "Empty by default",
+			setup: func(tx backend.BatchTx) {},
+			want:  []*leasepb.Lease{},
+		},
+		{
+			name: "Returns data put before",
+			setup: func(tx backend.BatchTx) {
+				mustUnsafePutLease(tx, &leasepb.Lease{
+					ID:  -1,
+					TTL: 2,
+				})
+			},
+			want: []*leasepb.Lease{
+				{
+					ID:  -1,
+					TTL: 2,
+				},
+			},
+		},
+		{
+			name: "Skips deleted",
+			setup: func(tx backend.BatchTx) {
+				mustUnsafePutLease(tx, &leasepb.Lease{
+					ID:  -1,
+					TTL: 2,
+				})
+				mustUnsafePutLease(tx, &leasepb.Lease{
+					ID:  math.MinInt64,
+					TTL: 2,
+				})
+				mustUnsafePutLease(tx, &leasepb.Lease{
+					ID:  math.MaxInt64,
+					TTL: 3,
+				})
+				tx.UnsafeDelete(leaseBucketName, int64ToBytes(-1))
+			},
+			want: []*leasepb.Lease{
+				{
+					ID:  math.MaxInt64,
+					TTL: 3,
+				},
+				{
+					ID:  math.MinInt64, // bytes bigger than MaxInt64
+					TTL: 2,
+				},
+			},
+		},
+	}
+
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			be, tmpPath := backend.NewTmpBackend(time.Microsecond, 10)
+			tx := be.BatchTx()
+			tx.Lock()
+			tx.UnsafeCreateBucket(leaseBucketName)
+			tc.setup(tx)
+			tx.Unlock()
+
+			be.ForceCommit()
+			be.Close()
+
+			be2 := backend.NewDefaultBackend(tmpPath)
+			defer be2.Close()
+			leases := unsafeGetAllLeases(be2.ReadTx())
+
+			assert.Equal(t, tc.want, leases)
+		})
+	}
+}
+
+func mustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) {
+	key := int64ToBytes(lpb.ID)
+
+	val, err := lpb.Marshal()
+	if err != nil {
+		panic("failed to marshal lease proto item")
+	}
+	tx.UnsafePut(leaseBucketName, key, val)
+}
+
 type fakeDeleter struct {
 	deleted []string
 	tx      backend.BatchTx