From 70a7674e20998acac9280ed0f1ca7f9d2a7acf19 Mon Sep 17 00:00:00 2001 From: Christian Provenzano <18606244+caproven@users.noreply.github.com> Date: Tue, 10 May 2022 23:36:14 -0400 Subject: [PATCH 1/4] server: Director can be stopped Goroutine for new directors would live past director scope. Tests could occassionally fail if this goroutine had log events after test execution should have ended. --- server/proxy/httpproxy/director.go | 13 ++++++++++--- server/proxy/httpproxy/director_test.go | 10 +++++++++- server/proxy/httpproxy/proxy.go | 2 +- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/server/proxy/httpproxy/director.go b/server/proxy/httpproxy/director.go index e20e2226a..ad50dd07b 100644 --- a/server/proxy/httpproxy/director.go +++ b/server/proxy/httpproxy/director.go @@ -33,7 +33,7 @@ func init() { rand.Seed(time.Now().UnixNano()) } -func newDirector(lg *zap.Logger, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director { +func newDirector(lg *zap.Logger, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration, stop <-chan struct{}, donec chan<- struct{}) *director { if lg == nil { lg = zap.NewNop() } @@ -44,6 +44,9 @@ func newDirector(lg *zap.Logger, urlsFunc GetProxyURLs, failureWait time.Duratio } d.refresh() go func() { + if donec != nil { + defer close(donec) + } // In order to prevent missing proxy endpoints in the first try: // when given refresh interval of defaultRefreshInterval or greater // and whenever there is no available proxy endpoints, @@ -65,8 +68,12 @@ func newDirector(lg *zap.Logger, urlsFunc GetProxyURLs, failureWait time.Duratio lg.Info("endpoints found", zap.Strings("endpoints", sl)) }) } - time.Sleep(ri) - d.refresh() + select { + case <-time.After(ri): + d.refresh() + case <-stop: + return + } } }() return d diff --git a/server/proxy/httpproxy/director_test.go b/server/proxy/httpproxy/director_test.go index 9bba5987c..7eb089a1f 100644 --- a/server/proxy/httpproxy/director_test.go +++ b/server/proxy/httpproxy/director_test.go @@ -55,7 +55,8 @@ func TestNewDirectorScheme(t *testing.T) { uf := func() []string { return tt.urls } - got := newDirector(zaptest.NewLogger(t), uf, time.Minute, time.Minute) + stop, donec := make(chan struct{}), make(chan struct{}) + got := newDirector(zaptest.NewLogger(t), uf, time.Minute, time.Minute, stop, donec) var gep []string for _, ep := range got.ep { @@ -66,6 +67,13 @@ func TestNewDirectorScheme(t *testing.T) { if !reflect.DeepEqual(tt.want, gep) { t.Errorf("#%d: want endpoints = %#v, got = %#v", i, tt.want, gep) } + + close(stop) + select { + case <-donec: + case <-time.After(time.Second): + t.Fatalf("done took too long") + } } } diff --git a/server/proxy/httpproxy/proxy.go b/server/proxy/httpproxy/proxy.go index c8f27bf01..6874604e8 100644 --- a/server/proxy/httpproxy/proxy.go +++ b/server/proxy/httpproxy/proxy.go @@ -58,7 +58,7 @@ func NewHandler(lg *zap.Logger, t *http.Transport, urlsFunc GetProxyURLs, failur p := &reverseProxy{ lg: lg, - director: newDirector(lg, urlsFunc, failureWait, refreshInterval), + director: newDirector(lg, urlsFunc, failureWait, refreshInterval, nil, nil), transport: t, } From 655d171ecaadbaecb53177c20769abdd340603f2 Mon Sep 17 00:00:00 2001 From: Christian Provenzano <18606244+caproven@users.noreply.github.com> Date: Wed, 11 May 2022 18:19:22 -0400 Subject: [PATCH 2/4] server: Add director interrupt handler Director's goroutine would not be properly stopped in a non-test scenario. Handler stops it when process is interrupted. --- server/proxy/httpproxy/director.go | 16 +++++++++++----- server/proxy/httpproxy/director_test.go | 7 +++---- server/proxy/httpproxy/proxy.go | 2 +- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/server/proxy/httpproxy/director.go b/server/proxy/httpproxy/director.go index ad50dd07b..add97621d 100644 --- a/server/proxy/httpproxy/director.go +++ b/server/proxy/httpproxy/director.go @@ -20,6 +20,7 @@ import ( "sync" "time" + "go.etcd.io/etcd/pkg/v3/osutil" "go.uber.org/zap" ) @@ -33,7 +34,7 @@ func init() { rand.Seed(time.Now().UnixNano()) } -func newDirector(lg *zap.Logger, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration, stop <-chan struct{}, donec chan<- struct{}) *director { +func newDirector(lg *zap.Logger, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director { if lg == nil { lg = zap.NewNop() } @@ -41,12 +42,15 @@ func newDirector(lg *zap.Logger, urlsFunc GetProxyURLs, failureWait time.Duratio lg: lg, uf: urlsFunc, failureWait: failureWait, + stopc: make(chan struct{}), + donec: make(chan struct{}), } + osutil.RegisterInterruptHandler(func() { + close(d.stopc) + }) d.refresh() go func() { - if donec != nil { - defer close(donec) - } + defer close(d.donec) // In order to prevent missing proxy endpoints in the first try: // when given refresh interval of defaultRefreshInterval or greater // and whenever there is no available proxy endpoints, @@ -71,7 +75,7 @@ func newDirector(lg *zap.Logger, urlsFunc GetProxyURLs, failureWait time.Duratio select { case <-time.After(ri): d.refresh() - case <-stop: + case <-d.stopc: return } } @@ -85,6 +89,8 @@ type director struct { ep []*endpoint uf GetProxyURLs failureWait time.Duration + stopc chan struct{} + donec chan struct{} } func (d *director) refresh() { diff --git a/server/proxy/httpproxy/director_test.go b/server/proxy/httpproxy/director_test.go index 7eb089a1f..6f831d1aa 100644 --- a/server/proxy/httpproxy/director_test.go +++ b/server/proxy/httpproxy/director_test.go @@ -55,8 +55,7 @@ func TestNewDirectorScheme(t *testing.T) { uf := func() []string { return tt.urls } - stop, donec := make(chan struct{}), make(chan struct{}) - got := newDirector(zaptest.NewLogger(t), uf, time.Minute, time.Minute, stop, donec) + got := newDirector(zaptest.NewLogger(t), uf, time.Minute, time.Minute) var gep []string for _, ep := range got.ep { @@ -68,9 +67,9 @@ func TestNewDirectorScheme(t *testing.T) { t.Errorf("#%d: want endpoints = %#v, got = %#v", i, tt.want, gep) } - close(stop) + close(got.stopc) select { - case <-donec: + case <-got.donec: case <-time.After(time.Second): t.Fatalf("done took too long") } diff --git a/server/proxy/httpproxy/proxy.go b/server/proxy/httpproxy/proxy.go index 6874604e8..c8f27bf01 100644 --- a/server/proxy/httpproxy/proxy.go +++ b/server/proxy/httpproxy/proxy.go @@ -58,7 +58,7 @@ func NewHandler(lg *zap.Logger, t *http.Transport, urlsFunc GetProxyURLs, failur p := &reverseProxy{ lg: lg, - director: newDirector(lg, urlsFunc, failureWait, refreshInterval, nil, nil), + director: newDirector(lg, urlsFunc, failureWait, refreshInterval), transport: t, } From b5b466088d8858ab37493d1065aa60ae76d3c138 Mon Sep 17 00:00:00 2001 From: Christian Provenzano <18606244+caproven@users.noreply.github.com> Date: Wed, 11 May 2022 20:41:10 -0400 Subject: [PATCH 3/4] server: Move director interrupt handler to method --- server/proxy/httpproxy/director.go | 13 ++++++++++--- server/proxy/httpproxy/director_test.go | 7 +------ 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/server/proxy/httpproxy/director.go b/server/proxy/httpproxy/director.go index add97621d..6862e2a69 100644 --- a/server/proxy/httpproxy/director.go +++ b/server/proxy/httpproxy/director.go @@ -45,9 +45,7 @@ func newDirector(lg *zap.Logger, urlsFunc GetProxyURLs, failureWait time.Duratio stopc: make(chan struct{}), donec: make(chan struct{}), } - osutil.RegisterInterruptHandler(func() { - close(d.stopc) - }) + osutil.RegisterInterruptHandler(d.stop) d.refresh() go func() { defer close(d.donec) @@ -129,6 +127,15 @@ func (d *director) endpoints() []*endpoint { return filtered } +func (d *director) stop() { + close(d.stopc) + select { + case <-d.donec: + case <-time.After(time.Second): + d.lg.Warn("timed out waiting for director to stop") + } +} + func newEndpoint(lg *zap.Logger, u url.URL, failureWait time.Duration) *endpoint { ep := endpoint{ lg: lg, diff --git a/server/proxy/httpproxy/director_test.go b/server/proxy/httpproxy/director_test.go index 6f831d1aa..7c6716fe3 100644 --- a/server/proxy/httpproxy/director_test.go +++ b/server/proxy/httpproxy/director_test.go @@ -67,12 +67,7 @@ func TestNewDirectorScheme(t *testing.T) { t.Errorf("#%d: want endpoints = %#v, got = %#v", i, tt.want, gep) } - close(got.stopc) - select { - case <-got.donec: - case <-time.After(time.Second): - t.Fatalf("done took too long") - } + got.stop() } } From c1e58ee91fbaf318f00540e6600a61dcd6611a9b Mon Sep 17 00:00:00 2001 From: Christian Provenzano <18606244+caproven@users.noreply.github.com> Date: Wed, 11 May 2022 21:21:06 -0400 Subject: [PATCH 4/4] server: Don't register director interrupt handler --- server/proxy/httpproxy/director.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/proxy/httpproxy/director.go b/server/proxy/httpproxy/director.go index 6862e2a69..43db09a8f 100644 --- a/server/proxy/httpproxy/director.go +++ b/server/proxy/httpproxy/director.go @@ -20,7 +20,6 @@ import ( "sync" "time" - "go.etcd.io/etcd/pkg/v3/osutil" "go.uber.org/zap" ) @@ -45,7 +44,6 @@ func newDirector(lg *zap.Logger, urlsFunc GetProxyURLs, failureWait time.Duratio stopc: make(chan struct{}), donec: make(chan struct{}), } - osutil.RegisterInterruptHandler(d.stop) d.refresh() go func() { defer close(d.donec)