Merge pull request #13966 from VladSaioc/blocking-readyc

Fixed potential goroutine leak due to p.Ready() receive in pkg/proxy and dependents.
This commit is contained in:
Piotr Tabor 2022-04-28 16:12:02 +02:00 committed by GitHub
commit 52ea811af8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 10 deletions

View File

@ -73,7 +73,9 @@ func testServer(t *testing.T, scheme string, secure bool, delayTx bool) {
cfg.TLSInfo = tlsInfo
}
p := NewServer(cfg)
<-p.Ready()
waitForServer(t, p)
defer p.Close()
data1 := []byte("Hello World!")
@ -196,7 +198,9 @@ func testServerDelayAccept(t *testing.T, secure bool) {
cfg.TLSInfo = tlsInfo
}
p := NewServer(cfg)
<-p.Ready()
waitForServer(t, p)
defer p.Close()
data := []byte("Hello World!")
@ -246,7 +250,9 @@ func TestServer_PauseTx(t *testing.T) {
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
})
<-p.Ready()
waitForServer(t, p)
defer p.Close()
p.PauseTx()
@ -293,7 +299,9 @@ func TestServer_ModifyTx_corrupt(t *testing.T) {
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
})
<-p.Ready()
waitForServer(t, p)
defer p.Close()
p.ModifyTx(func(d []byte) []byte {
@ -329,7 +337,9 @@ func TestServer_ModifyTx_packet_loss(t *testing.T) {
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
})
<-p.Ready()
waitForServer(t, p)
defer p.Close()
// 50% packet loss
@ -366,7 +376,9 @@ func TestServer_BlackholeTx(t *testing.T) {
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
})
<-p.Ready()
waitForServer(t, p)
defer p.Close()
p.BlackholeTx()
@ -417,7 +429,9 @@ func TestServer_Shutdown(t *testing.T) {
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
})
<-p.Ready()
waitForServer(t, p)
defer p.Close()
s, _ := p.(*server)
@ -448,7 +462,9 @@ func TestServer_ShutdownListener(t *testing.T) {
From: url.URL{Scheme: scheme, Host: srcAddr},
To: url.URL{Scheme: scheme, Host: dstAddr},
})
<-p.Ready()
waitForServer(t, p)
defer p.Close()
// shut down destination
@ -527,7 +543,9 @@ func testServerHTTP(t *testing.T, secure, delayTx bool) {
cfg.TLSInfo = tlsInfo
}
p := NewServer(cfg)
<-p.Ready()
waitForServer(t, p)
defer func() {
lg.Info("closing Proxy server...")
p.Close()
@ -670,3 +688,13 @@ func receive(t *testing.T, ln net.Listener) (data []byte) {
}
return buf.Bytes()
}
// Waits until a proxy is ready to serve.
// Aborts test on proxy start-up error.
func waitForServer(t *testing.T, s Server) {
select {
case <-s.Ready():
case err := <-s.Error():
t.Fatal(err)
}
}

View File

@ -87,7 +87,13 @@ $ ./bin/etcdctl --endpoints localhost:23790 put foo bar`)
zap.Int("port", httpPort))
}
p := proxy.NewServer(cfg)
<-p.Ready()
select {
case <-p.Ready():
case err := <-p.Error():
panic(err)
}
defer p.Close()
mux := http.NewServeMux()