diff --git a/tests/integration/bridge.go b/tests/integration/bridge.go index 67d6ae447..746168fc7 100644 --- a/tests/integration/bridge.go +++ b/tests/integration/bridge.go @@ -68,7 +68,7 @@ func (b *bridge) Close() { b.wg.Wait() } -func (b *bridge) Reset() { +func (b *bridge) DropConnections() { b.mu.Lock() defer b.mu.Unlock() for bc := range b.conns { @@ -77,13 +77,13 @@ func (b *bridge) Reset() { b.conns = make(map[*bridgeConn]struct{}) } -func (b *bridge) Pause() { +func (b *bridge) PauseConnections() { b.mu.Lock() b.pausec = make(chan struct{}) b.mu.Unlock() } -func (b *bridge) Unpause() { +func (b *bridge) UnpauseConnections() { b.mu.Lock() select { case <-b.pausec: diff --git a/tests/integration/clientv3/connectivity/black_hole_test.go b/tests/integration/clientv3/connectivity/black_hole_test.go index c9a199558..4445c69f6 100644 --- a/tests/integration/clientv3/connectivity/black_hole_test.go +++ b/tests/integration/clientv3/connectivity/black_hole_test.go @@ -77,7 +77,7 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) { // give enough time for balancer resolution time.Sleep(5 * time.Second) - clus.Members[0].Blackhole() + clus.Members[0].Bridge().Blackhole() if _, err = clus.Client(1).Put(context.TODO(), "foo", "bar"); err != nil { t.Fatal(err) @@ -88,12 +88,12 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) { t.Error("took too long to receive watch events") } - clus.Members[0].Unblackhole() + clus.Members[0].Bridge().Unblackhole() // waiting for moving eps[0] out of unhealthy, so that it can be re-pined. time.Sleep(ccfg.DialTimeout) - clus.Members[1].Blackhole() + clus.Members[1].Bridge().Blackhole() // make sure client[0] can connect to eps[0] after remove the blackhole. if _, err = clus.Client(0).Get(context.TODO(), "foo"); err != nil { @@ -196,7 +196,7 @@ func testBalancerUnderBlackholeNoKeepAlive(t *testing.T, op func(*clientv3.Clien cli.SetEndpoints(eps...) // blackhole eps[0] - clus.Members[0].Blackhole() + clus.Members[0].Bridge().Blackhole() // With round robin balancer, client will make a request to a healthy endpoint // within a few requests. diff --git a/tests/integration/clientv3/kv_test.go b/tests/integration/clientv3/kv_test.go index 9cfd5b02c..6cfd471ee 100644 --- a/tests/integration/clientv3/kv_test.go +++ b/tests/integration/clientv3/kv_test.go @@ -884,12 +884,12 @@ func TestKVPutAtMostOnce(t *testing.T) { } for i := 0; i < 10; i++ { - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() donec := make(chan struct{}) go func() { defer close(donec) for i := 0; i < 10; i++ { - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() time.Sleep(5 * time.Millisecond) } }() diff --git a/tests/integration/clientv3/lease/leasing_test.go b/tests/integration/clientv3/lease/leasing_test.go index 47b776c5a..e9d28c3a8 100644 --- a/tests/integration/clientv3/lease/leasing_test.go +++ b/tests/integration/clientv3/lease/leasing_test.go @@ -1509,11 +1509,11 @@ func TestLeasingReconnectOwnerConsistency(t *testing.T) { for i := 0; i < 10; i++ { v := fmt.Sprintf("%d", i) donec := make(chan struct{}) - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() go func() { defer close(donec) for i := 0; i < 20; i++ { - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() time.Sleep(time.Millisecond) } }() @@ -1663,9 +1663,9 @@ func TestLeasingReconnectTxn(t *testing.T) { donec := make(chan struct{}) go func() { defer close(donec) - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() for i := 0; i < 10; i++ { - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() time.Sleep(time.Millisecond) } time.Sleep(10 * time.Millisecond) @@ -1703,11 +1703,11 @@ func TestLeasingReconnectNonOwnerGet(t *testing.T) { n := 0 for i := 0; i < 10; i++ { donec := make(chan struct{}) - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() go func() { defer close(donec) for j := 0; j < 10; j++ { - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() time.Sleep(time.Millisecond) } }() diff --git a/tests/integration/clientv3/watch_test.go b/tests/integration/clientv3/watch_test.go index b5a0dd08f..7a992ecf9 100644 --- a/tests/integration/clientv3/watch_test.go +++ b/tests/integration/clientv3/watch_test.go @@ -188,7 +188,7 @@ func testWatchReconnRequest(t *testing.T, wctx *watchctx) { defer close(donec) // take down watcher connection for { - wctx.clus.Members[wctx.wclientMember].DropConnections() + wctx.clus.Members[wctx.wclientMember].Bridge().DropConnections() select { case <-timer: // spinning on close may live lock reconnection @@ -230,7 +230,7 @@ func testWatchReconnInit(t *testing.T, wctx *watchctx) { if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil { t.Fatalf("expected non-nil channel") } - wctx.clus.Members[wctx.wclientMember].DropConnections() + wctx.clus.Members[wctx.wclientMember].Bridge().DropConnections() // watcher should recover putAndWatch(t, wctx, "a", "a") } @@ -247,7 +247,7 @@ func testWatchReconnRunning(t *testing.T, wctx *watchctx) { } putAndWatch(t, wctx, "a", "a") // take down watcher connection - wctx.clus.Members[wctx.wclientMember].DropConnections() + wctx.clus.Members[wctx.wclientMember].Bridge().DropConnections() // watcher should recover putAndWatch(t, wctx, "a", "b") } @@ -368,8 +368,8 @@ func TestWatchResumeInitRev(t *testing.T) { t.Fatalf("got (%v, %v), expected create notification rev=4", resp, ok) } // pause wch - clus.Members[0].DropConnections() - clus.Members[0].PauseConnections() + clus.Members[0].Bridge().DropConnections() + clus.Members[0].Bridge().PauseConnections() select { case resp, ok := <-wch: @@ -378,7 +378,7 @@ func TestWatchResumeInitRev(t *testing.T) { } // resume wch - clus.Members[0].UnpauseConnections() + clus.Members[0].Bridge().UnpauseConnections() select { case resp, ok := <-wch: @@ -968,7 +968,7 @@ func TestWatchWithCreatedNotificationDropConn(t *testing.T) { t.Fatalf("expected created event, got %v", resp) } - cluster.Members[0].DropConnections() + cluster.Members[0].Bridge().DropConnections() // check watch channel doesn't post another watch response. select { @@ -1056,7 +1056,7 @@ func TestWatchOverlapContextCancel(t *testing.T) { func TestWatchOverlapDropConnContextCancel(t *testing.T) { f := func(clus *integration.ClusterV3) { - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() } testWatchOverlapContextCancel(t, f) } @@ -1164,7 +1164,7 @@ func TestWatchStressResumeClose(t *testing.T) { for i := range wchs { wchs[i] = cli.Watch(ctx, "abc") } - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() cancel() if err := cli.Close(); err != nil { t.Fatal(err) diff --git a/tests/integration/cluster.go b/tests/integration/cluster.go index 37bb40b59..cf1c37520 100644 --- a/tests/integration/cluster.go +++ b/tests/integration/cluster.go @@ -772,6 +772,13 @@ func (m *member) addBridge() (*bridge, error) { return m.grpcBridge, nil } +func (m *member) Bridge() *bridge { + if !m.useBridge { + m.Logger.Panic("Bridge not available. Please configure using bridge before creating cluster.") + } + return m.grpcBridge +} + func (m *member) grpcAddr() string { // prefix with localhost so cert has right domain addr := "localhost:" + m.Name @@ -796,12 +803,6 @@ func (m *member) ElectionTimeout() time.Duration { func (m *member) ID() types.ID { return m.s.ID() } -func (m *member) DropConnections() { m.grpcBridge.Reset() } -func (m *member) PauseConnections() { m.grpcBridge.Pause() } -func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() } -func (m *member) Blackhole() { m.grpcBridge.Blackhole() } -func (m *member) Unblackhole() { m.grpcBridge.Unblackhole() } - // NewClientV3 creates a new grpc client connection to the member func NewClientV3(m *member) (*clientv3.Client, error) { if m.grpcURL == "" { diff --git a/tests/integration/v3_watch_test.go b/tests/integration/v3_watch_test.go index b249a8649..a969cc6aa 100644 --- a/tests/integration/v3_watch_test.go +++ b/tests/integration/v3_watch_test.go @@ -1068,7 +1068,7 @@ func TestV3WatchClose(t *testing.T) { }() } - clus.Members[0].DropConnections() + clus.Members[0].Bridge().DropConnections() wg.Wait() }