tests: Cleanup member interface by exposing Bridge directly

Marek Siarkowicz
2021-09-24 14:06:20 +02:00
parent f324894e8f
commit 0bac49bda4
7 changed files with 32 additions and 31 deletions
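The test-facing change is mechanical: connection helpers move off the member and behind a Bridge() accessor. A minimal before/after sketch of the call-site rewrite this commit applies across the test files:

    // before: connection helpers lived on the member itself
    clus.Members[0].DropConnections()

    // after: tests reach the same helpers through the exposed bridge
    clus.Members[0].Bridge().DropConnections()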

View File

@@ -68,7 +68,7 @@ func (b *bridge) Close() {
b.wg.Wait()
}
-func (b *bridge) Reset() {
+func (b *bridge) DropConnections() {
b.mu.Lock()
defer b.mu.Unlock()
for bc := range b.conns {
@@ -77,13 +77,13 @@ func (b *bridge) Reset() {
b.conns = make(map[*bridgeConn]struct{})
}
-func (b *bridge) Pause() {
+func (b *bridge) PauseConnections() {
b.mu.Lock()
b.pausec = make(chan struct{})
b.mu.Unlock()
}
-func (b *bridge) Unpause() {
+func (b *bridge) UnpauseConnections() {
b.mu.Lock()
select {
case <-b.pausec:
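The bridge methods are renamed to match the member-level helpers they replace (Reset -> DropConnections, Pause -> PauseConnections, Unpause -> UnpauseConnections), so call sites read the same once they go through Bridge(). A rough test fragment using the renamed surface, assuming the usual integration cluster setup:

    br := clus.Members[0].Bridge()  // accessor added in this commit
    br.PauseConnections()           // hold traffic going through the bridge
    // ... issue a request that should block while connections are paused ...
    br.UnpauseConnections()         // let traffic flow again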

View File

@@ -77,7 +77,7 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) {
// give enough time for balancer resolution
time.Sleep(5 * time.Second)
-clus.Members[0].Blackhole()
+clus.Members[0].Bridge().Blackhole()
if _, err = clus.Client(1).Put(context.TODO(), "foo", "bar"); err != nil {
t.Fatal(err)
@@ -88,12 +88,12 @@ func TestBalancerUnderBlackholeKeepAliveWatch(t *testing.T) {
t.Error("took too long to receive watch events")
}
-clus.Members[0].Unblackhole()
+clus.Members[0].Bridge().Unblackhole()
// waiting for moving eps[0] out of unhealthy, so that it can be re-pinned.
time.Sleep(ccfg.DialTimeout)
-clus.Members[1].Blackhole()
+clus.Members[1].Bridge().Blackhole()
// make sure client[0] can connect to eps[0] after removing the blackhole.
if _, err = clus.Client(0).Get(context.TODO(), "foo"); err != nil {
@@ -196,7 +196,7 @@ func testBalancerUnderBlackholeNoKeepAlive(t *testing.T, op func(*clientv3.Clien
cli.SetEndpoints(eps...)
// blackhole eps[0]
-clus.Members[0].Blackhole()
+clus.Members[0].Bridge().Blackhole()
// With round robin balancer, client will make a request to a healthy endpoint
// within a few requests.
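The balancer blackhole tests follow the same shape with the blackhole helpers; a sketch, assuming a cluster whose members were created with the bridge enabled:

    clus.Members[0].Bridge().Blackhole()   // drop traffic to member 0 so the client fails over
    // ... exercise the client against the remaining endpoints ...
    clus.Members[0].Bridge().Unblackhole() // restore traffic to member 0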

View File

@@ -884,12 +884,12 @@ func TestKVPutAtMostOnce(t *testing.T) {
}
for i := 0; i < 10; i++ {
-clus.Members[0].DropConnections()
+clus.Members[0].Bridge().DropConnections()
donec := make(chan struct{})
go func() {
defer close(donec)
for i := 0; i < 10; i++ {
-clus.Members[0].DropConnections()
+clus.Members[0].Bridge().DropConnections()
time.Sleep(5 * time.Millisecond)
}
}()

View File

@@ -1509,11 +1509,11 @@ func TestLeasingReconnectOwnerConsistency(t *testing.T) {
for i := 0; i < 10; i++ {
v := fmt.Sprintf("%d", i)
donec := make(chan struct{})
-clus.Members[0].DropConnections()
+clus.Members[0].Bridge().DropConnections()
go func() {
defer close(donec)
for i := 0; i < 20; i++ {
-clus.Members[0].DropConnections()
+clus.Members[0].Bridge().DropConnections()
time.Sleep(time.Millisecond)
}
}()
@@ -1663,9 +1663,9 @@ func TestLeasingReconnectTxn(t *testing.T) {
donec := make(chan struct{})
go func() {
defer close(donec)
-clus.Members[0].DropConnections()
+clus.Members[0].Bridge().DropConnections()
for i := 0; i < 10; i++ {
-clus.Members[0].DropConnections()
+clus.Members[0].Bridge().DropConnections()
time.Sleep(time.Millisecond)
}
time.Sleep(10 * time.Millisecond)
@@ -1703,11 +1703,11 @@ func TestLeasingReconnectNonOwnerGet(t *testing.T) {
n := 0
for i := 0; i < 10; i++ {
donec := make(chan struct{})
-clus.Members[0].DropConnections()
+clus.Members[0].Bridge().DropConnections()
go func() {
defer close(donec)
for j := 0; j < 10; j++ {
-clus.Members[0].DropConnections()
+clus.Members[0].Bridge().DropConnections()
time.Sleep(time.Millisecond)
}
}()

View File

@@ -188,7 +188,7 @@ func testWatchReconnRequest(t *testing.T, wctx *watchctx) {
defer close(donec)
// take down watcher connection
for {
-wctx.clus.Members[wctx.wclientMember].DropConnections()
+wctx.clus.Members[wctx.wclientMember].Bridge().DropConnections()
select {
case <-timer:
// spinning on close may live lock reconnection
@@ -230,7 +230,7 @@ func testWatchReconnInit(t *testing.T, wctx *watchctx) {
if wctx.ch = wctx.w.Watch(context.TODO(), "a"); wctx.ch == nil {
t.Fatalf("expected non-nil channel")
}
-wctx.clus.Members[wctx.wclientMember].DropConnections()
+wctx.clus.Members[wctx.wclientMember].Bridge().DropConnections()
// watcher should recover
putAndWatch(t, wctx, "a", "a")
}
@@ -247,7 +247,7 @@ func testWatchReconnRunning(t *testing.T, wctx *watchctx) {
}
putAndWatch(t, wctx, "a", "a")
// take down watcher connection
-wctx.clus.Members[wctx.wclientMember].DropConnections()
+wctx.clus.Members[wctx.wclientMember].Bridge().DropConnections()
// watcher should recover
putAndWatch(t, wctx, "a", "b")
}
@@ -368,8 +368,8 @@ func TestWatchResumeInitRev(t *testing.T) {
t.Fatalf("got (%v, %v), expected create notification rev=4", resp, ok)
}
// pause wch
-clus.Members[0].DropConnections()
-clus.Members[0].PauseConnections()
+clus.Members[0].Bridge().DropConnections()
+clus.Members[0].Bridge().PauseConnections()
select {
case resp, ok := <-wch:
@@ -378,7 +378,7 @@ func TestWatchResumeInitRev(t *testing.T) {
}
// resume wch
-clus.Members[0].UnpauseConnections()
+clus.Members[0].Bridge().UnpauseConnections()
select {
case resp, ok := <-wch:
@@ -968,7 +968,7 @@ func TestWatchWithCreatedNotificationDropConn(t *testing.T) {
t.Fatalf("expected created event, got %v", resp)
}
-cluster.Members[0].DropConnections()
+cluster.Members[0].Bridge().DropConnections()
// check watch channel doesn't post another watch response.
select {
@@ -1056,7 +1056,7 @@ func TestWatchOverlapContextCancel(t *testing.T) {
func TestWatchOverlapDropConnContextCancel(t *testing.T) {
f := func(clus *integration.ClusterV3) {
-clus.Members[0].DropConnections()
+clus.Members[0].Bridge().DropConnections()
}
testWatchOverlapContextCancel(t, f)
}
@@ -1164,7 +1164,7 @@ func TestWatchStressResumeClose(t *testing.T) {
for i := range wchs {
wchs[i] = cli.Watch(ctx, "abc")
}
-clus.Members[0].DropConnections()
+clus.Members[0].Bridge().DropConnections()
cancel()
if err := cli.Close(); err != nil {
t.Fatal(err)
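For the watch resume case above (TestWatchResumeInitRev), the sequence is: drop the current connection, pause the bridge so nothing reconnects, then unpause and let the watch resume. Roughly, under the same assumptions as the test:

    clus.Members[0].Bridge().DropConnections()    // kill the stream carrying the watch
    clus.Members[0].Bridge().PauseConnections()   // keep the client from reconnecting
    // ... the watch channel should stay silent here ...
    clus.Members[0].Bridge().UnpauseConnections() // reconnect; the watch should resume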

View File

@@ -772,6 +772,13 @@ func (m *member) addBridge() (*bridge, error) {
return m.grpcBridge, nil
}
+func (m *member) Bridge() *bridge {
+	if !m.useBridge {
+		m.Logger.Panic("Bridge not available. Please configure using bridge before creating cluster.")
+	}
+	return m.grpcBridge
+}
func (m *member) grpcAddr() string {
// prefix with localhost so cert has right domain
addr := "localhost:" + m.Name
@@ -796,12 +803,6 @@ func (m *member) ElectionTimeout() time.Duration {
func (m *member) ID() types.ID { return m.s.ID() }
-func (m *member) DropConnections() { m.grpcBridge.Reset() }
-func (m *member) PauseConnections() { m.grpcBridge.Pause() }
-func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
-func (m *member) Blackhole() { m.grpcBridge.Blackhole() }
-func (m *member) Unblackhole() { m.grpcBridge.Unblackhole() }
// NewClientV3 creates a new grpc client connection to the member
func NewClientV3(m *member) (*clientv3.Client, error) {
if m.grpcURL == "" {
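The new Bridge() accessor panics when the member was not set up with a bridge, so tests must opt in when building the cluster. A sketch of typical usage, assuming the cluster config field that enables the bridge is named UseBridge (the field itself is not part of this diff):

    clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, UseBridge: true}) // UseBridge assumed, not shown in this diff
    defer clus.Terminate(t)

    clus.Members[0].Bridge().DropConnections() // safe: the bridge was configured above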

View File

@@ -1068,7 +1068,7 @@ func TestV3WatchClose(t *testing.T) {
}()
}
-clus.Members[0].DropConnections()
+clus.Members[0].Bridge().DropConnections()
wg.Wait()
}