diff --git a/client/pkg/transport/transport.go b/client/pkg/transport/transport.go index 949610d46..648512772 100644 --- a/client/pkg/transport/transport.go +++ b/client/pkg/transport/transport.go @@ -15,6 +15,7 @@ package transport import ( + "context" "net" "net/http" "strings" @@ -31,11 +32,11 @@ func NewTransport(info TLSInfo, dialtimeoutd time.Duration) (*http.Transport, er t := &http.Transport{ Proxy: http.ProxyFromEnvironment, - Dial: (&net.Dialer{ + DialContext: (&net.Dialer{ Timeout: dialtimeoutd, // value taken from http.DefaultTransport KeepAlive: 30 * time.Second, - }).Dial, + }).DialContext, // value taken from http.DefaultTransport TLSHandshakeTimeout: 10 * time.Second, TLSClientConfig: cfg, @@ -45,15 +46,20 @@ func NewTransport(info TLSInfo, dialtimeoutd time.Duration) (*http.Transport, er Timeout: dialtimeoutd, KeepAlive: 30 * time.Second, } - dial := func(net, addr string) (net.Conn, error) { - return dialer.Dial("unix", addr) - } + dialContext := func(ctx context.Context, net, addr string) (net.Conn, error) { + return dialer.DialContext(ctx, "unix", addr) + } tu := &http.Transport{ Proxy: http.ProxyFromEnvironment, - Dial: dial, + DialContext: dialContext, TLSHandshakeTimeout: 10 * time.Second, TLSClientConfig: cfg, + // Cost of reopening connection on sockets is low, and they are mostly used in testing. + // Long living unix-transport connections were leading to 'leak' test flakes. + // Alternativly the returned Transport (t) should override CloseIdleConnections to + // forward it to 'tu' as well. + IdleConnTimeout: time.Microsecond, } ut := &unixTransport{tu} diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index ebbdd2379..480a9492b 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -15,6 +15,7 @@ package proxy import ( + "context" "fmt" "io" mrand "math/rand" @@ -295,6 +296,7 @@ func (s *server) To() string { func (s *server) listenAndServe() { defer s.closeWg.Done() + ctx := context.Background() s.lg.Info("proxy is listening on", zap.String("from", s.From())) close(s.readyc) @@ -380,7 +382,7 @@ func (s *server) listenAndServe() { } continue } - out, err = tp.Dial(s.to.Scheme, s.to.Host) + out, err = tp.DialContext(ctx, s.to.Scheme, s.to.Host) } else { out, err = net.Dial(s.to.Scheme, s.to.Host) } diff --git a/pkg/proxy/server_test.go b/pkg/proxy/server_test.go index 77ee64320..c634055e6 100644 --- a/pkg/proxy/server_test.go +++ b/pkg/proxy/server_test.go @@ -16,6 +16,7 @@ package proxy import ( "bytes" + "context" "crypto/tls" "fmt" "io/ioutil" @@ -615,7 +616,7 @@ func send(t *testing.T, data []byte, scheme, addr string, tlsInfo transport.TLSI if terr != nil { t.Fatal(terr) } - out, err = tp.Dial(scheme, addr) + out, err = tp.DialContext(context.Background(), scheme, addr) } else { out, err = net.Dial(scheme, addr) } diff --git a/server/etcdserver/api/rafthttp/pipeline.go b/server/etcdserver/api/rafthttp/pipeline.go index 19facc852..de3b45911 100644 --- a/server/etcdserver/api/rafthttp/pipeline.go +++ b/server/etcdserver/api/rafthttp/pipeline.go @@ -19,6 +19,7 @@ import ( "context" "errors" "io/ioutil" + "runtime" "sync" "time" @@ -139,6 +140,7 @@ func (p *pipeline) post(data []byte) (err error) { go func() { select { case <-done: + cancel() case <-p.stopc: waitSchedule() cancel() @@ -173,4 +175,4 @@ func (p *pipeline) post(data []byte) (err error) { } // waitSchedule waits other goroutines to be scheduled for a while -func waitSchedule() { time.Sleep(time.Millisecond) } +func waitSchedule() { runtime.Gosched() }