mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
*: Use http.Request.WithContext instead of Cancel
This commit is contained in:
parent
42e7d4d09d
commit
9def4cb9fe
@ -16,6 +16,7 @@ package leasehttp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
@ -26,7 +27,6 @@ import (
|
||||
"github.com/coreos/etcd/lease"
|
||||
"github.com/coreos/etcd/lease/leasepb"
|
||||
"github.com/coreos/etcd/pkg/httputil"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -202,45 +202,27 @@ func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/protobuf")
|
||||
|
||||
cancel := httputil.RequestCanceler(req)
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
cc := &http.Client{Transport: rt}
|
||||
var b []byte
|
||||
// buffer errc channel so that errc don't block inside the go routinue
|
||||
errc := make(chan error, 2)
|
||||
go func() {
|
||||
resp, err := cc.Do(req)
|
||||
if err != nil {
|
||||
errc <- err
|
||||
return
|
||||
}
|
||||
b, err = readResponse(resp)
|
||||
if err != nil {
|
||||
errc <- err
|
||||
return
|
||||
}
|
||||
if resp.StatusCode == http.StatusRequestTimeout {
|
||||
errc <- ErrLeaseHTTPTimeout
|
||||
return
|
||||
}
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
errc <- lease.ErrLeaseNotFound
|
||||
return
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
errc <- fmt.Errorf("lease: unknown error(%s)", string(b))
|
||||
return
|
||||
}
|
||||
errc <- nil
|
||||
}()
|
||||
select {
|
||||
case derr := <-errc:
|
||||
if derr != nil {
|
||||
return nil, derr
|
||||
}
|
||||
case <-ctx.Done():
|
||||
cancel()
|
||||
return nil, ctx.Err()
|
||||
resp, err := cc.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
b, err = readResponse(resp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if resp.StatusCode == http.StatusRequestTimeout {
|
||||
return nil, ErrLeaseHTTPTimeout
|
||||
}
|
||||
if resp.StatusCode == http.StatusNotFound {
|
||||
return nil, lease.ErrLeaseNotFound
|
||||
}
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("lease: unknown error(%s)", string(b))
|
||||
}
|
||||
|
||||
lresp := &leasepb.LeaseInternalResponse{}
|
||||
|
@ -13,15 +13,6 @@ import (
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func RequestCanceler(req *http.Request) func() {
|
||||
ch := make(chan struct{})
|
||||
req.Cancel = ch
|
||||
|
||||
return func() {
|
||||
close(ch)
|
||||
}
|
||||
}
|
||||
|
||||
// GracefulClose drains http.Response.Body until it hits EOF
|
||||
// and closes it. This prevents TCP/TLS connections from closing,
|
||||
// therefore available for reuse.
|
||||
|
@ -16,6 +16,7 @@ package httpproxy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
@ -24,11 +25,9 @@ import (
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
|
||||
"github.com/coreos/etcd/pkg/httputil"
|
||||
"github.com/coreos/pkg/capnslog"
|
||||
)
|
||||
|
||||
@ -110,7 +109,9 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
|
||||
var requestClosed int32
|
||||
completeCh := make(chan bool, 1)
|
||||
closeNotifier, ok := rw.(http.CloseNotifier)
|
||||
cancel := httputil.RequestCanceler(proxyreq)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
proxyreq = proxyreq.WithContext(ctx)
|
||||
defer cancel()
|
||||
if ok {
|
||||
closeCh := closeNotifier.CloseNotify()
|
||||
go func() {
|
||||
@ -118,7 +119,6 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
|
||||
case <-closeCh:
|
||||
atomic.StoreInt32(&requestClosed, 1)
|
||||
plog.Printf("client %v closed request prematurely", clientreq.RemoteAddr)
|
||||
cancel()
|
||||
case <-completeCh:
|
||||
}
|
||||
}()
|
||||
|
Loading…
x
Reference in New Issue
Block a user