Merge pull request #12538 from lzhfromustc/12_9_GoroutineLeak

test: change channel operations to avoid potential goroutine leaks
This commit is contained in:
Piotr Tabor 2021-02-01 21:16:43 +01:00 committed by GitHub
commit d6d03beaea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 20 additions and 18 deletions

View File

@ -22,14 +22,14 @@ import (
func TestReadWriteTimeoutDialer(t *testing.T) { func TestReadWriteTimeoutDialer(t *testing.T) {
stop := make(chan struct{}) stop := make(chan struct{})
defer func() {
stop <- struct{}{}
}()
ln, err := net.Listen("tcp", "127.0.0.1:0") ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil { if err != nil {
t.Fatalf("unexpected listen error: %v", err) t.Fatalf("unexpected listen error: %v", err)
} }
defer func() {
stop <- struct{}{}
}()
ts := testBlockingServer{ln, 2, stop} ts := testBlockingServer{ln, 2, stop}
go ts.Start(t) go ts.Start(t)

View File

@ -213,7 +213,7 @@ func TestApplyRepeat(t *testing.T) {
// wait for conf change message // wait for conf change message
act, err := n.Wait(1) act, err := n.Wait(1)
// wait for stop message (async to avoid deadlock) // wait for stop message (async to avoid deadlock)
stopc := make(chan error) stopc := make(chan error, 1)
go func() { go func() {
_, werr := n.Wait(1) _, werr := n.Wait(1)
stopc <- werr stopc <- werr

View File

@ -36,6 +36,7 @@ func TestDoubleBarrier(t *testing.T) {
b := recipe.NewDoubleBarrier(session, "test-barrier", waiters) b := recipe.NewDoubleBarrier(session, "test-barrier", waiters)
donec := make(chan struct{}) donec := make(chan struct{})
defer close(donec)
for i := 0; i < waiters-1; i++ { for i := 0; i < waiters-1; i++ {
go func() { go func() {
session, err := concurrency.NewSession(clus.RandClient()) session, err := concurrency.NewSession(clus.RandClient())
@ -48,17 +49,17 @@ func TestDoubleBarrier(t *testing.T) {
if err := bb.Enter(); err != nil { if err := bb.Enter(); err != nil {
t.Errorf("could not enter on barrier (%v)", err) t.Errorf("could not enter on barrier (%v)", err)
} }
donec <- struct{}{} <-donec
if err := bb.Leave(); err != nil { if err := bb.Leave(); err != nil {
t.Errorf("could not leave on barrier (%v)", err) t.Errorf("could not leave on barrier (%v)", err)
} }
donec <- struct{}{} <-donec
}() }()
} }
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
select { select {
case <-donec: case donec <- struct{}{}:
t.Fatalf("barrier did not enter-wait") t.Fatalf("barrier did not enter-wait")
default: default:
} }
@ -72,13 +73,13 @@ func TestDoubleBarrier(t *testing.T) {
select { select {
case <-timerC: case <-timerC:
t.Fatalf("barrier enter timed out") t.Fatalf("barrier enter timed out")
case <-donec: case donec <- struct{}{}:
} }
} }
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
select { select {
case <-donec: case donec <- struct{}{}:
t.Fatalf("barrier did not leave-wait") t.Fatalf("barrier did not leave-wait")
default: default:
} }
@ -89,7 +90,7 @@ func TestDoubleBarrier(t *testing.T) {
select { select {
case <-timerC: case <-timerC:
t.Fatalf("barrier leave timed out") t.Fatalf("barrier leave timed out")
case <-donec: case donec <- struct{}{}:
} }
} }
} }
@ -100,6 +101,7 @@ func TestDoubleBarrierFailover(t *testing.T) {
waiters := 10 waiters := 10
donec := make(chan struct{}) donec := make(chan struct{})
defer close(donec)
s0, err := concurrency.NewSession(clus.Client(0)) s0, err := concurrency.NewSession(clus.Client(0))
if err != nil { if err != nil {
@ -118,7 +120,7 @@ func TestDoubleBarrierFailover(t *testing.T) {
if berr := b.Enter(); berr != nil { if berr := b.Enter(); berr != nil {
t.Errorf("could not enter on barrier (%v)", berr) t.Errorf("could not enter on barrier (%v)", berr)
} }
donec <- struct{}{} <-donec
}() }()
for i := 0; i < waiters-1; i++ { for i := 0; i < waiters-1; i++ {
@ -127,16 +129,16 @@ func TestDoubleBarrierFailover(t *testing.T) {
if berr := b.Enter(); berr != nil { if berr := b.Enter(); berr != nil {
t.Errorf("could not enter on barrier (%v)", berr) t.Errorf("could not enter on barrier (%v)", berr)
} }
donec <- struct{}{} <-donec
b.Leave() b.Leave()
donec <- struct{}{} <-donec
}() }()
} }
// wait for barrier enter to unblock // wait for barrier enter to unblock
for i := 0; i < waiters; i++ { for i := 0; i < waiters; i++ {
select { select {
case <-donec: case donec <- struct{}{}:
case <-time.After(10 * time.Second): case <-time.After(10 * time.Second):
t.Fatalf("timed out waiting for enter, %d", i) t.Fatalf("timed out waiting for enter, %d", i)
} }
@ -148,7 +150,7 @@ func TestDoubleBarrierFailover(t *testing.T) {
// join on rest of waiters // join on rest of waiters
for i := 0; i < waiters-1; i++ { for i := 0; i < waiters-1; i++ {
select { select {
case <-donec: case donec <- struct{}{}:
case <-time.After(10 * time.Second): case <-time.After(10 * time.Second):
t.Fatalf("timed out waiting for leave, %d", i) t.Fatalf("timed out waiting for leave, %d", i)
} }

View File

@ -778,7 +778,7 @@ func TestKVPutFailGetRetry(t *testing.T) {
t.Fatalf("got success on disconnected put, wanted error") t.Fatalf("got success on disconnected put, wanted error")
} }
donec := make(chan struct{}) donec := make(chan struct{}, 1)
go func() { go func() {
// Get will fail, but reconnect will trigger // Get will fail, but reconnect will trigger
gresp, gerr := kv.Get(context.TODO(), "foo") gresp, gerr := kv.Get(context.TODO(), "foo")

View File

@ -954,7 +954,7 @@ func TestV2WatchKeyInDir(t *testing.T) {
tc := NewTestClient() tc := NewTestClient()
var body map[string]interface{} var body map[string]interface{}
c := make(chan bool) c := make(chan bool, 1)
// Create an expiring directory // Create an expiring directory
v := url.Values{} v := url.Values{}

View File

@ -1097,7 +1097,7 @@ func TestV3WatchWithFilter(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
recv := make(chan *pb.WatchResponse) recv := make(chan *pb.WatchResponse, 1)
go func() { go func() {
// check received PUT // check received PUT
resp, rerr := ws.Recv() resp, rerr := ws.Recv()