diff --git a/clientv3/integration/leasing_test.go b/clientv3/integration/leasing_test.go index ed1a2df7b..3b641899d 100644 --- a/clientv3/integration/leasing_test.go +++ b/clientv3/integration/leasing_test.go @@ -619,16 +619,29 @@ func TestLeasingTxnOwnerGet(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) defer clus.Terminate(t) - lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") - testutil.AssertNil(t, err) - defer closeLKV() + client := clus.Client(0) + lkv, closeLKV, err := leasing.NewKV(client, "pfx/") + testutil.AssertNil(t, err) + + defer func() { + // In '--tags cluster_proxy' mode the client need to be closed before + // closeLKV(). This interrupts all outstanding watches. Closing by closeLKV() + // is not sufficient as (unfortunately) context close does not interrupts Watches. + // See ./clientv3/watch.go: + // >> Currently, client contexts are overwritten with "valCtx" that never closes. << + clus.TakeClient(0) // avoid double Close() of the client. + client.Close() + closeLKV() + }() + + // TODO: Randomization in tests is a bad practice (except explicitly exploratory). keyCount := rand.Intn(10) + 1 var ops []clientv3.Op presps := make([]*clientv3.PutResponse, keyCount) for i := range presps { k := fmt.Sprintf("k-%d", i) - presp, err := clus.Client(0).Put(context.TODO(), k, k+k) + presp, err := client.Put(context.TODO(), k, k+k) if err != nil { t.Fatal(err) } @@ -639,6 +652,8 @@ func TestLeasingTxnOwnerGet(t *testing.T) { } ops = append(ops, clientv3.OpGet(k)) } + + // TODO: Randomization in unit tests is a bad practice (except explicitly exploratory). ops = ops[:rand.Intn(len(ops))] // served through cache @@ -648,7 +663,6 @@ func TestLeasingTxnOwnerGet(t *testing.T) { cmps, useThen := randCmps("k-", presps) if useThen { - thenOps = ops elseOps = []clientv3.Op{clientv3.OpPut("k", "1")} } else { diff --git a/integration/cluster_direct.go b/integration/cluster_direct.go index 600cabe30..a3764d9ce 100644 --- a/integration/cluster_direct.go +++ b/integration/cluster_direct.go @@ -23,6 +23,8 @@ import ( pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb" ) +const throughProxy = false + func toGRPC(c *clientv3.Client) grpcAPI { return grpcAPI{ pb.NewClusterClient(c.ActiveConnection()), diff --git a/integration/cluster_proxy.go b/integration/cluster_proxy.go index 8ad7f6910..0b024c719 100644 --- a/integration/cluster_proxy.go +++ b/integration/cluster_proxy.go @@ -27,6 +27,8 @@ import ( "go.uber.org/zap" ) +const throughProxy = true + var ( pmu sync.Mutex proxies map[*clientv3.Client]grpcClientProxy = make(map[*clientv3.Client]grpcClientProxy) @@ -45,6 +47,8 @@ func toGRPC(c *clientv3.Client) grpcAPI { pmu.Lock() defer pmu.Unlock() + lg := zap.NewExample() + if v, ok := proxies[c]; ok { return v.grpc } @@ -55,10 +59,10 @@ func toGRPC(c *clientv3.Client) grpcAPI { c.Lease = namespace.NewLease(c.Lease, proxyNamespace) // test coalescing/caching proxy kvp, kvpch := grpcproxy.NewKvProxy(c) - wp, wpch := grpcproxy.NewWatchProxy(c) + wp, wpch := grpcproxy.NewWatchProxy(lg, c) lp, lpch := grpcproxy.NewLeaseProxy(c) mp := grpcproxy.NewMaintenanceProxy(c) - clp, _ := grpcproxy.NewClusterProxy(zap.NewExample(), c, "", "") // without registering proxy URLs + clp, _ := grpcproxy.NewClusterProxy(lg, c, "", "") // without registering proxy URLs authp := grpcproxy.NewAuthProxy(c) lockp := grpcproxy.NewLockProxy(c) electp := grpcproxy.NewElectionProxy(c) diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index 4ae371973..1717c0537 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -1241,8 +1241,16 @@ func TestV3WatchCancellation(t *testing.T) { t.Fatal(err) } - if minWatches != "1" { - t.Fatalf("expected one watch, got %s", minWatches) + var expected string + if throughProxy { + // grpc proxy has additional 2 watches open + expected = "3" + } else { + expected = "1" + } + + if minWatches != expected { + t.Fatalf("expected %s watch, got %s", expected, minWatches) } } @@ -1270,7 +1278,15 @@ func TestV3WatchCloseCancelRace(t *testing.T) { t.Fatal(err) } - if minWatches != "0" { - t.Fatalf("expected zero watches, got %s", minWatches) + var expected string + if throughProxy { + // grpc proxy has additional 2 watches open + expected = "2" + } else { + expected = "0" + } + + if minWatches != expected { + t.Fatalf("expected %s watch, got %s", expected, minWatches) } }