diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 8cde657c7..dc83626be 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -208,7 +208,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { plog.Warningf("The scheme of peer url %s is http while peer key/cert files are presented. Ignored peer key/cert files.", u.String()) } var l net.Listener - l, err = transport.NewTimeoutListener(u.Host, u.Scheme, cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout) + l, err = rafthttp.NewListener(u, cfg.peerTLSInfo) if err != nil { return nil, err } diff --git a/etcdserver/server.go b/etcdserver/server.go index b97d2e82a..8a8ee76a4 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -40,7 +40,6 @@ import ( "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/runtime" "github.com/coreos/etcd/pkg/timeutil" - "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/wait" "github.com/coreos/etcd/raft" @@ -211,12 +210,10 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { haveWAL := wal.Exist(cfg.WALDir()) ss := snap.New(cfg.SnapDir()) - // use timeout transport to pair with remote timeout listeners - pt, err := transport.NewTimeoutTransport(cfg.PeerTLSInfo, cfg.peerDialTimeout(), 0, 0) + prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout()) if err != nil { return nil, err } - prt := http.RoundTripper(pt) var remotes []*Member switch { case !haveWAL && !cfg.NewCluster: diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 3625bd001..8232003fb 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -136,15 +136,11 @@ type Transport struct { func (t *Transport) Start() error { var err error - // Read/write timeout is set for stream roundTripper to promptly - // find out broken status, which minimizes the number of messages - // sent on broken connection. - t.streamRt, err = transport.NewTimeoutTransport(t.TLSInfo, t.DialTimeout, ConnReadTimeout, ConnWriteTimeout) + t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout) if err != nil { return err } - // use timeout transport to pair with remote timeout listeners - t.pipelineRt, err = transport.NewTimeoutTransport(t.TLSInfo, t.DialTimeout, 0, 0) + t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout) if err != nil { return err } diff --git a/rafthttp/util.go b/rafthttp/util.go index dbf09c223..75a66cfd7 100644 --- a/rafthttp/util.go +++ b/rafthttp/util.go @@ -18,11 +18,14 @@ import ( "encoding/binary" "fmt" "io" + "net" "net/http" "net/url" "strings" + "time" "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver" + "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/version" @@ -30,6 +33,30 @@ import ( var errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster") +// NewListener returns a listener for raft message transfer between peers. +// It uses timeout listener to identify broken streams promptly. +func NewListener(u url.URL, tlsInfo transport.TLSInfo) (net.Listener, error) { + return transport.NewTimeoutListener(u.Host, u.Scheme, tlsInfo, ConnReadTimeout, ConnWriteTimeout) +} + +// NewRoundTripper returns a roundTripper used to send requests +// to rafthttp listener of remote peers. +func NewRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) { + // It uses timeout transport to pair with remote timeout listeners. + // It sets no read/write timeout, because message in requests may + // take long time to write out before reading out the response. + return transport.NewTimeoutTransport(tlsInfo, dialTimeout, 0, 0) +} + +// newStreamRoundTripper returns a roundTripper used to send stream requests +// to rafthttp listener of remote peers. +// Read/write timeout is set for stream roundTripper to promptly +// find out broken status, which minimizes the number of messages +// sent on broken connection. +func newStreamRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) { + return transport.NewTimeoutTransport(tlsInfo, dialTimeout, ConnReadTimeout, ConnWriteTimeout) +} + func writeEntryTo(w io.Writer, ent *raftpb.Entry) error { size := ent.Size() if err := binary.Write(w, binary.BigEndian, uint64(size)); err != nil {