Merge pull request #12870 from ptabor/20210416-fix-flake-TestSnapshotV3RestoreMultiMemberAdd-master

Fix TestSnapshotV3RestoreMultiMemberAdd flakes (leaks)
This commit is contained in:
Piotr Tabor 2021-04-16 21:49:46 +02:00 committed by GitHub
commit 5744cdf199
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 20 additions and 9 deletions

View File

@ -15,6 +15,7 @@
package transport
import (
"context"
"net"
"net/http"
"strings"
@ -31,11 +32,11 @@ func NewTransport(info TLSInfo, dialtimeoutd time.Duration) (*http.Transport, er
t := &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
DialContext: (&net.Dialer{
Timeout: dialtimeoutd,
// value taken from http.DefaultTransport
KeepAlive: 30 * time.Second,
}).Dial,
}).DialContext,
// value taken from http.DefaultTransport
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: cfg,
@ -45,15 +46,20 @@ func NewTransport(info TLSInfo, dialtimeoutd time.Duration) (*http.Transport, er
Timeout: dialtimeoutd,
KeepAlive: 30 * time.Second,
}
dial := func(net, addr string) (net.Conn, error) {
return dialer.Dial("unix", addr)
}
dialContext := func(ctx context.Context, net, addr string) (net.Conn, error) {
return dialer.DialContext(ctx, "unix", addr)
}
tu := &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: dial,
DialContext: dialContext,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: cfg,
// Cost of reopening connection on sockets is low, and they are mostly used in testing.
// Long living unix-transport connections were leading to 'leak' test flakes.
// Alternativly the returned Transport (t) should override CloseIdleConnections to
// forward it to 'tu' as well.
IdleConnTimeout: time.Microsecond,
}
ut := &unixTransport{tu}

View File

@ -15,6 +15,7 @@
package proxy
import (
"context"
"fmt"
"io"
mrand "math/rand"
@ -295,6 +296,7 @@ func (s *server) To() string {
func (s *server) listenAndServe() {
defer s.closeWg.Done()
ctx := context.Background()
s.lg.Info("proxy is listening on", zap.String("from", s.From()))
close(s.readyc)
@ -380,7 +382,7 @@ func (s *server) listenAndServe() {
}
continue
}
out, err = tp.Dial(s.to.Scheme, s.to.Host)
out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host)
} else {
out, err = net.Dial(s.to.Scheme, s.to.Host)
}

View File

@ -16,6 +16,7 @@ package proxy
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io/ioutil"
@ -615,7 +616,7 @@ func send(t *testing.T, data []byte, scheme, addr string, tlsInfo transport.TLSI
if terr != nil {
t.Fatal(terr)
}
out, err = tp.Dial(scheme, addr)
out, err = tp.DialContext(context.Background(), scheme, addr)
} else {
out, err = net.Dial(scheme, addr)
}

View File

@ -19,6 +19,7 @@ import (
"context"
"errors"
"io/ioutil"
"runtime"
"sync"
"time"
@ -139,6 +140,7 @@ func (p *pipeline) post(data []byte) (err error) {
go func() {
select {
case <-done:
cancel()
case <-p.stopc:
waitSchedule()
cancel()
@ -173,4 +175,4 @@ func (p *pipeline) post(data []byte) (err error) {
}
// waitSchedule waits other goroutines to be scheduled for a while
func waitSchedule() { time.Sleep(time.Millisecond) }
func waitSchedule() { runtime.Gosched() }