diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go index fc0338641..ffde1980f 100644 --- a/integration/v3_lease_test.go +++ b/integration/v3_lease_test.go @@ -366,6 +366,234 @@ func TestV3LeaseFailover(t *testing.T) { } } +const fiveMinTTL int64 = 300 + +// TestV3LeaseRecoverAndRevoke ensures that revoking a lease after restart deletes the attached key. +func TestV3LeaseRecoverAndRevoke(t *testing.T) { + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + kvc := toGRPC(clus.Client(0)).KV + lsc := toGRPC(clus.Client(0)).Lease + + lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL}) + if err != nil { + t.Fatal(err) + } + if lresp.Error != "" { + t.Fatal(lresp.Error) + } + _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID}) + if err != nil { + t.Fatal(err) + } + + // restart server and ensure lease still exists + clus.Members[0].Stop(t) + clus.Members[0].Restart(t) + clus.waitLeader(t, clus.Members) + + // overwrite old client with newly dialed connection + // otherwise, error with "grpc: RPC failed fast due to transport failure" + nc, err := NewClientV3(clus.Members[0]) + if err != nil { + t.Fatal(err) + } + kvc = toGRPC(nc).KV + lsc = toGRPC(nc).Lease + defer nc.Close() + + // revoke should delete the key + _, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID}) + if err != nil { + t.Fatal(err) + } + rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")}) + if err != nil { + t.Fatal(err) + } + if len(rresp.Kvs) != 0 { + t.Fatalf("lease removed but key remains") + } +} + +// TestV3LeaseRevokeAndRecover ensures that revoked key stays deleted after restart. +func TestV3LeaseRevokeAndRecover(t *testing.T) { + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + kvc := toGRPC(clus.Client(0)).KV + lsc := toGRPC(clus.Client(0)).Lease + + lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL}) + if err != nil { + t.Fatal(err) + } + if lresp.Error != "" { + t.Fatal(lresp.Error) + } + _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID}) + if err != nil { + t.Fatal(err) + } + + // revoke should delete the key + _, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID}) + if err != nil { + t.Fatal(err) + } + + // restart server and ensure revoked key doesn't exist + clus.Members[0].Stop(t) + clus.Members[0].Restart(t) + clus.waitLeader(t, clus.Members) + + // overwrite old client with newly dialed connection + // otherwise, error with "grpc: RPC failed fast due to transport failure" + nc, err := NewClientV3(clus.Members[0]) + if err != nil { + t.Fatal(err) + } + kvc = toGRPC(nc).KV + defer nc.Close() + + rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")}) + if err != nil { + t.Fatal(err) + } + if len(rresp.Kvs) != 0 { + t.Fatalf("lease removed but key remains") + } +} + +// TestV3LeaseRecoverKeyWithDetachedLease ensures that revoking a detached lease after restart +// does not delete the key. +func TestV3LeaseRecoverKeyWithDetachedLease(t *testing.T) { + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + kvc := toGRPC(clus.Client(0)).KV + lsc := toGRPC(clus.Client(0)).Lease + + lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL}) + if err != nil { + t.Fatal(err) + } + if lresp.Error != "" { + t.Fatal(lresp.Error) + } + _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID}) + if err != nil { + t.Fatal(err) + } + + // overwrite lease with none + _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}) + if err != nil { + t.Fatal(err) + } + + // restart server and ensure lease still exists + clus.Members[0].Stop(t) + clus.Members[0].Restart(t) + clus.waitLeader(t, clus.Members) + + // overwrite old client with newly dialed connection + // otherwise, error with "grpc: RPC failed fast due to transport failure" + nc, err := NewClientV3(clus.Members[0]) + if err != nil { + t.Fatal(err) + } + kvc = toGRPC(nc).KV + lsc = toGRPC(nc).Lease + defer nc.Close() + + // revoke the detached lease + _, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: lresp.ID}) + if err != nil { + t.Fatal(err) + } + rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")}) + if err != nil { + t.Fatal(err) + } + if len(rresp.Kvs) != 1 { + t.Fatalf("only detached lease removed, key should remain") + } +} + +func TestV3LeaseRecoverKeyWithMutipleLease(t *testing.T) { + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + kvc := toGRPC(clus.Client(0)).KV + lsc := toGRPC(clus.Client(0)).Lease + + var leaseIDs []int64 + for i := 0; i < 2; i++ { + lresp, err := lsc.LeaseGrant(context.TODO(), &pb.LeaseGrantRequest{TTL: fiveMinTTL}) + if err != nil { + t.Fatal(err) + } + if lresp.Error != "" { + t.Fatal(lresp.Error) + } + leaseIDs = append(leaseIDs, lresp.ID) + + _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID}) + if err != nil { + t.Fatal(err) + } + } + + // restart server and ensure lease still exists + clus.Members[0].Stop(t) + clus.Members[0].Restart(t) + clus.waitLeader(t, clus.Members) + for i, leaseID := range leaseIDs { + if !leaseExist(t, clus, leaseID) { + t.Errorf("#%d: unexpected lease not exists", i) + } + } + + // overwrite old client with newly dialed connection + // otherwise, error with "grpc: RPC failed fast due to transport failure" + nc, err := NewClientV3(clus.Members[0]) + if err != nil { + t.Fatal(err) + } + kvc = toGRPC(nc).KV + lsc = toGRPC(nc).Lease + defer nc.Close() + + // revoke the old lease + _, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseIDs[0]}) + if err != nil { + t.Fatal(err) + } + // key should still exist + rresp, err := kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")}) + if err != nil { + t.Fatal(err) + } + if len(rresp.Kvs) != 1 { + t.Fatalf("only detached lease removed, key should remain") + } + + // revoke the latest lease + _, err = lsc.LeaseRevoke(context.TODO(), &pb.LeaseRevokeRequest{ID: leaseIDs[1]}) + if err != nil { + t.Fatal(err) + } + rresp, err = kvc.Range(context.TODO(), &pb.RangeRequest{Key: []byte("foo")}) + if err != nil { + t.Fatal(err) + } + if len(rresp.Kvs) != 0 { + t.Fatalf("lease removed but key remains") + } +} + // acquireLeaseAndKey creates a new lease and creates an attached key. func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) { // create lease