diff --git a/scripts/build.sh b/scripts/build.sh index bf41ce958..e587dc189 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -18,7 +18,7 @@ GO_BUILD_ENV=("CGO_ENABLED=0" "GO_BUILD_FLAGS=${GO_BUILD_FLAGS}" "GOOS=${GOOS}" toggle_failpoints() { mode="$1" if command -v gofail >/dev/null 2>&1; then - run gofail "$mode" server/etcdserver/ server/mvcc/backend/ + run gofail "$mode" server/etcdserver/ server/storage/backend/ elif [[ "$mode" != "disable" ]]; then log_error "FAILPOINTS set but gofail not found" exit 1 diff --git a/server/storage/schema/lease.go b/server/storage/schema/lease.go index 43a0c06e4..1d06dfe9d 100644 --- a/server/storage/schema/lease.go +++ b/server/storage/schema/lease.go @@ -16,7 +16,7 @@ package schema import ( "encoding/binary" - "math" + "fmt" "go.etcd.io/etcd/server/v3/lease/leasepb" "go.etcd.io/etcd/server/v3/storage/backend" @@ -27,15 +27,18 @@ func UnsafeCreateLeaseBucket(tx backend.BatchTx) { } func MustUnsafeGetAllLeases(tx backend.ReadTx) []*leasepb.Lease { - _, vs := tx.UnsafeRange(Lease, leaseIdToBytes(0), leaseIdToBytes(math.MaxInt64), 0) - ls := make([]*leasepb.Lease, 0, len(vs)) - for i := range vs { + ls := make([]*leasepb.Lease, 0) + err := tx.UnsafeForEach(Lease, func(k, v []byte) error { var lpb leasepb.Lease - err := lpb.Unmarshal(vs[i]) + err := lpb.Unmarshal(v) if err != nil { - panic("failed to unmarshal lease proto item") + return fmt.Errorf("failed to Unmarshal lease proto item; lease ID=%016x", bytesToLeaseID(k)) } ls = append(ls, &lpb) + return nil + }) + if err != nil { + panic(err) } return ls } @@ -72,3 +75,10 @@ func leaseIdToBytes(n int64) []byte { binary.BigEndian.PutUint64(bytes, uint64(n)) return bytes } + +func bytesToLeaseID(bytes []byte) int64 { + if len(bytes) != 8 { + panic(fmt.Errorf("lease ID must be 8-byte")) + } + return int64(binary.BigEndian.Uint64(bytes)) +} diff --git a/server/storage/schema/lease_test.go b/server/storage/schema/lease_test.go new file mode 100644 index 000000000..8ac4f163d --- /dev/null +++ b/server/storage/schema/lease_test.go @@ -0,0 +1,106 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package schema + +import ( + "math" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/server/v3/lease/leasepb" + "go.etcd.io/etcd/server/v3/storage/backend" + betesting "go.etcd.io/etcd/server/v3/storage/backend/testing" +) + +func TestLeaseBackend(t *testing.T) { + tcs := []struct { + name string + setup func(tx backend.BatchTx) + want []*leasepb.Lease + }{ + { + name: "Empty by default", + setup: func(tx backend.BatchTx) {}, + want: []*leasepb.Lease{}, + }, + { + name: "Returns data put before", + setup: func(tx backend.BatchTx) { + MustUnsafePutLease(tx, &leasepb.Lease{ + ID: -1, + TTL: 2, + }) + }, + want: []*leasepb.Lease{ + { + ID: -1, + TTL: 2, + }, + }, + }, + { + name: "Skips deleted", + setup: func(tx backend.BatchTx) { + MustUnsafePutLease(tx, &leasepb.Lease{ + ID: -1, + TTL: 2, + }) + MustUnsafePutLease(tx, &leasepb.Lease{ + ID: math.MinInt64, + TTL: 2, + }) + MustUnsafePutLease(tx, &leasepb.Lease{ + ID: math.MaxInt64, + TTL: 3, + }) + UnsafeDeleteLease(tx, &leasepb.Lease{ + ID: -1, + TTL: 2, + }) + }, + want: []*leasepb.Lease{ + { + ID: math.MaxInt64, + TTL: 3, + }, + { + ID: math.MinInt64, // bytes bigger than MaxInt64 + TTL: 2, + }, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10) + tx := be.BatchTx() + tx.Lock() + UnsafeCreateLeaseBucket(tx) + tc.setup(tx) + tx.Unlock() + + be.ForceCommit() + be.Close() + + be2 := backend.NewDefaultBackend(tmpPath) + defer be2.Close() + leases := MustUnsafeGetAllLeases(be2.ReadTx()) + + assert.Equal(t, tc.want, leases) + }) + } +} diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index 4600e092d..49723b50c 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -170,6 +170,7 @@ type ClusterConfig struct { WatchProgressNotifyInterval time.Duration ExperimentalMaxLearners int StrictReconfigCheck bool + CorruptCheckTime time.Duration } type Cluster struct { @@ -282,6 +283,7 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member { WatchProgressNotifyInterval: c.Cfg.WatchProgressNotifyInterval, ExperimentalMaxLearners: c.Cfg.ExperimentalMaxLearners, StrictReconfigCheck: c.Cfg.StrictReconfigCheck, + CorruptCheckTime: c.Cfg.CorruptCheckTime, }) m.DiscoveryURL = c.Cfg.DiscoveryURL return m @@ -571,6 +573,7 @@ type MemberConfig struct { WatchProgressNotifyInterval time.Duration ExperimentalMaxLearners int StrictReconfigCheck bool + CorruptCheckTime time.Duration } // MustNewMember return an inited member with the given name. If peerTLS is @@ -673,6 +676,9 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member { m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval m.InitialCorruptCheck = true + if mcfg.CorruptCheckTime > time.Duration(0) { + m.CorruptCheckTime = mcfg.CorruptCheckTime + } m.WarningApplyDuration = embed.DefaultWarningApplyDuration m.WarningUnaryRequestDuration = embed.DefaultWarningUnaryRequestDuration m.ExperimentalMaxLearners = membership.DefaultMaxLearners diff --git a/tests/integration/v3_alarm_test.go b/tests/integration/v3_alarm_test.go index 89dc26abd..5b886d126 100644 --- a/tests/integration/v3_alarm_test.go +++ b/tests/integration/v3_alarm_test.go @@ -25,8 +25,10 @@ import ( pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" "go.etcd.io/etcd/pkg/v3/traceutil" + "go.etcd.io/etcd/server/v3/lease/leasepb" "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/mvcc" + "go.etcd.io/etcd/server/v3/storage/schema" "go.etcd.io/etcd/tests/v3/framework/integration" "go.uber.org/zap/zaptest" ) @@ -253,3 +255,99 @@ func TestV3CorruptAlarm(t *testing.T) { } t.Fatalf("expected error %v after %s", rpctypes.ErrCorrupt, 5*time.Second) } + +func TestV3CorruptAlarmWithLeaseCorrupted(t *testing.T) { + integration.BeforeTest(t) + clus := integration.NewCluster(t, &integration.ClusterConfig{ + CorruptCheckTime: time.Second, + Size: 3, + SnapshotCount: 10, + SnapshotCatchUpEntries: 5, + }) + defer clus.Terminate(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + lresp, err := integration.ToGRPC(clus.RandClient()).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{ID: 1, TTL: 60}) + if err != nil { + t.Errorf("could not create lease 1 (%v)", err) + } + if lresp.ID != 1 { + t.Errorf("got id %v, wanted id %v", lresp.ID, 1) + } + + putr := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID} + // Trigger snapshot from the leader to new member + for i := 0; i < 15; i++ { + _, err := integration.ToGRPC(clus.RandClient()).KV.Put(ctx, putr) + if err != nil { + t.Errorf("#%d: couldn't put key (%v)", i, err) + } + } + + if err := clus.RemoveMember(t, clus.Client(1), uint64(clus.Members[2].ID())); err != nil { + t.Fatal(err) + } + clus.WaitMembersForLeader(t, clus.Members) + + clus.AddMember(t) + clus.WaitMembersForLeader(t, clus.Members) + // Wait for new member to catch up + integration.WaitClientV3(t, clus.Members[2].Client) + + // Corrupt member 2 by modifying backend lease bucket offline. + clus.Members[2].Stop(t) + fp := filepath.Join(clus.Members[2].DataDir, "member", "snap", "db") + bcfg := backend.DefaultBackendConfig() + bcfg.Path = fp + bcfg.Logger = zaptest.NewLogger(t) + be := backend.New(bcfg) + + olpb := leasepb.Lease{ID: int64(1), TTL: 60} + tx := be.BatchTx() + schema.UnsafeDeleteLease(tx, &olpb) + lpb := leasepb.Lease{ID: int64(2), TTL: 60} + schema.MustUnsafePutLease(tx, &lpb) + tx.Commit() + + if err := be.Close(); err != nil { + t.Fatal(err) + } + + if err := clus.Members[2].Restart(t); err != nil { + t.Fatal(err) + } + + clus.Members[1].WaitOK(t) + clus.Members[2].WaitOK(t) + + // Revoke lease should remove key except the member with corruption + _, err = integration.ToGRPC(clus.Members[0].Client).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp.ID}) + if err != nil { + t.Fatal(err) + } + resp0, err0 := clus.Members[1].Client.KV.Get(context.TODO(), "foo") + if err0 != nil { + t.Fatal(err0) + } + resp1, err1 := clus.Members[2].Client.KV.Get(context.TODO(), "foo") + if err1 != nil { + t.Fatal(err1) + } + + if resp0.Header.Revision == resp1.Header.Revision { + t.Fatalf("matching Revision values") + } + + // Wait for CorruptCheckTime + time.Sleep(time.Second) + presp, perr := clus.Client(0).Put(context.TODO(), "abc", "aaa") + if perr != nil { + if !eqErrGRPC(perr, rpctypes.ErrCorrupt) { + t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr) + } else { + return + } + } +} diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go index 412dd7899..520c983a3 100644 --- a/tests/integration/v3_lease_test.go +++ b/tests/integration/v3_lease_test.go @@ -17,6 +17,7 @@ package integration import ( "context" "fmt" + "math" "testing" "time" @@ -142,6 +143,91 @@ func TestV3LeaseGrantByID(t *testing.T) { } } +// TestV3LeaseNegativeID ensures restarted member lessor can recover negative leaseID from backend. +// +// When the negative leaseID is used for lease revoke, all etcd nodes will remove the lease +// and delete associated keys to ensure kv store data consistency +// +// It ensures issue 12535 is fixed by PR 13676 +func TestV3LeaseNegativeID(t *testing.T) { + tcs := []struct { + leaseID int64 + k []byte + v []byte + }{ + { + leaseID: -1, // int64 -1 is 2^64 -1 in uint64 + k: []byte("foo"), + v: []byte("bar"), + }, + { + leaseID: math.MaxInt64, + k: []byte("bar"), + v: []byte("foo"), + }, + { + leaseID: math.MinInt64, + k: []byte("hello"), + v: []byte("world"), + }, + } + for _, tc := range tcs { + t.Run(fmt.Sprintf("test with lease ID %16x", tc.leaseID), func(t *testing.T) { + integration.BeforeTest(t) + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cc := clus.RandClient() + lresp, err := integration.ToGRPC(cc).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{ID: tc.leaseID, TTL: 300}) + if err != nil { + t.Errorf("could not create lease %d (%v)", tc.leaseID, err) + } + if lresp.ID != tc.leaseID { + t.Errorf("got id %v, wanted id %v", lresp.ID, tc.leaseID) + } + putr := &pb.PutRequest{Key: tc.k, Value: tc.v, Lease: tc.leaseID} + _, err = integration.ToGRPC(cc).KV.Put(ctx, putr) + if err != nil { + t.Errorf("couldn't put key (%v)", err) + } + + // wait for backend Commit + time.Sleep(100 * time.Millisecond) + // restore lessor from db file + clus.Members[2].Stop(t) + if err := clus.Members[2].Restart(t); err != nil { + t.Fatal(err) + } + + // revoke lease should remove key + integration.WaitClientV3(t, clus.Members[2].Client) + _, err = integration.ToGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: tc.leaseID}) + if err != nil { + t.Errorf("could not revoke lease %d (%v)", tc.leaseID, err) + } + var revision int64 + for _, m := range clus.Members { + getr := &pb.RangeRequest{Key: tc.k} + getresp, err := integration.ToGRPC(m.Client).KV.Range(ctx, getr) + if err != nil { + t.Fatal(err) + } + if revision == 0 { + revision = getresp.Header.Revision + } + if revision != getresp.Header.Revision { + t.Errorf("expect revision %d, but got %d", revision, getresp.Header.Revision) + } + if len(getresp.Kvs) != 0 { + t.Errorf("lease removed but key remains") + } + } + }) + } +} + // TestV3LeaseExpire ensures a key is deleted once a key expires. func TestV3LeaseExpire(t *testing.T) { integration.BeforeTest(t)