From 207c92b62785d220edc74ed791fd8a22a18e378e Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Sat, 10 Oct 2015 23:04:10 -0700 Subject: [PATCH] rafthttp: build transport inside pkg instead of passed-in rafthttp has different requirements for connections created by the transport for different usage, and this is hard to achieve when giving one http.RoundTripper. Pass into pkg the data needed to build transport now, and let rafthttp build its own transports. --- etcdmain/etcd.go | 13 +-------- etcdserver/config.go | 10 +++++-- etcdserver/server.go | 37 +++++++++++++++++--------- etcdserver/server_test.go | 2 +- integration/cluster_test.go | 3 +-- rafthttp/functional_test.go | 45 ++++++++++++++------------------ rafthttp/peer.go | 8 +++--- rafthttp/transport.go | 44 +++++++++++++++++++++++-------- rafthttp/transport_bench_test.go | 23 +++++++--------- rafthttp/transport_test.go | 29 ++++++++++---------- 10 files changed, 117 insertions(+), 97 deletions(-) diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index f5191b5fb..8cde657c7 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -199,11 +199,6 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { return nil, fmt.Errorf("error setting up initial cluster: %v", err) } - pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, peerDialTimeout(cfg.ElectionMs), rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout) - if err != nil { - return nil, err - } - if !cfg.peerTLSInfo.Empty() { plog.Infof("peerTLS: %s", cfg.peerTLSInfo) } @@ -284,7 +279,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) { DiscoveryProxy: cfg.dproxy, NewCluster: cfg.isNewCluster(), ForceNewCluster: cfg.forceNewCluster, - Transport: pt, + PeerTLSInfo: cfg.peerTLSInfo, TickMs: cfg.TickMs, ElectionTicks: cfg.electionTicks(), V3demo: cfg.v3demo, @@ -534,9 +529,3 @@ func setupLogging(cfg *config) { repoLog.SetLogLevel(settings) } } - -func peerDialTimeout(electionMs uint) time.Duration { - // 1s for queue wait and system delay - // + one RTT, which is smaller than 1/5 election timeout - return time.Second + time.Duration(electionMs)*time.Millisecond/5 -} diff --git a/etcdserver/config.go b/etcdserver/config.go index 0ce09c572..ff28e4cff 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -16,13 +16,13 @@ package etcdserver import ( "fmt" - "net/http" "path" "sort" "strings" "time" "github.com/coreos/etcd/pkg/netutil" + "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" ) @@ -44,7 +44,7 @@ type ServerConfig struct { InitialClusterToken string NewCluster bool ForceNewCluster bool - Transport *http.Transport + PeerTLSInfo transport.TLSInfo TickMs uint ElectionTicks int @@ -132,6 +132,12 @@ func (c *ServerConfig) ReqTimeout() time.Duration { return 5*time.Second + 2*time.Duration(c.ElectionTicks)*time.Duration(c.TickMs)*time.Millisecond } +func (c *ServerConfig) peerDialTimeout() time.Duration { + // 1s for queue wait and system delay + // + one RTT, which is smaller than 1/5 election timeout + return time.Second + time.Duration(c.ElectionTicks)*time.Duration(c.TickMs)*time.Millisecond/5 +} + func (c *ServerConfig) PrintWithInitial() { c.print(true) } func (c *ServerConfig) Print() { c.print(false) } diff --git a/etcdserver/server.go b/etcdserver/server.go index 0506cc532..9247a1a04 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -40,6 +40,7 @@ import ( "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/runtime" "github.com/coreos/etcd/pkg/timeutil" + "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/pkg/wait" "github.com/coreos/etcd/raft" @@ -167,7 +168,9 @@ type EtcdServer struct { SyncTicker <-chan time.Time - reqIDGen *idutil.Generator + // versionTr used to send requests for peer version + versionTr *http.Transport + reqIDGen *idutil.Generator // forceVersionC is used to force the version monitor loop // to detect the cluster version immediately. @@ -205,6 +208,10 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { haveWAL := wal.Exist(cfg.WALDir()) ss := snap.New(cfg.SnapDir()) + pt, err := transport.NewTransport(cfg.PeerTLSInfo, cfg.peerDialTimeout()) + if err != nil { + return nil, err + } var remotes []*Member switch { case !haveWAL && !cfg.NewCluster: @@ -215,14 +222,14 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { if err != nil { return nil, err } - existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), cfg.Transport) + existingCluster, err := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), pt) if err != nil { return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", err) } if err := ValidateClusterAndAssignIDs(cl, existingCluster); err != nil { return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err) } - if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, cfg.Transport) { + if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, pt) { return nil, fmt.Errorf("incomptible with current running cluster") } @@ -240,7 +247,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { return nil, err } m := cl.MemberByName(cfg.Name) - if isMemberBootstrapped(cl, cfg.Name, cfg.Transport) { + if isMemberBootstrapped(cl, cfg.Name, pt) { return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID) } if cfg.ShouldDiscover() { @@ -328,6 +335,7 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { stats: sstats, lstats: lstats, SyncTicker: time.Tick(500 * time.Millisecond), + versionTr: pt, reqIDGen: idutil.NewGenerator(uint8(id), time.Now()), forceVersionC: make(chan struct{}), } @@ -346,15 +354,18 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { // TODO: move transport initialization near the definition of remote tr := &rafthttp.Transport{ - RoundTripper: cfg.Transport, - ID: id, - ClusterID: cl.ID(), - Raft: srv, - ServerStats: sstats, - LeaderStats: lstats, - ErrorC: srv.errorc, + TLSInfo: cfg.PeerTLSInfo, + DialTimeout: cfg.peerDialTimeout(), + ID: id, + ClusterID: cl.ID(), + Raft: srv, + ServerStats: sstats, + LeaderStats: lstats, + ErrorC: srv.errorc, + } + if err := tr.Start(); err != nil { + return nil, err } - tr.Start() // add all remotes into transport for _, m := range remotes { if m.ID != id { @@ -1032,7 +1043,7 @@ func (s *EtcdServer) monitorVersions() { continue } - v := decideClusterVersion(getVersions(s.cluster, s.id, s.cfg.Transport)) + v := decideClusterVersion(getVersions(s.cluster, s.id, s.versionTr)) if v != nil { // only keep major.minor version for comparasion v = &semver.Version{ diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index b51a15e6a..4ed74f6b5 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -1468,7 +1468,7 @@ func (n *readyNode) Ready() <-chan raft.Ready { return n.readyc } type nopTransporter struct{} -func (s *nopTransporter) Start() {} +func (s *nopTransporter) Start() error { return nil } func (s *nopTransporter) Handler() http.Handler { return nil } func (s *nopTransporter) Send(m []raftpb.Message) {} func (s *nopTransporter) AddRemote(id types.ID, us []string) {} diff --git a/integration/cluster_test.go b/integration/cluster_test.go index 0f02fb137..75e1bbf04 100644 --- a/integration/cluster_test.go +++ b/integration/cluster_test.go @@ -688,7 +688,7 @@ func mustNewMember(t *testing.T, name string, usePeerTLS bool) *member { } m.InitialClusterToken = clusterName m.NewCluster = true - m.Transport = mustNewTransport(t, m.PeerTLSInfo) + m.ServerConfig.PeerTLSInfo = m.PeerTLSInfo m.ElectionTicks = electionTicks m.TickMs = uint(tickDuration / time.Millisecond) return m @@ -720,7 +720,6 @@ func (m *member) Clone(t *testing.T) *member { panic(err) } mm.InitialClusterToken = m.InitialClusterToken - mm.Transport = mustNewTransport(t, m.PeerTLSInfo) mm.ElectionTicks = m.ElectionTicks mm.PeerTLSInfo = m.PeerTLSInfo return mm diff --git a/rafthttp/functional_test.go b/rafthttp/functional_test.go index b665857c0..542c281f7 100644 --- a/rafthttp/functional_test.go +++ b/rafthttp/functional_test.go @@ -15,7 +15,6 @@ package rafthttp import ( - "net/http" "net/http/httptest" "reflect" "testing" @@ -31,12 +30,11 @@ import ( func TestSendMessage(t *testing.T) { // member 1 tr := &Transport{ - RoundTripper: &http.Transport{}, - ID: types.ID(1), - ClusterID: types.ID(1), - Raft: &fakeRaft{}, - ServerStats: newServerStats(), - LeaderStats: stats.NewLeaderStats("1"), + ID: types.ID(1), + ClusterID: types.ID(1), + Raft: &fakeRaft{}, + ServerStats: newServerStats(), + LeaderStats: stats.NewLeaderStats("1"), } tr.Start() srv := httptest.NewServer(tr.Handler()) @@ -46,12 +44,11 @@ func TestSendMessage(t *testing.T) { recvc := make(chan raftpb.Message, 1) p := &fakeRaft{recvc: recvc} tr2 := &Transport{ - RoundTripper: &http.Transport{}, - ID: types.ID(2), - ClusterID: types.ID(1), - Raft: p, - ServerStats: newServerStats(), - LeaderStats: stats.NewLeaderStats("2"), + ID: types.ID(2), + ClusterID: types.ID(1), + Raft: p, + ServerStats: newServerStats(), + LeaderStats: stats.NewLeaderStats("2"), } tr2.Start() srv2 := httptest.NewServer(tr2.Handler()) @@ -92,12 +89,11 @@ func TestSendMessage(t *testing.T) { func TestSendMessageWhenStreamIsBroken(t *testing.T) { // member 1 tr := &Transport{ - RoundTripper: &http.Transport{}, - ID: types.ID(1), - ClusterID: types.ID(1), - Raft: &fakeRaft{}, - ServerStats: newServerStats(), - LeaderStats: stats.NewLeaderStats("1"), + ID: types.ID(1), + ClusterID: types.ID(1), + Raft: &fakeRaft{}, + ServerStats: newServerStats(), + LeaderStats: stats.NewLeaderStats("1"), } tr.Start() srv := httptest.NewServer(tr.Handler()) @@ -107,12 +103,11 @@ func TestSendMessageWhenStreamIsBroken(t *testing.T) { recvc := make(chan raftpb.Message, 1) p := &fakeRaft{recvc: recvc} tr2 := &Transport{ - RoundTripper: &http.Transport{}, - ID: types.ID(2), - ClusterID: types.ID(1), - Raft: p, - ServerStats: newServerStats(), - LeaderStats: stats.NewLeaderStats("2"), + ID: types.ID(2), + ClusterID: types.ID(1), + Raft: p, + ServerStats: newServerStats(), + LeaderStats: stats.NewLeaderStats("2"), } tr2.Start() srv2 := httptest.NewServer(tr2.Handler()) diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 2be3fbeea..93396d21c 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -111,7 +111,7 @@ type peer struct { done chan struct{} } -func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, term uint64) *peer { +func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, term uint64) *peer { picker := newURLPicker(urls) status := newPeerStatus(to) p := &peer{ @@ -120,7 +120,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r status: status, msgAppWriter: startStreamWriter(to, status, fs, r), writer: startStreamWriter(to, status, fs, r), - pipeline: newPipeline(tr, picker, local, to, cid, status, fs, r, errorc), + pipeline: newPipeline(pipelineRt, picker, local, to, cid, status, fs, r, errorc), sendc: make(chan raftpb.Message), recvc: make(chan raftpb.Message, recvBufSize), propc: make(chan raftpb.Message, maxPendingProposals), @@ -148,8 +148,8 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r } }() - p.msgAppReader = startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc, term) - reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc, term) + p.msgAppReader = startStreamReader(streamRt, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc, term) + reader := startStreamReader(streamRt, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc, term) go func() { var paused bool for { diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 653611cb3..282121fac 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -23,6 +23,7 @@ import ( "github.com/coreos/etcd/Godeps/_workspace/src/github.com/xiang90/probing" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" @@ -40,7 +41,7 @@ type Raft interface { type Transporter interface { // Start starts the given Transporter. // Start MUST be called before calling other functions in the interface. - Start() + Start() error // Handler returns the HTTP handler of the transporter. // A transporter HTTP handler handles the HTTP requests // from remote peers. @@ -88,11 +89,13 @@ type Transporter interface { // User needs to call Start before calling other functions, and call // Stop when the Transport is no longer used. type Transport struct { - RoundTripper http.RoundTripper // roundTripper to send requests - ID types.ID // local member ID - ClusterID types.ID // raft cluster ID for request validation - Raft Raft // raft state machine, to which the Transport forwards received messages and reports status - ServerStats *stats.ServerStats // used to record general transportation statistics + DialTimeout time.Duration // maximum duration before timing out dial of the request + TLSInfo transport.TLSInfo // TLS information used when creating connection + + ID types.ID // local member ID + ClusterID types.ID // raft cluster ID for request validation + Raft Raft // raft state machine, to which the Transport forwards received messages and reports status + ServerStats *stats.ServerStats // used to record general transportation statistics // used to record transportation statistics with followers when // performing as leader in raft protocol LeaderStats *stats.LeaderStats @@ -102,6 +105,9 @@ type Transport struct { // machine and thus stop the Transport. ErrorC chan error + streamRt http.RoundTripper // roundTripper used by streams + pipelineRt http.RoundTripper // roundTripper used by pipelines + mu sync.RWMutex // protect the term, remote and peer map term uint64 // the latest term that has been observed remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up @@ -110,10 +116,23 @@ type Transport struct { prober probing.Prober } -func (t *Transport) Start() { +func (t *Transport) Start() error { + var err error + // Read/write timeout is set for stream roundTripper to promptly + // find out broken status, which minimizes the number of messages + // sent on broken connection. + t.streamRt, err = transport.NewTimeoutTransport(t.TLSInfo, t.DialTimeout, ConnReadTimeout, ConnWriteTimeout) + if err != nil { + return err + } + t.pipelineRt, err = transport.NewTransport(t.TLSInfo, t.DialTimeout) + if err != nil { + return err + } t.remotes = make(map[types.ID]*remote) t.peers = make(map[types.ID]Peer) - t.prober = probing.NewProber(t.RoundTripper) + t.prober = probing.NewProber(t.pipelineRt) + return nil } func (t *Transport) Handler() http.Handler { @@ -183,7 +202,10 @@ func (t *Transport) Stop() { p.Stop() } t.prober.RemoveAll() - if tr, ok := t.RoundTripper.(*http.Transport); ok { + if tr, ok := t.streamRt.(*http.Transport); ok { + tr.CloseIdleConnections() + } + if tr, ok := t.pipelineRt.(*http.Transport); ok { tr.CloseIdleConnections() } } @@ -198,7 +220,7 @@ func (t *Transport) AddRemote(id types.ID, us []string) { if err != nil { plog.Panicf("newURLs %+v should never fail: %+v", us, err) } - t.remotes[id] = startRemote(t.RoundTripper, urls, t.ID, id, t.ClusterID, t.Raft, t.ErrorC) + t.remotes[id] = startRemote(t.pipelineRt, urls, t.ID, id, t.ClusterID, t.Raft, t.ErrorC) } func (t *Transport) AddPeer(id types.ID, us []string) { @@ -212,7 +234,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) { plog.Panicf("newURLs %+v should never fail: %+v", us, err) } fs := t.LeaderStats.Follower(id.String()) - t.peers[id] = startPeer(t.RoundTripper, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC, t.term) + t.peers[id] = startPeer(t.streamRt, t.pipelineRt, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC, t.term) addPeerToProber(t.prober, id.String(), us) } diff --git a/rafthttp/transport_bench_test.go b/rafthttp/transport_bench_test.go index 1b9bbe9ed..cfbdebab1 100644 --- a/rafthttp/transport_bench_test.go +++ b/rafthttp/transport_bench_test.go @@ -15,7 +15,6 @@ package rafthttp import ( - "net/http" "net/http/httptest" "sync" "testing" @@ -31,12 +30,11 @@ import ( func BenchmarkSendingMsgApp(b *testing.B) { // member 1 tr := &Transport{ - RoundTripper: &http.Transport{}, - ID: types.ID(1), - ClusterID: types.ID(1), - Raft: &fakeRaft{}, - ServerStats: newServerStats(), - LeaderStats: stats.NewLeaderStats("1"), + ID: types.ID(1), + ClusterID: types.ID(1), + Raft: &fakeRaft{}, + ServerStats: newServerStats(), + LeaderStats: stats.NewLeaderStats("1"), } tr.Start() srv := httptest.NewServer(tr.Handler()) @@ -45,12 +43,11 @@ func BenchmarkSendingMsgApp(b *testing.B) { // member 2 r := &countRaft{} tr2 := &Transport{ - RoundTripper: &http.Transport{}, - ID: types.ID(2), - ClusterID: types.ID(1), - Raft: r, - ServerStats: newServerStats(), - LeaderStats: stats.NewLeaderStats("2"), + ID: types.ID(2), + ClusterID: types.ID(1), + Raft: r, + ServerStats: newServerStats(), + LeaderStats: stats.NewLeaderStats("2"), } tr2.Start() srv2 := httptest.NewServer(tr2.Handler()) diff --git a/rafthttp/transport_test.go b/rafthttp/transport_test.go index 9013100c8..6b6b855b6 100644 --- a/rafthttp/transport_test.go +++ b/rafthttp/transport_test.go @@ -70,11 +70,11 @@ func TestTransportAdd(t *testing.T) { ls := stats.NewLeaderStats("") term := uint64(10) tr := &Transport{ - RoundTripper: &roundTripperRecorder{}, - LeaderStats: ls, - term: term, - peers: make(map[types.ID]Peer), - prober: probing.NewProber(nil), + LeaderStats: ls, + streamRt: &roundTripperRecorder{}, + term: term, + peers: make(map[types.ID]Peer), + prober: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) @@ -103,10 +103,10 @@ func TestTransportAdd(t *testing.T) { func TestTransportRemove(t *testing.T) { tr := &Transport{ - RoundTripper: &roundTripperRecorder{}, - LeaderStats: stats.NewLeaderStats(""), - peers: make(map[types.ID]Peer), - prober: probing.NewProber(nil), + LeaderStats: stats.NewLeaderStats(""), + streamRt: &roundTripperRecorder{}, + peers: make(map[types.ID]Peer), + prober: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) tr.RemovePeer(types.ID(1)) @@ -134,11 +134,12 @@ func TestTransportUpdate(t *testing.T) { func TestTransportErrorc(t *testing.T) { errorc := make(chan error, 1) tr := &Transport{ - RoundTripper: newRespRoundTripper(http.StatusForbidden, nil), - LeaderStats: stats.NewLeaderStats(""), - ErrorC: errorc, - peers: make(map[types.ID]Peer), - prober: probing.NewProber(nil), + LeaderStats: stats.NewLeaderStats(""), + ErrorC: errorc, + streamRt: newRespRoundTripper(http.StatusForbidden, nil), + pipelineRt: newRespRoundTripper(http.StatusForbidden, nil), + peers: make(map[types.ID]Peer), + prober: probing.NewProber(nil), } tr.AddPeer(1, []string{"http://localhost:2380"}) defer tr.Stop()