diff --git a/pkg/ioutil/readcloser.go b/pkg/ioutil/readcloser.go new file mode 100644 index 000000000..7c1fda1c2 --- /dev/null +++ b/pkg/ioutil/readcloser.go @@ -0,0 +1,24 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ioutil + +import "io" + +// ReaderAndCloser implements io.ReadCloser interface by combining +// reader and closer together. +type ReaderAndCloser struct { + io.Reader + io.Closer +} diff --git a/rafthttp/snapshot_sender.go b/rafthttp/snapshot_sender.go index 8a9a99e93..9059a4c43 100644 --- a/rafthttp/snapshot_sender.go +++ b/rafthttp/snapshot_sender.go @@ -22,11 +22,17 @@ import ( "time" "github.com/coreos/etcd/pkg/httputil" + pioutil "github.com/coreos/etcd/pkg/ioutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/snap" ) +var ( + // timeout for reading snapshot response body + snapResponseReadTimeout = 5 * time.Second +) + type snapshotSender struct { from, to types.ID cid types.ID @@ -108,19 +114,17 @@ func (s *snapshotSender) post(req *http.Request) (err error) { result := make(chan responseAndError, 1) go func() { - // TODO: cancel the request if it has waited for a long time(~5s) after - // it has write out the full request body, which helps to avoid receiver - // dies when sender is waiting for response - // TODO: the snapshot could be large and eat up all resources when writing - // it out. Send it block by block and rest some time between to give the - // time for main loop to run. resp, err := s.tr.RoundTrip(req) if err != nil { result <- responseAndError{resp, nil, err} return } + + // close the response body when timeouts. + // prevents from reading the body forever when the other side dies right after + // successfully receives the request body. + time.AfterFunc(snapResponseReadTimeout, func() { resp.Body.Close() }) body, err := ioutil.ReadAll(resp.Body) - resp.Body.Close() result <- responseAndError{resp, body, err} }() @@ -136,12 +140,6 @@ func (s *snapshotSender) post(req *http.Request) (err error) { } } -// readCloser implements io.ReadCloser interface. -type readCloser struct { - io.Reader - io.Closer -} - func createSnapBody(merged snap.Message) io.ReadCloser { buf := new(bytes.Buffer) enc := &messageEncoder{w: buf} @@ -150,7 +148,7 @@ func createSnapBody(merged snap.Message) io.ReadCloser { plog.Panicf("encode message error (%v)", err) } - return &readCloser{ + return &pioutil.ReaderAndCloser{ Reader: io.MultiReader(buf, merged.ReadCloser), Closer: merged.ReadCloser, }