From 08f839e32c5ad2386d581167a51475f4aa03b2b6 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 25 Nov 2014 21:38:57 -0800 Subject: [PATCH] rafthttp: set the API boundary of the package --- etcdserver/etcdhttp/peer.go | 5 +- etcdserver/server.go | 54 ++++++++++------ etcdserver/server_test.go | 81 +++++++++++++----------- rafthttp/http.go | 4 -- {etcdserver => rafthttp}/sendhub.go | 61 ++++++++---------- {etcdserver => rafthttp}/sendhub_test.go | 43 +++---------- rafthttp/transport.go | 49 ++++++++++++++ 7 files changed, 166 insertions(+), 131 deletions(-) rename {etcdserver => rafthttp}/sendhub.go (68%) rename {etcdserver => rafthttp}/sendhub_test.go (63%) create mode 100644 rafthttp/transport.go diff --git a/etcdserver/etcdhttp/peer.go b/etcdserver/etcdhttp/peer.go index aa4052ff2..f92e80dd9 100644 --- a/etcdserver/etcdhttp/peer.go +++ b/etcdserver/etcdhttp/peer.go @@ -31,16 +31,13 @@ const ( // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests. func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler { - rh := rafthttp.NewHandler(server, server.Cluster.ID()) - rsh := rafthttp.NewStreamHandler(server.SenderFinder(), server.ID(), server.Cluster.ID()) mh := &peerMembersHandler{ clusterInfo: server.Cluster, } mux := http.NewServeMux() mux.HandleFunc("/", http.NotFound) - mux.Handle(rafthttp.RaftPrefix, rh) - mux.Handle(rafthttp.RaftStreamPrefix+"/", rsh) + mux.Handle(rafthttp.RaftPrefix, server.RaftHandler()) mux.Handle(peerMembersPrefix, mh) return mux } diff --git a/etcdserver/server.go b/etcdserver/server.go index 889a2f17c..0b18c0ba3 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -144,11 +144,11 @@ type EtcdServer struct { stats *stats.ServerStats lstats *stats.LeaderStats - // sender specifies the sender to send msgs to members. sending msgs - // MUST NOT block. It is okay to drop messages, since clients should - // timeout and reissue their messages. If send is nil, server will - // panic. - sendhub SendHub + // transport specifies the transport to send and receive msgs to members. + // Sending messages MUST NOT block. It is okay to drop messages, since + // clients should timeout and reissue their messages. + // If transport is nil, server will panic. + transport rafthttp.Transporter Ticker <-chan time.Time SyncTicker <-chan time.Time @@ -271,13 +271,22 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) { SyncTicker: time.Tick(500 * time.Millisecond), snapCount: cfg.SnapCount, } - srv.sendhub = newSendHub(cfg.Transport, cfg.Cluster, srv, sstats, lstats) + tr := &rafthttp.Transport{ + RoundTripper: cfg.Transport, + ID: id, + ClusterID: cfg.Cluster.ID(), + Processor: srv, + ServerStats: sstats, + LeaderStats: lstats, + } + tr.Start() // add all the remote members into sendhub for _, m := range cfg.Cluster.Members() { if m.Name != cfg.Name { - srv.sendhub.Add(m) + tr.AddPeer(m.ID, m.PeerURLs) } } + srv.transport = tr return srv, nil } @@ -327,7 +336,7 @@ func (s *EtcdServer) purgeFile() { func (s *EtcdServer) ID() types.ID { return s.id } -func (s *EtcdServer) SenderFinder() rafthttp.SenderFinder { return s.sendhub } +func (s *EtcdServer) RaftHandler() http.Handler { return s.transport.Handler() } func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { if s.Cluster.IsIDRemoved(types.ID(m.From)) { @@ -343,7 +352,7 @@ func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error { func (s *EtcdServer) run() { var syncC <-chan time.Time var shouldstop bool - shouldstopC := s.sendhub.ShouldStopNotify() + shouldstopC := s.transport.ShouldStopNotify() // load initial state from raft storage snap, err := s.raftStorage.Snapshot() @@ -357,7 +366,7 @@ func (s *EtcdServer) run() { defer func() { s.node.Stop() - s.sendhub.Stop() + s.transport.Stop() if err := s.storage.Close(); err != nil { log.Panicf("etcdserver: close storage error: %v", err) } @@ -397,7 +406,7 @@ func (s *EtcdServer) run() { } s.raftStorage.Append(rd.Entries) - s.sendhub.Send(rd.Messages) + s.send(rd.Messages) // recover from snapshot if it is more updated than current applied if !raft.IsEmptySnap(rd.Snapshot) && rd.Snapshot.Metadata.Index > appliedi { @@ -663,6 +672,15 @@ func getExpirationTime(r *pb.Request) time.Time { return t } +func (s *EtcdServer) send(ms []raftpb.Message) { + for _, m := range ms { + if !s.Cluster.IsIDRemoved(types.ID(m.To)) { + m.To = 0 + } + } + s.transport.Send(ms) +} + // apply takes entries received from Raft (after it has been committed) and // applies them to the current state of the EtcdServer. // The given entries should not be empty. @@ -764,7 +782,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con if m.ID == s.id { log.Printf("etcdserver: added local member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) } else { - s.sendhub.Add(m) + s.transport.AddPeer(m.ID, m.PeerURLs) log.Printf("etcdserver: added member %s %v to cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) } case raftpb.ConfChangeRemoveNode: @@ -775,7 +793,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con log.Println("etcdserver: the data-dir used by this member must be removed so that this host can be re-added with a new member ID") return true, nil } else { - s.sendhub.Remove(id) + s.transport.RemovePeer(id) log.Printf("etcdserver: removed member %s from cluster %s", id, s.Cluster.ID()) } case raftpb.ConfChangeUpdateNode: @@ -790,7 +808,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con if m.ID == s.id { log.Printf("etcdserver: update local member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) } else { - s.sendhub.Update(m) + s.transport.UpdatePeer(m.ID, m.PeerURLs) log.Printf("etcdserver: update member %s %v in cluster %s", m.ID, m.PeerURLs, s.Cluster.ID()) } } @@ -831,13 +849,13 @@ func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) { // for testing func (s *EtcdServer) PauseSending() { - hub := s.sendhub.(*sendHub) - hub.pause() + hub := s.transport.(*rafthttp.Transport) + hub.Pause() } func (s *EtcdServer) ResumeSending() { - hub := s.sendhub.(*sendHub) - hub.resume() + hub := s.transport.(*rafthttp.Transport) + hub.Resume() } func startNode(cfg *ServerConfig, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) { diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index cebd7dbd7..1b432716b 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -22,6 +22,7 @@ import ( "io/ioutil" "log" "math/rand" + "net/http" "path" "reflect" "strconv" @@ -499,10 +500,10 @@ func TestApplyConfChangeShouldStop(t *testing.T) { cl.AddMember(&Member{ID: types.ID(i)}) } srv := &EtcdServer{ - id: 1, - node: &nodeRecorder{}, - Cluster: cl, - sendhub: &nopSender{}, + id: 1, + node: &nodeRecorder{}, + Cluster: cl, + transport: &nopTransporter{}, } cc := raftpb.ConfChange{ Type: raftpb.ConfChangeRemoveNode, @@ -531,21 +532,24 @@ func TestApplyConfChangeShouldStop(t *testing.T) { func TestClusterOf1(t *testing.T) { testServer(t, 1) } func TestClusterOf3(t *testing.T) { testServer(t, 3) } -type fakeSender struct { +type fakeTransporter struct { ss []*EtcdServer } -func (s *fakeSender) Sender(id types.ID) rafthttp.Sender { return nil } -func (s *fakeSender) Send(msgs []raftpb.Message) { +func (s *fakeTransporter) Handler() http.Handler { return nil } +func (s *fakeTransporter) Sender(id types.ID) rafthttp.Sender { return nil } +func (s *fakeTransporter) Send(msgs []raftpb.Message) { for _, m := range msgs { s.ss[m.To-1].node.Step(context.TODO(), m) } } -func (s *fakeSender) Add(m *Member) {} -func (s *fakeSender) Update(m *Member) {} -func (s *fakeSender) Remove(id types.ID) {} -func (s *fakeSender) Stop() {} -func (s *fakeSender) ShouldStopNotify() <-chan struct{} { return nil } +func (s *fakeTransporter) AddPeer(id types.ID, us []string) {} +func (s *fakeTransporter) UpdatePeer(id types.ID, us []string) {} +func (s *fakeTransporter) RemovePeer(id types.ID) {} +func (s *fakeTransporter) Stop() {} +func (s *fakeTransporter) ShouldStopNotify() <-chan struct{} { return nil } +func (s *fakeTransporter) Pause() {} +func (s *fakeTransporter) Resume() {} func testServer(t *testing.T, ns uint64) { ctx, cancel := context.WithCancel(context.Background()) @@ -571,7 +575,7 @@ func testServer(t *testing.T, ns uint64) { node: n, raftStorage: s, store: st, - sendhub: &fakeSender{ss}, + transport: &fakeTransporter{ss}, storage: &storageRecorder{}, Ticker: tk.C, Cluster: cl, @@ -646,7 +650,7 @@ func TestDoProposal(t *testing.T) { node: n, raftStorage: s, store: st, - sendhub: &nopSender{}, + transport: &nopTransporter{}, storage: &storageRecorder{}, Ticker: tk, Cluster: cl, @@ -735,7 +739,7 @@ func TestDoProposalStopped(t *testing.T) { node: n, raftStorage: s, store: st, - sendhub: &nopSender{}, + transport: &nopTransporter{}, storage: &storageRecorder{}, Ticker: tk, Cluster: cl, @@ -847,7 +851,7 @@ func TestSyncTrigger(t *testing.T) { node: n, raftStorage: raft.NewMemoryStorage(), store: &storeRecorder{}, - sendhub: &nopSender{}, + transport: &nopTransporter{}, storage: &storageRecorder{}, SyncTicker: st, } @@ -933,7 +937,7 @@ func TestTriggerSnap(t *testing.T) { cl.SetStore(store.New()) srv := &EtcdServer{ store: st, - sendhub: &nopSender{}, + transport: &nopTransporter{}, storage: p, node: n, raftStorage: s, @@ -973,7 +977,7 @@ func TestRecvSnapshot(t *testing.T) { cl.SetStore(store.New()) s := &EtcdServer{ store: st, - sendhub: &nopSender{}, + transport: &nopTransporter{}, storage: p, node: n, raftStorage: raft.NewMemoryStorage(), @@ -1006,7 +1010,7 @@ func TestRecvSlowSnapshot(t *testing.T) { cl.SetStore(store.New()) s := &EtcdServer{ store: st, - sendhub: &nopSender{}, + transport: &nopTransporter{}, storage: &storageRecorder{}, node: n, raftStorage: raft.NewMemoryStorage(), @@ -1039,7 +1043,7 @@ func TestApplySnapshotAndCommittedEntries(t *testing.T) { storage := raft.NewMemoryStorage() s := &EtcdServer{ store: st, - sendhub: &nopSender{}, + transport: &nopTransporter{}, storage: &storageRecorder{}, node: n, raftStorage: storage, @@ -1082,7 +1086,7 @@ func TestAddMember(t *testing.T) { node: n, raftStorage: raft.NewMemoryStorage(), store: &storeRecorder{}, - sendhub: &nopSender{}, + transport: &nopTransporter{}, storage: &storageRecorder{}, Cluster: cl, } @@ -1117,7 +1121,7 @@ func TestRemoveMember(t *testing.T) { node: n, raftStorage: raft.NewMemoryStorage(), store: &storeRecorder{}, - sendhub: &nopSender{}, + transport: &nopTransporter{}, storage: &storageRecorder{}, Cluster: cl, } @@ -1151,7 +1155,7 @@ func TestUpdateMember(t *testing.T) { node: n, raftStorage: raft.NewMemoryStorage(), store: &storeRecorder{}, - sendhub: &nopSender{}, + transport: &nopTransporter{}, storage: &storageRecorder{}, Cluster: cl, } @@ -1219,12 +1223,12 @@ func TestPublish(t *testing.T) { // TestPublishStopped tests that publish will be stopped if server is stopped. func TestPublishStopped(t *testing.T) { srv := &EtcdServer{ - node: &nodeRecorder{}, - sendhub: &nopSender{}, - Cluster: &Cluster{}, - w: &waitRecorder{}, - done: make(chan struct{}), - stop: make(chan struct{}), + node: &nodeRecorder{}, + transport: &nopTransporter{}, + Cluster: &Cluster{}, + w: &waitRecorder{}, + done: make(chan struct{}), + stop: make(chan struct{}), } close(srv.done) srv.publish(time.Hour) @@ -1625,15 +1629,18 @@ func (w *waitWithResponse) Register(id uint64) <-chan interface{} { } func (w *waitWithResponse) Trigger(id uint64, x interface{}) {} -type nopSender struct{} +type nopTransporter struct{} -func (s *nopSender) Sender(id types.ID) rafthttp.Sender { return nil } -func (s *nopSender) Send(m []raftpb.Message) {} -func (s *nopSender) Add(m *Member) {} -func (s *nopSender) Remove(id types.ID) {} -func (s *nopSender) Update(m *Member) {} -func (s *nopSender) Stop() {} -func (s *nopSender) ShouldStopNotify() <-chan struct{} { return nil } +func (s *nopTransporter) Handler() http.Handler { return nil } +func (s *nopTransporter) Sender(id types.ID) rafthttp.Sender { return nil } +func (s *nopTransporter) Send(m []raftpb.Message) {} +func (s *nopTransporter) AddPeer(id types.ID, us []string) {} +func (s *nopTransporter) RemovePeer(id types.ID) {} +func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {} +func (s *nopTransporter) Stop() {} +func (s *nopTransporter) ShouldStopNotify() <-chan struct{} { return nil } +func (s *nopTransporter) Pause() {} +func (s *nopTransporter) Resume() {} func mustMakePeerSlice(t *testing.T, ids ...uint64) []raft.Peer { peers := make([]raft.Peer, len(ids)) diff --git a/rafthttp/http.go b/rafthttp/http.go index 056446884..0e96cf185 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -40,10 +40,6 @@ var ( RaftStreamPrefix = path.Join(RaftPrefix, "stream") ) -type Processor interface { - Process(ctx context.Context, m raftpb.Message) error -} - type SenderFinder interface { // Sender returns the sender of the given id. Sender(id types.ID) Sender diff --git a/etcdserver/sendhub.go b/rafthttp/sendhub.go similarity index 68% rename from etcdserver/sendhub.go rename to rafthttp/sendhub.go index 8c3108a8c..ccdcd6667 100644 --- a/etcdserver/sendhub.go +++ b/rafthttp/sendhub.go @@ -14,7 +14,7 @@ limitations under the License. */ -package etcdserver +package rafthttp import ( "log" @@ -26,50 +26,39 @@ import ( "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft/raftpb" - "github.com/coreos/etcd/rafthttp" ) const ( raftPrefix = "/raft" ) -type SendHub interface { - rafthttp.SenderFinder - Send(m []raftpb.Message) - Add(m *Member) - Remove(id types.ID) - Update(m *Member) - Stop() - ShouldStopNotify() <-chan struct{} -} - type sendHub struct { tr http.RoundTripper - cl ClusterInfo - p rafthttp.Processor + cid types.ID + p Processor ss *stats.ServerStats ls *stats.LeaderStats mu sync.RWMutex // protect the sender map - senders map[types.ID]rafthttp.Sender + senders map[types.ID]Sender shouldstop chan struct{} } // newSendHub creates the default send hub used to transport raft messages // to other members. The returned sendHub will update the given ServerStats and // LeaderStats appropriately. -func newSendHub(t http.RoundTripper, cl ClusterInfo, p rafthttp.Processor, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub { +func newSendHub(t http.RoundTripper, cid types.ID, p Processor, ss *stats.ServerStats, ls *stats.LeaderStats) *sendHub { return &sendHub{ tr: t, - cl: cl, + cid: cid, p: p, ss: ss, ls: ls, - senders: make(map[types.ID]rafthttp.Sender), + senders: make(map[types.ID]Sender), shouldstop: make(chan struct{}, 1), } } -func (h *sendHub) Sender(id types.ID) rafthttp.Sender { +func (h *sendHub) Sender(id types.ID) Sender { h.mu.RLock() defer h.mu.RUnlock() return h.senders[id] @@ -77,12 +66,14 @@ func (h *sendHub) Sender(id types.ID) rafthttp.Sender { func (h *sendHub) Send(msgs []raftpb.Message) { for _, m := range msgs { + // intentionally dropped message + if m.To == 0 { + continue + } to := types.ID(m.To) s, ok := h.senders[to] if !ok { - if !h.cl.IsIDRemoved(to) { - log.Printf("etcdserver: send message to unknown receiver %s", to) - } + log.Printf("etcdserver: send message to unknown receiver %s", to) continue } @@ -107,55 +98,55 @@ func (h *sendHub) ShouldStopNotify() <-chan struct{} { return h.shouldstop } -func (h *sendHub) Add(m *Member) { +func (h *sendHub) AddPeer(id types.ID, urls []string) { h.mu.Lock() defer h.mu.Unlock() - if _, ok := h.senders[m.ID]; ok { + if _, ok := h.senders[id]; ok { return } // TODO: considering how to switch between all available peer urls - peerURL := m.PickPeerURL() + peerURL := urls[0] u, err := url.Parse(peerURL) if err != nil { log.Panicf("unexpect peer url %s", peerURL) } u.Path = path.Join(u.Path, raftPrefix) - fs := h.ls.Follower(m.ID.String()) - s := rafthttp.NewSender(h.tr, u.String(), m.ID, h.cl.ID(), h.p, fs, h.shouldstop) - h.senders[m.ID] = s + fs := h.ls.Follower(id.String()) + s := NewSender(h.tr, u.String(), id, h.cid, h.p, fs, h.shouldstop) + h.senders[id] = s } -func (h *sendHub) Remove(id types.ID) { +func (h *sendHub) RemovePeer(id types.ID) { h.mu.Lock() defer h.mu.Unlock() h.senders[id].Stop() delete(h.senders, id) } -func (h *sendHub) Update(m *Member) { +func (h *sendHub) UpdatePeer(id types.ID, urls []string) { h.mu.Lock() defer h.mu.Unlock() // TODO: return error or just panic? - if _, ok := h.senders[m.ID]; !ok { + if _, ok := h.senders[id]; !ok { return } - peerURL := m.PickPeerURL() + peerURL := urls[0] u, err := url.Parse(peerURL) if err != nil { log.Panicf("unexpect peer url %s", peerURL) } u.Path = path.Join(u.Path, raftPrefix) - h.senders[m.ID].Update(u.String()) + h.senders[id].Update(u.String()) } // for testing -func (h *sendHub) pause() { +func (h *sendHub) Pause() { for _, s := range h.senders { s.Pause() } } -func (h *sendHub) resume() { +func (h *sendHub) Resume() { for _, s := range h.senders { s.Resume() } diff --git a/etcdserver/sendhub_test.go b/rafthttp/sendhub_test.go similarity index 63% rename from etcdserver/sendhub_test.go rename to rafthttp/sendhub_test.go index 427d2030c..bdea52a9a 100644 --- a/etcdserver/sendhub_test.go +++ b/rafthttp/sendhub_test.go @@ -14,7 +14,7 @@ limitations under the License. */ -package etcdserver +package rafthttp import ( "net/http" @@ -28,11 +28,9 @@ import ( ) func TestSendHubAdd(t *testing.T) { - cl := newTestCluster(nil) ls := stats.NewLeaderStats("") - h := newSendHub(nil, cl, nil, nil, ls) - m := newTestMember(1, []string{"http://a"}, "", nil) - h.Add(m) + h := newSendHub(nil, 0, nil, nil, ls) + h.AddPeer(1, []string{"http://a"}) if _, ok := ls.Followers["1"]; !ok { t.Errorf("FollowerStats[1] is nil, want exists") @@ -42,20 +40,18 @@ func TestSendHubAdd(t *testing.T) { t.Fatalf("senders[1] is nil, want exists") } - h.Add(m) + h.AddPeer(1, []string{"http://a"}) ns := h.senders[types.ID(1)] if s != ns { - t.Errorf("sender = %p, want %p", ns, s) + t.Errorf("sender = %v, want %v", ns, s) } } func TestSendHubRemove(t *testing.T) { - cl := newTestCluster(nil) ls := stats.NewLeaderStats("") - h := newSendHub(nil, cl, nil, nil, ls) - m := newTestMember(1, []string{"http://a"}, "", nil) - h.Add(m) - h.Remove(types.ID(1)) + h := newSendHub(nil, 0, nil, nil, ls) + h.AddPeer(1, []string{"http://a"}) + h.RemovePeer(types.ID(1)) if _, ok := h.senders[types.ID(1)]; ok { t.Fatalf("senders[1] exists, want removed") @@ -64,11 +60,9 @@ func TestSendHubRemove(t *testing.T) { func TestSendHubShouldStop(t *testing.T) { tr := newRespRoundTripper(http.StatusForbidden, nil) - cl := newTestCluster(nil) ls := stats.NewLeaderStats("") - h := newSendHub(tr, cl, nil, nil, ls) - m := newTestMember(1, []string{"http://a"}, "", nil) - h.Add(m) + h := newSendHub(tr, 0, nil, nil, ls) + h.AddPeer(1, []string{"http://a"}) shouldstop := h.ShouldStopNotify() select { @@ -85,20 +79,3 @@ func TestSendHubShouldStop(t *testing.T) { t.Fatalf("cannot receive stop notification") } } - -type respRoundTripper struct { - code int - err error -} - -func newRespRoundTripper(code int, err error) *respRoundTripper { - return &respRoundTripper{code: code, err: err} -} -func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { - return &http.Response{StatusCode: t.code, Body: &nopReadCloser{}}, t.err -} - -type nopReadCloser struct{} - -func (n *nopReadCloser) Read(p []byte) (int, error) { return 0, nil } -func (n *nopReadCloser) Close() error { return nil } diff --git a/rafthttp/transport.go b/rafthttp/transport.go new file mode 100644 index 000000000..56d08270f --- /dev/null +++ b/rafthttp/transport.go @@ -0,0 +1,49 @@ +package rafthttp + +import ( + "net/http" + + "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" +) + +type Processor interface { + Process(ctx context.Context, m raftpb.Message) error +} + +type Transporter interface { + Handler() http.Handler + Send(m []raftpb.Message) + AddPeer(id types.ID, urls []string) + RemovePeer(id types.ID) + UpdatePeer(id types.ID, urls []string) + Stop() + ShouldStopNotify() <-chan struct{} +} + +type Transport struct { + RoundTripper http.RoundTripper + ID types.ID + ClusterID types.ID + Processor Processor + ServerStats *stats.ServerStats + LeaderStats *stats.LeaderStats + + *sendHub + handler http.Handler +} + +func (t *Transport) Start() { + t.sendHub = newSendHub(t.RoundTripper, t.ClusterID, t.Processor, t.ServerStats, t.LeaderStats) + h := NewHandler(t.Processor, t.ClusterID) + sh := NewStreamHandler(t.sendHub, t.ID, t.ClusterID) + mux := http.NewServeMux() + mux.Handle(RaftPrefix, h) + mux.Handle(RaftStreamPrefix+"/", sh) + t.handler = mux +} + +func (t *Transport) Handler() http.Handler { return t.handler }