diff --git a/clientv3/integration/leasing_test.go b/clientv3/integration/leasing_test.go index 28b257cc7..bdc367035 100644 --- a/clientv3/integration/leasing_test.go +++ b/clientv3/integration/leasing_test.go @@ -35,9 +35,17 @@ func TestLeasingPutGet(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) - lKV1, err := leasing.NewKV(clus.Client(0), "foo/") - lKV2, err := leasing.NewKV(clus.Client(1), "foo/") - lKV3, err := leasing.NewKV(clus.Client(2), "foo/") + lKV1, closeLKV1, err := leasing.NewKV(clus.Client(0), "foo/") + testutil.AssertNil(t, err) + defer closeLKV1() + + lKV2, closeLKV2, err := leasing.NewKV(clus.Client(1), "foo/") + testutil.AssertNil(t, err) + defer closeLKV2() + + lKV3, closeLKV3, err := leasing.NewKV(clus.Client(2), "foo/") + testutil.AssertNil(t, err) + defer closeLKV3() resp, err := lKV1.Get(context.TODO(), "abc") if err != nil { @@ -85,10 +93,9 @@ func TestLeasingInterval(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() keys := []string{"abc/a", "abc/b", "abc/a/a"} for _, k := range keys { @@ -125,10 +132,9 @@ func TestLeasingPutInvalidateNew(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() if _, err = lkv.Get(context.TODO(), "k"); err != nil { t.Fatal(err) @@ -160,10 +166,9 @@ func TestLeasingPutInvalidatExisting(t *testing.T) { t.Fatal(err) } - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() if _, err = lkv.Get(context.TODO(), "k"); err != nil { t.Fatal(err) @@ -192,10 +197,9 @@ func TestLeasingGetSerializable(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() if _, err = clus.Client(0).Put(context.TODO(), "cached", "abc"); err != nil { t.Fatal(err) @@ -233,10 +237,10 @@ func TestLeasingPrevKey(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() + if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil { t.Fatal(err) } @@ -260,10 +264,9 @@ func TestLeasingRevGet(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() putResp, err := clus.Client(0).Put(context.TODO(), "k", "abc") if err != nil { @@ -297,10 +300,10 @@ func TestLeasingGetWithOpts(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() + if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil { t.Fatal(err) } @@ -342,10 +345,10 @@ func TestLeasingConcurrentPut(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() + // force key into leasing key cache if _, err = lkv.Get(context.TODO(), "k"); err != nil { t.Fatal(err) @@ -389,10 +392,10 @@ func TestLeasingDisconnectedGet(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() + if _, err = clus.Client(0).Put(context.TODO(), "cached", "abc"); err != nil { t.Fatal(err) } @@ -418,10 +421,10 @@ func TestLeasingDeleteOwner(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() + if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil { t.Fatal(err) } @@ -452,14 +455,13 @@ func TestLeasingDeleteNonOwner(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv1, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } - lkv2, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv1, closeLKV1, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV1() + + lkv2, closeLKV2, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV2() if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil { t.Fatal(err) @@ -488,10 +490,10 @@ func TestLeasingOverwriteResponse(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() + if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil { t.Fatal(err) } @@ -522,10 +524,10 @@ func TestLeasingOwnerPutResponse(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() + if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil { t.Fatal(err) } @@ -560,10 +562,9 @@ func TestLeasingTxnOwnerGetRange(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() keyCount := rand.Intn(10) + 1 for i := 0; i < keyCount; i++ { @@ -590,10 +591,9 @@ func TestLeasingTxnOwnerGet(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() keyCount := rand.Intn(10) + 1 var ops []clientv3.Op @@ -663,10 +663,10 @@ func TestLeasingTxnOwnerIf(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() + if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil { t.Fatal(err) } @@ -757,14 +757,13 @@ func TestLeasingTxnCancel(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) - lkv1, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } - lkv2, err := leasing.NewKV(clus.Client(1), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv1, closeLKV1, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV1() + + lkv2, closeLKV2, err := leasing.NewKV(clus.Client(1), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV2() // acquire lease but disconnect so no revoke in time if _, err = lkv1.Get(context.TODO(), "k"); err != nil { @@ -792,11 +791,13 @@ func TestLeasingTxnNonOwnerPut(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } - lkv2, err := leasing.NewKV(clus.Client(0), "pfx/") + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() + + lkv2, closeLKV2, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV2() if _, err = clus.Client(0).Put(context.TODO(), "k", "abc"); err != nil { t.Fatal(err) @@ -867,14 +868,13 @@ func TestLeasingTxnRandIfThenOrElse(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv1, err1 := leasing.NewKV(clus.Client(0), "pfx/") - if err1 != nil { - t.Fatal(err1) - } - lkv2, err2 := leasing.NewKV(clus.Client(0), "pfx/") - if err2 != nil { - t.Fatal(err2) - } + lkv1, closeLKV1, err1 := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err1) + defer closeLKV1() + + lkv2, closeLKV2, err2 := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err2) + defer closeLKV2() keyCount := 16 dat := make([]*clientv3.PutResponse, keyCount) @@ -974,10 +974,10 @@ func TestLeasingOwnerPutError(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() + if _, err = lkv.Get(context.TODO(), "k"); err != nil { t.Fatal(err) } @@ -995,10 +995,10 @@ func TestLeasingOwnerDeleteError(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() + if _, err = lkv.Get(context.TODO(), "k"); err != nil { t.Fatal(err) } @@ -1016,10 +1016,9 @@ func TestLeasingNonOwnerPutError(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() clus.Members[0].Stop(t) ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond) @@ -1042,10 +1041,9 @@ func testLeasingOwnerDelete(t *testing.T, del clientv3.Op) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "0/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "0/") + testutil.AssertNil(t, err) + defer closeLKV() for i := 0; i < 8; i++ { if _, err = clus.Client(0).Put(context.TODO(), fmt.Sprintf("key/%d", i), "123"); err != nil { @@ -1092,15 +1090,13 @@ func TestLeasingDeleteRangeBounds(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - delkv, err := leasing.NewKV(clus.Client(0), "0/") - if err != nil { - t.Fatal(err) - } + delkv, closeDelKV, err := leasing.NewKV(clus.Client(0), "0/") + testutil.AssertNil(t, err) + defer closeDelKV() - getkv, err := leasing.NewKV(clus.Client(0), "0/") - if err != nil { - t.Fatal(err) - } + getkv, closeGetKv, err := leasing.NewKV(clus.Client(0), "0/") + testutil.AssertNil(t, err) + defer closeGetKv() for _, k := range []string{"j", "m"} { if _, err = clus.Client(0).Put(context.TODO(), k, "123"); err != nil { @@ -1152,14 +1148,13 @@ func testLeasingDeleteRangeContend(t *testing.T, op clientv3.Op) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - delkv, err := leasing.NewKV(clus.Client(0), "0/") - if err != nil { - t.Fatal(err) - } - putkv, err := leasing.NewKV(clus.Client(0), "0/") - if err != nil { - t.Fatal(err) - } + delkv, closeDelKV, err := leasing.NewKV(clus.Client(0), "0/") + testutil.AssertNil(t, err) + defer closeDelKV() + + putkv, closePutKV, err := leasing.NewKV(clus.Client(0), "0/") + testutil.AssertNil(t, err) + defer closePutKV() for i := 0; i < 8; i++ { key := fmt.Sprintf("key/%d", i) @@ -1213,10 +1208,9 @@ func TestLeasingPutGetDeleteConcurrent(t *testing.T) { lkvs := make([]clientv3.KV, 16) for i := range lkvs { - lkv, err := leasing.NewKV(clus.Client(0), "pfx/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") + testutil.AssertNil(t, err) + defer closeLKV() lkvs[i] = lkv } @@ -1271,14 +1265,13 @@ func TestLeasingReconnectOwnerRevoke(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) - lkv1, err1 := leasing.NewKV(clus.Client(0), "foo/") - if err1 != nil { - t.Fatal(err1) - } - lkv2, err2 := leasing.NewKV(clus.Client(1), "foo/") - if err2 != nil { - t.Fatal(err2) - } + lkv1, closeLKV1, err1 := leasing.NewKV(clus.Client(0), "foo/") + testutil.AssertNil(t, err1) + defer closeLKV1() + + lkv2, closeLKV2, err2 := leasing.NewKV(clus.Client(1), "foo/") + testutil.AssertNil(t, err2) + defer closeLKV2() if _, err := lkv1.Get(context.TODO(), "k"); err != nil { t.Fatal(err) @@ -1332,14 +1325,13 @@ func TestLeasingReconnectOwnerRevokeCompact(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) - lkv1, err1 := leasing.NewKV(clus.Client(0), "foo/") - if err1 != nil { - t.Fatal(err1) - } - lkv2, err2 := leasing.NewKV(clus.Client(1), "foo/") - if err2 != nil { - t.Fatal(err2) - } + lkv1, closeLKV1, err1 := leasing.NewKV(clus.Client(0), "foo/") + testutil.AssertNil(t, err1) + defer closeLKV1() + + lkv2, closeLKV2, err2 := leasing.NewKV(clus.Client(1), "foo/") + testutil.AssertNil(t, err2) + defer closeLKV2() if _, err := lkv1.Get(context.TODO(), "k"); err != nil { t.Fatal(err) @@ -1386,10 +1378,9 @@ func TestLeasingReconnectOwnerConsistency(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "foo/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") + defer closeLKV() + testutil.AssertNil(t, err) if _, err = lkv.Put(context.TODO(), "k", "x"); err != nil { t.Fatal(err) @@ -1461,10 +1452,9 @@ func TestLeasingTxnAtomicCache(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "foo/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") + testutil.AssertNil(t, err) + defer closeLKV() puts, gets := make([]clientv3.Op, 16), make([]clientv3.Op, 16) for i := range puts { @@ -1539,10 +1529,10 @@ func TestLeasingReconnectTxn(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "foo/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") + testutil.AssertNil(t, err) + defer closeLKV() + if _, err = lkv.Get(context.TODO(), "k"); err != nil { t.Fatal(err) } @@ -1574,10 +1564,9 @@ func TestLeasingReconnectNonOwnerGet(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "foo/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") + testutil.AssertNil(t, err) + defer closeLKV() // populate a few keys so some leasing gets have keys for i := 0; i < 4; i++ { @@ -1626,10 +1615,9 @@ func TestLeasingTxnRangeCmp(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "foo/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") + testutil.AssertNil(t, err) + defer closeLKV() if _, err = clus.Client(0).Put(context.TODO(), "k", "a"); err != nil { t.Fatal(err) @@ -1662,10 +1650,9 @@ func TestLeasingDo(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "foo/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") + testutil.AssertNil(t, err) + defer closeLKV() ops := []clientv3.Op{ clientv3.OpTxn(nil, nil, nil), @@ -1705,10 +1692,9 @@ func TestLeasingTxnOwnerPutBranch(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "foo/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") + testutil.AssertNil(t, err) + defer closeLKV() n := 0 treeOp := makePutTreeOp("tree", &n, 4) @@ -1800,14 +1786,13 @@ func TestLeasingSessionExpire(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) defer clus.Terminate(t) - lkv, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1)) - if err != nil { - t.Fatal(err) - } - lkv2, err := leasing.NewKV(clus.Client(0), "foo/") - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1)) + testutil.AssertNil(t, err) + defer closeLKV() + + lkv2, closeLKV2, err := leasing.NewKV(clus.Client(0), "foo/") + testutil.AssertNil(t, err) + defer closeLKV2() // acquire lease on abc if _, err = lkv.Get(context.TODO(), "abc"); err != nil { @@ -1876,10 +1861,10 @@ func TestLeasingSessionExpireCancel(t *testing.T) { }, } for i := range tests { - lkv, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1)) - if err != nil { - t.Fatal(err) - } + lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1)) + testutil.AssertNil(t, err) + defer closeLKV() + if _, err = lkv.Get(context.TODO(), "abc"); err != nil { t.Fatal(err) } diff --git a/clientv3/leasing/kv.go b/clientv3/leasing/kv.go index d899f2ce7..1a5a1d0b0 100644 --- a/clientv3/leasing/kv.go +++ b/clientv3/leasing/kv.go @@ -16,6 +16,7 @@ package leasing import ( "strings" + "sync" "time" v3 "github.com/coreos/etcd/clientv3" @@ -33,8 +34,10 @@ type leasingKV struct { kv v3.KV pfx string leases leaseCache + ctx context.Context cancel context.CancelFunc + wg sync.WaitGroup sessionOpts []concurrency.SessionOption session *concurrency.Session @@ -49,9 +52,9 @@ func init() { } // NewKV wraps a KV instance so that all requests are wired through a leasing protocol. -func NewKV(cl *v3.Client, pfx string, opts ...concurrency.SessionOption) (v3.KV, error) { +func NewKV(cl *v3.Client, pfx string, opts ...concurrency.SessionOption) (v3.KV, func(), error) { cctx, cancel := context.WithCancel(cl.Ctx()) - lkv := leasingKV{ + lkv := &leasingKV{ cl: cl, kv: cl.KV, pfx: pfx, @@ -61,9 +64,21 @@ func NewKV(cl *v3.Client, pfx string, opts ...concurrency.SessionOption) (v3.KV, sessionOpts: opts, sessionc: make(chan struct{}), } - go lkv.monitorSession() - go lkv.leases.clearOldRevokes(cctx) - return &lkv, lkv.waitSession(cctx) + lkv.wg.Add(2) + go func() { + defer lkv.wg.Done() + lkv.monitorSession() + }() + go func() { + defer lkv.wg.Done() + lkv.leases.clearOldRevokes(cctx) + }() + return lkv, lkv.Close, lkv.waitSession(cctx) +} + +func (lkv *leasingKV) Close() { + lkv.cancel() + lkv.wg.Wait() } func (lkv *leasingKV) Get(ctx context.Context, key string, opts ...v3.OpOption) (*v3.GetResponse, error) { @@ -301,7 +316,11 @@ func (lkv *leasingKV) get(ctx context.Context, op v3.Op) (*v3.GetResponse, error getResp.Header = resp.Header if resp.Succeeded { getResp = lkv.leases.Add(key, getResp, op) - go lkv.monitorLease(ctx, key, resp.Header.Revision) + lkv.wg.Add(1) + go func() { + defer lkv.wg.Done() + lkv.monitorLease(ctx, key, resp.Header.Revision) + }() } return getResp, nil } diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index 2e4adc45e..183b20c19 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -285,7 +285,7 @@ func newGRPCProxyServer(client *clientv3.Client) *grpc.Server { } if len(grpcProxyLeasing) > 0 { - client.KV, _ = leasing.NewKV(client, grpcProxyLeasing) + client.KV, _, _ = leasing.NewKV(client, grpcProxyLeasing) } kvp, _ := grpcproxy.NewKvProxy(client)