Merge pull request #12278 from ptabor/20200907-fix-cov-integration

integration: Fix 'go test --tags cluster_proxy --timeout=30m -v ./integration/...'
This commit is contained in:
Jingyi Hu 2020-09-16 01:55:09 -07:00 committed by GitHub
commit 2aaf4f3d7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 47 additions and 11 deletions

View File

@ -619,16 +619,29 @@ func TestLeasingTxnOwnerGet(t *testing.T) {
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t) defer clus.Terminate(t)
lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") client := clus.Client(0)
testutil.AssertNil(t, err)
defer closeLKV()
lkv, closeLKV, err := leasing.NewKV(client, "pfx/")
testutil.AssertNil(t, err)
defer func() {
// In '--tags cluster_proxy' mode the client need to be closed before
// closeLKV(). This interrupts all outstanding watches. Closing by closeLKV()
// is not sufficient as (unfortunately) context close does not interrupts Watches.
// See ./clientv3/watch.go:
// >> Currently, client contexts are overwritten with "valCtx" that never closes. <<
clus.TakeClient(0) // avoid double Close() of the client.
client.Close()
closeLKV()
}()
// TODO: Randomization in tests is a bad practice (except explicitly exploratory).
keyCount := rand.Intn(10) + 1 keyCount := rand.Intn(10) + 1
var ops []clientv3.Op var ops []clientv3.Op
presps := make([]*clientv3.PutResponse, keyCount) presps := make([]*clientv3.PutResponse, keyCount)
for i := range presps { for i := range presps {
k := fmt.Sprintf("k-%d", i) k := fmt.Sprintf("k-%d", i)
presp, err := clus.Client(0).Put(context.TODO(), k, k+k) presp, err := client.Put(context.TODO(), k, k+k)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -639,6 +652,8 @@ func TestLeasingTxnOwnerGet(t *testing.T) {
} }
ops = append(ops, clientv3.OpGet(k)) ops = append(ops, clientv3.OpGet(k))
} }
// TODO: Randomization in unit tests is a bad practice (except explicitly exploratory).
ops = ops[:rand.Intn(len(ops))] ops = ops[:rand.Intn(len(ops))]
// served through cache // served through cache
@ -648,7 +663,6 @@ func TestLeasingTxnOwnerGet(t *testing.T) {
cmps, useThen := randCmps("k-", presps) cmps, useThen := randCmps("k-", presps)
if useThen { if useThen {
thenOps = ops thenOps = ops
elseOps = []clientv3.Op{clientv3.OpPut("k", "1")} elseOps = []clientv3.Op{clientv3.OpPut("k", "1")}
} else { } else {

View File

@ -23,6 +23,8 @@ import (
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb" pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
) )
const throughProxy = false
func toGRPC(c *clientv3.Client) grpcAPI { func toGRPC(c *clientv3.Client) grpcAPI {
return grpcAPI{ return grpcAPI{
pb.NewClusterClient(c.ActiveConnection()), pb.NewClusterClient(c.ActiveConnection()),

View File

@ -27,6 +27,8 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
const throughProxy = true
var ( var (
pmu sync.Mutex pmu sync.Mutex
proxies map[*clientv3.Client]grpcClientProxy = make(map[*clientv3.Client]grpcClientProxy) proxies map[*clientv3.Client]grpcClientProxy = make(map[*clientv3.Client]grpcClientProxy)
@ -45,6 +47,8 @@ func toGRPC(c *clientv3.Client) grpcAPI {
pmu.Lock() pmu.Lock()
defer pmu.Unlock() defer pmu.Unlock()
lg := zap.NewExample()
if v, ok := proxies[c]; ok { if v, ok := proxies[c]; ok {
return v.grpc return v.grpc
} }
@ -55,10 +59,10 @@ func toGRPC(c *clientv3.Client) grpcAPI {
c.Lease = namespace.NewLease(c.Lease, proxyNamespace) c.Lease = namespace.NewLease(c.Lease, proxyNamespace)
// test coalescing/caching proxy // test coalescing/caching proxy
kvp, kvpch := grpcproxy.NewKvProxy(c) kvp, kvpch := grpcproxy.NewKvProxy(c)
wp, wpch := grpcproxy.NewWatchProxy(c) wp, wpch := grpcproxy.NewWatchProxy(lg, c)
lp, lpch := grpcproxy.NewLeaseProxy(c) lp, lpch := grpcproxy.NewLeaseProxy(c)
mp := grpcproxy.NewMaintenanceProxy(c) mp := grpcproxy.NewMaintenanceProxy(c)
clp, _ := grpcproxy.NewClusterProxy(zap.NewExample(), c, "", "") // without registering proxy URLs clp, _ := grpcproxy.NewClusterProxy(lg, c, "", "") // without registering proxy URLs
authp := grpcproxy.NewAuthProxy(c) authp := grpcproxy.NewAuthProxy(c)
lockp := grpcproxy.NewLockProxy(c) lockp := grpcproxy.NewLockProxy(c)
electp := grpcproxy.NewElectionProxy(c) electp := grpcproxy.NewElectionProxy(c)

View File

@ -1241,8 +1241,16 @@ func TestV3WatchCancellation(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if minWatches != "1" { var expected string
t.Fatalf("expected one watch, got %s", minWatches) if throughProxy {
// grpc proxy has additional 2 watches open
expected = "3"
} else {
expected = "1"
}
if minWatches != expected {
t.Fatalf("expected %s watch, got %s", expected, minWatches)
} }
} }
@ -1270,7 +1278,15 @@ func TestV3WatchCloseCancelRace(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if minWatches != "0" { var expected string
t.Fatalf("expected zero watches, got %s", minWatches) if throughProxy {
// grpc proxy has additional 2 watches open
expected = "2"
} else {
expected = "0"
}
if minWatches != expected {
t.Fatalf("expected %s watch, got %s", expected, minWatches)
} }
} }