Merge pull request #14032 from caproven/close_director_goroutine

server: Director can be stopped
This commit is contained in:
Benjamin Wang 2022-05-13 17:43:32 +08:00 committed by GitHub
commit 153824be58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 2 deletions

View File

@ -41,9 +41,12 @@ func newDirector(lg *zap.Logger, urlsFunc GetProxyURLs, failureWait time.Duratio
lg: lg,
uf: urlsFunc,
failureWait: failureWait,
stopc: make(chan struct{}),
donec: make(chan struct{}),
}
d.refresh()
go func() {
defer close(d.donec)
// In order to prevent missing proxy endpoints in the first try:
// when given refresh interval of defaultRefreshInterval or greater
// and whenever there is no available proxy endpoints,
@ -65,8 +68,12 @@ func newDirector(lg *zap.Logger, urlsFunc GetProxyURLs, failureWait time.Duratio
lg.Info("endpoints found", zap.Strings("endpoints", sl))
})
}
time.Sleep(ri)
d.refresh()
select {
case <-time.After(ri):
d.refresh()
case <-d.stopc:
return
}
}
}()
return d
@ -78,6 +85,8 @@ type director struct {
ep []*endpoint
uf GetProxyURLs
failureWait time.Duration
stopc chan struct{}
donec chan struct{}
}
func (d *director) refresh() {
@ -116,6 +125,15 @@ func (d *director) endpoints() []*endpoint {
return filtered
}
func (d *director) stop() {
close(d.stopc)
select {
case <-d.donec:
case <-time.After(time.Second):
d.lg.Warn("timed out waiting for director to stop")
}
}
func newEndpoint(lg *zap.Logger, u url.URL, failureWait time.Duration) *endpoint {
ep := endpoint{
lg: lg,

View File

@ -66,6 +66,8 @@ func TestNewDirectorScheme(t *testing.T) {
if !reflect.DeepEqual(tt.want, gep) {
t.Errorf("#%d: want endpoints = %#v, got = %#v", i, tt.want, gep)
}
got.stop()
}
}