diff --git a/tests/integration/bridge.go b/tests/integration/bridge.go index 22040b882..67d6ae447 100644 --- a/tests/integration/bridge.go +++ b/tests/integration/bridge.go @@ -29,8 +29,8 @@ type Dialer interface { // to disconnect grpc network connections without closing the logical grpc connection. type bridge struct { dialer Dialer - l net.Listener - conns map[*bridgeConn]struct{} + l net.Listener + conns map[*bridgeConn]struct{} stopc chan struct{} pausec chan struct{} @@ -43,8 +43,8 @@ type bridge struct { func newBridge(dialer Dialer, listener net.Listener) (*bridge, error) { b := &bridge{ // bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number - dialer: dialer, - l: listener, + dialer: dialer, + l: listener, conns: make(map[*bridgeConn]struct{}), stopc: make(chan struct{}), pausec: make(chan struct{}), diff --git a/tests/integration/clientv3/connectivity/black_hole_test.go b/tests/integration/clientv3/connectivity/black_hole_test.go index a4415322f..c9a199558 100644 --- a/tests/integration/clientv3/connectivity/black_hole_test.go +++ b/tests/integration/clientv3/connectivity/black_hole_test.go @@ -38,6 +38,7 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{ Size: 2, GRPCKeepAliveMinTime: time.Millisecond, // avoid too_many_pings + UseBridge: true, }) defer clus.Terminate(t) @@ -170,6 +171,7 @@ func testBalancerUnderBlackholeNoKeepAlive(t *testing.T, op func(*clientv3.Clien clus := integration.NewClusterV3(t, &integration.ClusterConfig{ Size: 2, SkipCreatingClient: true, + UseBridge: true, }) defer clus.Terminate(t) diff --git a/tests/integration/clientv3/connectivity/server_shutdown_test.go b/tests/integration/clientv3/connectivity/server_shutdown_test.go index e7660852b..5b888e6fe 100644 --- a/tests/integration/clientv3/connectivity/server_shutdown_test.go +++ b/tests/integration/clientv3/connectivity/server_shutdown_test.go @@ -35,6 +35,7 @@ func TestBalancerUnderServerShutdownWatch(t *testing.T) { clus := integration.NewClusterV3(t, &integration.ClusterConfig{ Size: 3, SkipCreatingClient: true, + UseBridge: true, }) defer clus.Terminate(t) @@ -278,6 +279,7 @@ func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizabl cfg := &integration.ClusterConfig{ Size: 2, SkipCreatingClient: true, + UseBridge: true, } if linearizable { cfg.Size = 3 diff --git a/tests/integration/clientv3/kv_test.go b/tests/integration/clientv3/kv_test.go index e7b0b0f3a..9cfd5b02c 100644 --- a/tests/integration/clientv3/kv_test.go +++ b/tests/integration/clientv3/kv_test.go @@ -712,7 +712,7 @@ func TestKVGetRetry(t *testing.T) { integration.BeforeTest(t) clusterSize := 3 - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: clusterSize}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: clusterSize, UseBridge: true}) defer clus.Terminate(t) // because killing leader and following election @@ -765,7 +765,7 @@ func TestKVGetRetry(t *testing.T) { func TestKVPutFailGetRetry(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) kv := clus.Client(0) @@ -876,7 +876,7 @@ func TestKVPutStoppedServerAndClose(t *testing.T) { // in the presence of network errors. func TestKVPutAtMostOnce(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) if _, err := clus.Client(0).Put(context.TODO(), "k", "1"); err != nil { diff --git a/tests/integration/clientv3/lease/lease_test.go b/tests/integration/clientv3/lease/lease_test.go index 326289949..6a6cf7dd3 100644 --- a/tests/integration/clientv3/lease/lease_test.go +++ b/tests/integration/clientv3/lease/lease_test.go @@ -190,7 +190,7 @@ func TestLeaseKeepAliveHandleFailure(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) // TODO: change this line to get a cluster client @@ -416,7 +416,7 @@ func TestLeaseRevokeNewAfterClose(t *testing.T) { func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) cli := clus.Client(0) @@ -462,7 +462,7 @@ func TestLeaseKeepAliveCloseAfterDisconnectRevoke(t *testing.T) { func TestLeaseKeepAliveInitTimeout(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) cli := clus.Client(0) @@ -495,7 +495,7 @@ func TestLeaseKeepAliveInitTimeout(t *testing.T) { func TestLeaseKeepAliveTTLTimeout(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) cli := clus.Client(0) @@ -530,7 +530,7 @@ func TestLeaseKeepAliveTTLTimeout(t *testing.T) { func TestLeaseTimeToLive(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) c := clus.RandClient() @@ -656,7 +656,7 @@ func TestLeaseLeases(t *testing.T) { func TestLeaseRenewLostQuorum(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) cli := clus.Client(0) @@ -728,7 +728,7 @@ func TestLeaseKeepAliveLoopExit(t *testing.T) { // transient cluster failure. func TestV3LeaseFailureOverlap(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, UseBridge: true}) defer clus.Terminate(t) numReqs := 5 @@ -782,7 +782,7 @@ func TestV3LeaseFailureOverlap(t *testing.T) { func TestLeaseWithRequireLeader(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, UseBridge: true}) defer clus.Terminate(t) c := clus.Client(0) diff --git a/tests/integration/clientv3/lease/leasing_test.go b/tests/integration/clientv3/lease/leasing_test.go index 02814aa46..47b776c5a 100644 --- a/tests/integration/clientv3/lease/leasing_test.go +++ b/tests/integration/clientv3/lease/leasing_test.go @@ -194,7 +194,7 @@ func TestLeasingPutInvalidateExisting(t *testing.T) { // TestLeasingGetNoLeaseTTL checks a key with a TTL is not leased. func TestLeasingGetNoLeaseTTL(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -223,7 +223,7 @@ func TestLeasingGetNoLeaseTTL(t *testing.T) { // when the etcd cluster is partitioned. func TestLeasingGetSerializable(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -325,7 +325,7 @@ func TestLeasingRevGet(t *testing.T) { // TestLeasingGetWithOpts checks options that can be served through the cache do not depend on the server. func TestLeasingGetWithOpts(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -417,7 +417,7 @@ func TestLeasingConcurrentPut(t *testing.T) { func TestLeasingDisconnectedGet(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -549,7 +549,7 @@ func TestLeasingOverwriteResponse(t *testing.T) { func TestLeasingOwnerPutResponse(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -616,7 +616,7 @@ func TestLeasingTxnOwnerGetRange(t *testing.T) { func TestLeasingTxnOwnerGet(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) client := clus.Client(0) @@ -772,7 +772,7 @@ func TestLeasingTxnOwnerDelete(t *testing.T) { func TestLeasingTxnOwnerIf(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -866,7 +866,7 @@ func TestLeasingTxnOwnerIf(t *testing.T) { func TestLeasingTxnCancel(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) lkv1, closeLKV1, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -1084,7 +1084,7 @@ func TestLeasingTxnRandIfThenOrElse(t *testing.T) { func TestLeasingOwnerPutError(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -1105,7 +1105,7 @@ func TestLeasingOwnerPutError(t *testing.T) { func TestLeasingOwnerDeleteError(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -1126,7 +1126,7 @@ func TestLeasingOwnerDeleteError(t *testing.T) { func TestLeasingNonOwnerPutError(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "pfx/") @@ -1200,7 +1200,7 @@ func testLeasingOwnerDelete(t *testing.T, del clientv3.Op) { func TestLeasingDeleteRangeBounds(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) delkv, closeDelKV, err := leasing.NewKV(clus.Client(0), "0/") @@ -1375,7 +1375,7 @@ func TestLeasingPutGetDeleteConcurrent(t *testing.T) { // disconnected when trying to submit revoke txn. func TestLeasingReconnectOwnerRevoke(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) lkv1, closeLKV1, err1 := leasing.NewKV(clus.Client(0), "foo/") @@ -1436,7 +1436,7 @@ func TestLeasingReconnectOwnerRevoke(t *testing.T) { // disconnected and the watch is compacted. func TestLeasingReconnectOwnerRevokeCompact(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) lkv1, closeLKV1, err1 := leasing.NewKV(clus.Client(0), "foo/") @@ -1489,7 +1489,7 @@ func TestLeasingReconnectOwnerRevokeCompact(t *testing.T) { // not cause inconsistency between the server and the client. func TestLeasingReconnectOwnerConsistency(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") @@ -1649,7 +1649,7 @@ func TestLeasingTxnAtomicCache(t *testing.T) { // TestLeasingReconnectTxn checks that Txn is resilient to disconnects. func TestLeasingReconnectTxn(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") @@ -1685,7 +1685,7 @@ func TestLeasingReconnectTxn(t *testing.T) { // not cause inconsistency between the server and the client. func TestLeasingReconnectNonOwnerGet(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") @@ -1813,7 +1813,7 @@ func TestLeasingDo(t *testing.T) { func TestLeasingTxnOwnerPutBranch(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/") @@ -1907,7 +1907,7 @@ func randCmps(pfx string, dat []*clientv3.PutResponse) (cmps []clientv3.Cmp, the func TestLeasingSessionExpire(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1)) @@ -1983,7 +1983,7 @@ func TestLeasingSessionExpireCancel(t *testing.T) { for i := range tests { t.Run(fmt.Sprintf("test %d", i), func(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) lkv, closeLKV, err := leasing.NewKV(clus.Client(0), "foo/", concurrency.WithTTL(1)) diff --git a/tests/integration/clientv3/maintenance_test.go b/tests/integration/clientv3/maintenance_test.go index 8a72bb0f1..53c3c9c90 100644 --- a/tests/integration/clientv3/maintenance_test.go +++ b/tests/integration/clientv3/maintenance_test.go @@ -142,7 +142,7 @@ func TestMaintenanceSnapshotError(t *testing.T) { func TestMaintenanceSnapshotErrorInflight(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) // take about 1-second to read snapshot diff --git a/tests/integration/clientv3/ordering_kv_test.go b/tests/integration/clientv3/ordering_kv_test.go index fdce92495..b6b3ce71f 100644 --- a/tests/integration/clientv3/ordering_kv_test.go +++ b/tests/integration/clientv3/ordering_kv_test.go @@ -30,7 +30,7 @@ func TestDetectKvOrderViolation(t *testing.T) { var errOrderViolation = errors.New("DetectedOrderViolation") integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) cfg := clientv3.Config{ @@ -97,7 +97,7 @@ func TestDetectTxnOrderViolation(t *testing.T) { var errOrderViolation = errors.New("DetectedOrderViolation") integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) cfg := clientv3.Config{ diff --git a/tests/integration/clientv3/ordering_util_test.go b/tests/integration/clientv3/ordering_util_test.go index 85c61f407..a4b65ec38 100644 --- a/tests/integration/clientv3/ordering_util_test.go +++ b/tests/integration/clientv3/ordering_util_test.go @@ -80,7 +80,7 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) { func TestUnresolvableOrderViolation(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 5, SkipCreatingClient: true}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 5, SkipCreatingClient: true, UseBridge: true}) defer clus.Terminate(t) cfg := clientv3.Config{ Endpoints: []string{ diff --git a/tests/integration/clientv3/txn_test.go b/tests/integration/clientv3/txn_test.go index ffe93e096..679b9868f 100644 --- a/tests/integration/clientv3/txn_test.go +++ b/tests/integration/clientv3/txn_test.go @@ -53,7 +53,7 @@ func TestTxnError(t *testing.T) { func TestTxnWriteFail(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) kv := clus.Client(0) @@ -103,7 +103,7 @@ func TestTxnReadRetry(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) kv := clus.Client(0) diff --git a/tests/integration/clientv3/watch_test.go b/tests/integration/clientv3/watch_test.go index 2fea3c9ba..b5a0dd08f 100644 --- a/tests/integration/clientv3/watch_test.go +++ b/tests/integration/clientv3/watch_test.go @@ -47,7 +47,7 @@ type watchctx struct { func runWatchTest(t *testing.T, f watcherTest) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) wclientMember := rand.Intn(3) @@ -348,7 +348,7 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) { func TestWatchResumeInitRev(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) cli := clus.Client(0) @@ -404,7 +404,7 @@ func TestWatchResumeInitRev(t *testing.T) { func TestWatchResumeCompacted(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) // create a waiting watcher at rev 1 @@ -955,7 +955,7 @@ func TestWatchWithCreatedNotification(t *testing.T) { func TestWatchWithCreatedNotificationDropConn(t *testing.T) { integration.BeforeTest(t) - cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer cluster.Terminate(t) client := cluster.RandClient() @@ -1063,7 +1063,7 @@ func TestWatchOverlapDropConnContextCancel(t *testing.T) { func testWatchOverlapContextCancel(t *testing.T, f func(*integration.ClusterV3)) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) n := 100 @@ -1154,7 +1154,7 @@ func TestWatchCancelAndCloseClient(t *testing.T) { // then closes the watcher interface to ensure correct clean up. func TestWatchStressResumeClose(t *testing.T) { integration.BeforeTest(t) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) cli := clus.Client(0) diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go index 6ccfec419..37bb40b59 100644 --- a/tests/integration/cluster.go +++ b/tests/integration/cluster.go @@ -153,6 +153,9 @@ type ClusterConfig struct { // UseIP is true to use only IP for gRPC requests. UseIP bool + // UseBridge adds bridge between client and grpc server. Should be used in tests that + // want to manipulate connection or require connection not breaking despite server stop/restart. + UseBridge bool EnableLeaseCheckpoint bool LeaseCheckpointInterval time.Duration @@ -313,6 +316,7 @@ func (c *cluster) mustNewMember(t testutil.TB) *member { clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize, clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize, useIP: c.cfg.UseIP, + useBridge: c.cfg.UseBridge, enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint, leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval, WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval, @@ -582,6 +586,7 @@ type member struct { clientMaxCallSendMsgSize int clientMaxCallRecvMsgSize int useIP bool + useBridge bool isLearner bool closed bool @@ -605,6 +610,7 @@ type memberConfig struct { clientMaxCallSendMsgSize int clientMaxCallRecvMsgSize int useIP bool + useBridge bool enableLeaseCheckpoint bool leaseCheckpointInterval time.Duration WatchProgressNotifyInterval time.Duration @@ -698,6 +704,7 @@ func mustNewMember(t testutil.TB, mcfg memberConfig) *member { m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize m.useIP = mcfg.useIP + m.useBridge = mcfg.useBridge m.EnableLeaseCheckpoint = mcfg.enableLeaseCheckpoint m.LeaseCheckpointInterval = mcfg.leaseCheckpointInterval @@ -730,35 +737,53 @@ func memberLogger(t testutil.TB, name string) *zap.Logger { // listenGRPC starts a grpc server over a unix domain socket on the member func (m *member) listenGRPC() error { // prefix with localhost so cert has right domain - m.grpcURL = "localhost:" + m.Name - m.Logger.Info("LISTEN GRPC", zap.String("m.grpcURL", m.grpcURL), zap.String("m.Name", m.Name)) - if m.useIP { // for IP-only TLS certs - m.grpcURL = "127.0.0.1:" + m.Name - } - grpcListener, err := transport.NewUnixListener(m.grpcURL) + grpcAddr := m.grpcAddr() + m.Logger.Info("LISTEN GRPC", zap.String("grpcAddr", grpcAddr), zap.String("m.Name", m.Name)) + grpcListener, err := transport.NewUnixListener(grpcAddr) if err != nil { - return fmt.Errorf("listen failed on grpc socket %s (%v)", m.grpcURL, err) + return fmt.Errorf("listen failed on grpc socket %s (%v)", grpcAddr, err) } - bridgeAddr := m.grpcURL + "0" - bridgeListener, err := transport.NewUnixListener(bridgeAddr) - if err != nil { - grpcListener.Close() - return fmt.Errorf("listen failed on bridge socket %s (%v)", m.grpcURL, err) + m.grpcURL = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + grpcAddr + if m.useBridge { + _, err = m.addBridge() + if err != nil { + grpcListener.Close() + return err + } } - m.grpcBridge, err = newBridge(dialer{network: "unix", addr: m.grpcURL}, bridgeListener) - if err != nil { - bridgeListener.Close() - grpcListener.Close() - return err - } - m.grpcURL = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + bridgeAddr m.grpcListener = grpcListener return nil } +func (m *member) addBridge() (*bridge, error) { + grpcAddr := m.grpcAddr() + bridgeAddr := grpcAddr + "0" + m.Logger.Info("LISTEN BRIDGE", zap.String("grpc-address", bridgeAddr), zap.String("member", m.Name)) + bridgeListener, err := transport.NewUnixListener(bridgeAddr) + if err != nil { + return nil, fmt.Errorf("listen failed on bridge socket %s (%v)", grpcAddr, err) + } + m.grpcBridge, err = newBridge(dialer{network: "unix", addr: grpcAddr}, bridgeListener) + if err != nil { + bridgeListener.Close() + return nil, err + } + m.grpcURL = schemeFromTLSInfo(m.ClientTLSInfo) + "://" + bridgeAddr + return m.grpcBridge, nil +} + +func (m *member) grpcAddr() string { + // prefix with localhost so cert has right domain + addr := "localhost:" + m.Name + if m.useIP { // for IP-only TLS certs + addr = "127.0.0.1:" + m.Name + } + return addr +} + type dialer struct { network string - addr string + addr string } func (d dialer) Dial() (net.Conn, error) { diff --git a/tests/integration/cluster_test.go b/tests/integration/cluster_test.go index e25d77f21..2fb5a18d9 100644 --- a/tests/integration/cluster_test.go +++ b/tests/integration/cluster_test.go @@ -173,7 +173,7 @@ func testDecreaseClusterSize(t *testing.T, size int) { } func TestForceNewCluster(t *testing.T) { - c := NewCluster(t, 3) + c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true}) c.Launch(t) cc := MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil) kapi := client.NewKeysAPI(cc) @@ -283,7 +283,7 @@ func testIssue2746(t *testing.T, members int) { func TestIssue2904(t *testing.T) { BeforeTest(t) // start 1-member cluster to ensure member 0 is the leader of the cluster. - c := NewCluster(t, 1) + c := newCluster(t, &ClusterConfig{Size: 1, UseBridge: true}) c.Launch(t) defer c.Terminate(t) @@ -319,7 +319,7 @@ func TestIssue2904(t *testing.T) { func TestIssue3699(t *testing.T) { // start a cluster of 3 nodes a, b, c BeforeTest(t) - c := NewCluster(t, 3) + c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true}) c.Launch(t) defer c.Terminate(t) @@ -371,7 +371,7 @@ func TestIssue3699(t *testing.T) { // TestRejectUnhealthyAdd ensures an unhealthy cluster rejects adding members. func TestRejectUnhealthyAdd(t *testing.T) { BeforeTest(t) - c := NewCluster(t, 3) + c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true}) for _, m := range c.Members { m.ServerConfig.StrictReconfigCheck = true } @@ -415,7 +415,7 @@ func TestRejectUnhealthyAdd(t *testing.T) { // if quorum will be lost. func TestRejectUnhealthyRemove(t *testing.T) { BeforeTest(t) - c := NewCluster(t, 5) + c := newCluster(t, &ClusterConfig{Size: 5, UseBridge: true}) for _, m := range c.Members { m.ServerConfig.StrictReconfigCheck = true } @@ -464,7 +464,7 @@ func TestRestartRemoved(t *testing.T) { BeforeTest(t) // 1. start single-member cluster - c := NewCluster(t, 1) + c := newCluster(t, &ClusterConfig{Size: 1, UseBridge: true}) for _, m := range c.Members { m.ServerConfig.StrictReconfigCheck = true } @@ -540,7 +540,7 @@ func clusterMustProgress(t *testing.T, membs []*member) { func TestSpeedyTerminate(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3, UseBridge: true}) // Stop/Restart so requests will time out on lost leaders for i := 0; i < 3; i++ { clus.Members[i].Stop(t) diff --git a/tests/integration/member_test.go b/tests/integration/member_test.go index 5493924c9..99788b757 100644 --- a/tests/integration/member_test.go +++ b/tests/integration/member_test.go @@ -46,7 +46,7 @@ func TestPauseMember(t *testing.T) { func TestRestartMember(t *testing.T) { BeforeTest(t) - c := NewCluster(t, 3) + c := newCluster(t, &ClusterConfig{Size: 3, UseBridge: true}) c.Launch(t) defer c.Terminate(t) @@ -88,7 +88,7 @@ func TestLaunchDuplicateMemberShouldFail(t *testing.T) { func TestSnapshotAndRestartMember(t *testing.T) { BeforeTest(t) - m := mustNewMember(t, memberConfig{name: "snapAndRestartTest"}) + m := mustNewMember(t, memberConfig{name: "snapAndRestartTest", useBridge: true}) m.SnapshotCount = 100 m.Launch() defer m.Terminate(t) diff --git a/tests/integration/v3_alarm_test.go b/tests/integration/v3_alarm_test.go index 0151dc27f..f39bd48cc 100644 --- a/tests/integration/v3_alarm_test.go +++ b/tests/integration/v3_alarm_test.go @@ -35,7 +35,7 @@ func TestV3StorageQuotaApply(t *testing.T) { BeforeTest(t) quotasize := int64(16 * os.Getpagesize()) - clus := NewClusterV3(t, &ClusterConfig{Size: 2}) + clus := NewClusterV3(t, &ClusterConfig{Size: 2, UseBridge: true}) defer clus.Terminate(t) kvc0 := toGRPC(clus.Client(0)).KV kvc1 := toGRPC(clus.Client(1)).KV @@ -147,7 +147,7 @@ func TestV3AlarmDeactivate(t *testing.T) { func TestV3CorruptAlarm(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) var wg sync.WaitGroup diff --git a/tests/integration/v3_grpc_inflight_test.go b/tests/integration/v3_grpc_inflight_test.go index 9f5085112..7432fb46a 100644 --- a/tests/integration/v3_grpc_inflight_test.go +++ b/tests/integration/v3_grpc_inflight_test.go @@ -61,7 +61,7 @@ func TestV3MaintenanceDefragmentInflightRange(t *testing.T) { // See https://github.com/etcd-io/etcd/issues/7322 for more detail. func TestV3KVInflightRangeRequests(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) cli := clus.RandClient() diff --git a/tests/integration/v3_grpc_test.go b/tests/integration/v3_grpc_test.go index 55edf4dc9..e4c5fc154 100644 --- a/tests/integration/v3_grpc_test.go +++ b/tests/integration/v3_grpc_test.go @@ -88,7 +88,7 @@ func TestV3PutOverwrite(t *testing.T) { // TestPutRestart checks if a put after an unrelated member restart succeeds func TestV3PutRestart(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) kvIdx := rand.Intn(3) @@ -1207,7 +1207,7 @@ func TestV3Hash(t *testing.T) { // TestV3HashRestart ensures that hash stays the same after restart. func TestV3HashRestart(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) cli := clus.RandClient() @@ -1240,7 +1240,7 @@ func TestV3StorageQuotaAPI(t *testing.T) { BeforeTest(t) quotasize := int64(16 * os.Getpagesize()) - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3, UseBridge: true}) // Set a quota on one node clus.Members[0].QuotaBackendBytes = quotasize @@ -1851,7 +1851,7 @@ func TestGRPCRequireLeader(t *testing.T) { func TestGRPCStreamRequireLeader(t *testing.T) { BeforeTest(t) - cfg := ClusterConfig{Size: 3} + cfg := ClusterConfig{Size: 3, UseBridge: true} clus := newClusterV3NoClients(t, &cfg) defer clus.Terminate(t) diff --git a/tests/integration/v3_lease_test.go b/tests/integration/v3_lease_test.go index 08b0ca7bb..1727da65c 100644 --- a/tests/integration/v3_lease_test.go +++ b/tests/integration/v3_lease_test.go @@ -36,7 +36,7 @@ import ( func TestV3LeasePromote(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + clus := NewClusterV3(t, &ClusterConfig{Size: 3, UseBridge: true}) defer clus.Terminate(t) // create lease @@ -237,6 +237,7 @@ func TestV3LeaseCheckpoint(t *testing.T) { Size: 3, EnableLeaseCheckpoint: true, LeaseCheckpointInterval: leaseInterval, + UseBridge: true, }) defer clus.Terminate(t) @@ -649,7 +650,7 @@ const fiveMinTTL int64 = 300 func TestV3LeaseRecoverAndRevoke(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) kvc := toGRPC(clus.Client(0)).KV @@ -700,7 +701,7 @@ func TestV3LeaseRecoverAndRevoke(t *testing.T) { func TestV3LeaseRevokeAndRecover(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) kvc := toGRPC(clus.Client(0)).KV @@ -752,7 +753,7 @@ func TestV3LeaseRevokeAndRecover(t *testing.T) { func TestV3LeaseRecoverKeyWithDetachedLease(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) kvc := toGRPC(clus.Client(0)).KV @@ -808,7 +809,7 @@ func TestV3LeaseRecoverKeyWithDetachedLease(t *testing.T) { func TestV3LeaseRecoverKeyWithMutipleLease(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) kvc := toGRPC(clus.Client(0)).KV diff --git a/tests/integration/v3_watch_test.go b/tests/integration/v3_watch_test.go index 240af36f4..b249a8649 100644 --- a/tests/integration/v3_watch_test.go +++ b/tests/integration/v3_watch_test.go @@ -1040,7 +1040,7 @@ func TestWatchWithProgressNotify(t *testing.T) { // TestV3WatcMultiOpenhClose opens many watchers concurrently on multiple streams. func TestV3WatchClose(t *testing.T) { BeforeTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + clus := NewClusterV3(t, &ClusterConfig{Size: 1, UseBridge: true}) defer clus.Terminate(t) c := clus.Client(0)