mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #2921 from xiang90/fix_watch_cancel
client: fix cancel watch
This commit is contained in:
commit
05b55d9d75
@ -293,7 +293,23 @@ func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Respon
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
var body []byte
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
body, err = ioutil.ReadAll(resp.Body)
|
||||
done <- struct{}{}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = resp.Body.Close()
|
||||
<-done
|
||||
if err == nil {
|
||||
err = ctx.Err()
|
||||
}
|
||||
case <-done:
|
||||
}
|
||||
|
||||
return resp, body, err
|
||||
}
|
||||
|
||||
|
@ -186,8 +186,11 @@ type checkableReadCloser struct {
|
||||
}
|
||||
|
||||
func (c *checkableReadCloser) Close() error {
|
||||
c.closed = true
|
||||
return c.ReadCloser.Close()
|
||||
if !c.closed {
|
||||
c.closed = true
|
||||
return c.ReadCloser.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestSimpleHTTPClientDoCancelContextResponseBodyClosed(t *testing.T) {
|
||||
@ -218,6 +221,43 @@ func TestSimpleHTTPClientDoCancelContextResponseBodyClosed(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
type blockingBody struct {
|
||||
c chan struct{}
|
||||
}
|
||||
|
||||
func (bb *blockingBody) Read(p []byte) (n int, err error) {
|
||||
<-bb.c
|
||||
return 0, errors.New("closed")
|
||||
}
|
||||
|
||||
func (bb *blockingBody) Close() error {
|
||||
close(bb.c)
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestSimpleHTTPClientDoCancelContextResponseBodyClosedWithBlockingBody(t *testing.T) {
|
||||
tr := newFakeTransport()
|
||||
c := &simpleHTTPClient{transport: tr}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
body := &checkableReadCloser{ReadCloser: &blockingBody{c: make(chan struct{})}}
|
||||
go func() {
|
||||
tr.respchan <- &http.Response{Body: body}
|
||||
time.Sleep(2 * time.Millisecond)
|
||||
// cancel after the body is received
|
||||
cancel()
|
||||
}()
|
||||
|
||||
_, _, err := c.Do(ctx, &fakeAction{})
|
||||
if err == nil {
|
||||
t.Fatalf("expected non-nil error, got nil")
|
||||
}
|
||||
|
||||
if !body.closed {
|
||||
t.Fatalf("expected closed body")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSimpleHTTPClientDoCancelContextWaitForRoundTrip(t *testing.T) {
|
||||
tr := newFakeTransport()
|
||||
c := &simpleHTTPClient{transport: tr}
|
||||
|
Loading…
x
Reference in New Issue
Block a user