From c25c00fcf9ed4590a79736090a3376246b1b0989 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 1 Jun 2016 21:47:46 -0700 Subject: [PATCH] rafthttp: simplify initialization funcs --- rafthttp/peer.go | 24 +++++++++++++----------- rafthttp/remote.go | 6 +++--- rafthttp/snapshot_sender.go | 10 +++++----- rafthttp/transport.go | 4 ++-- 4 files changed, 23 insertions(+), 21 deletions(-) diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 683d4f225..9ba24d028 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -117,14 +117,16 @@ type peer struct { stopc chan struct{} } -func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer { - plog.Infof("starting peer %s...", to) - defer plog.Infof("started peer %s", to) +func startPeer(transport *Transport, urls types.URLs, id types.ID, fs *stats.FollowerStats) *peer { + plog.Infof("starting peer %s...", id) + defer plog.Infof("started peer %s", id) - status := newPeerStatus(to) + status := newPeerStatus(id) picker := newURLPicker(urls) + errorc := transport.ErrorC + r := transport.Raft pipeline := &pipeline{ - to: to, + to: id, tr: transport, picker: picker, status: status, @@ -135,14 +137,14 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r pipeline.start() p := &peer{ - id: to, + id: id, r: r, status: status, picker: picker, - msgAppV2Writer: startStreamWriter(to, status, fs, r), - writer: startStreamWriter(to, status, fs, r), + msgAppV2Writer: startStreamWriter(id, status, fs, r), + writer: startStreamWriter(id, status, fs, r), pipeline: pipeline, - snapSender: newSnapshotSender(transport, picker, local, to, cid, status, r, errorc), + snapSender: newSnapshotSender(transport, picker, id, status), sendc: make(chan raftpb.Message), recvc: make(chan raftpb.Message, recvBufSize), propc: make(chan raftpb.Message, maxPendingProposals), @@ -184,7 +186,7 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r typ: streamTypeMsgAppV2, tr: transport, picker: picker, - to: to, + to: id, status: status, recvc: p.recvc, propc: p.propc, @@ -193,7 +195,7 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r typ: streamTypeMessage, tr: transport, picker: picker, - to: to, + to: id, status: status, recvc: p.recvc, propc: p.propc, diff --git a/rafthttp/remote.go b/rafthttp/remote.go index 02c10e118..d34662b14 100644 --- a/rafthttp/remote.go +++ b/rafthttp/remote.go @@ -25,7 +25,7 @@ type remote struct { pipeline *pipeline } -func startRemote(tr *Transport, urls types.URLs, to types.ID, r Raft, errorc chan error) *remote { +func startRemote(tr *Transport, urls types.URLs, to types.ID) *remote { picker := newURLPicker(urls) status := newPeerStatus(to) pipeline := &pipeline{ @@ -33,8 +33,8 @@ func startRemote(tr *Transport, urls types.URLs, to types.ID, r Raft, errorc cha tr: tr, picker: picker, status: status, - raft: r, - errorc: errorc, + raft: tr.Raft, + errorc: tr.ErrorC, } pipeline.start() diff --git a/rafthttp/snapshot_sender.go b/rafthttp/snapshot_sender.go index 8b5731476..615e408e3 100644 --- a/rafthttp/snapshot_sender.go +++ b/rafthttp/snapshot_sender.go @@ -46,16 +46,16 @@ type snapshotSender struct { stopc chan struct{} } -func newSnapshotSender(tr *Transport, picker *urlPicker, from, to, cid types.ID, status *peerStatus, r Raft, errorc chan error) *snapshotSender { +func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *peerStatus) *snapshotSender { return &snapshotSender{ - from: from, + from: tr.ID, to: to, - cid: cid, + cid: tr.ClusterID, tr: tr, picker: picker, status: status, - r: r, - errorc: errorc, + r: tr.Raft, + errorc: tr.ErrorC, stopc: make(chan struct{}), } } diff --git a/rafthttp/transport.go b/rafthttp/transport.go index ab459b0f3..a7692d3b9 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -225,7 +225,7 @@ func (t *Transport) AddRemote(id types.ID, us []string) { if err != nil { plog.Panicf("newURLs %+v should never fail: %+v", us, err) } - t.remotes[id] = startRemote(t, urls, id, t.Raft, t.ErrorC) + t.remotes[id] = startRemote(t, urls, id) } func (t *Transport) AddPeer(id types.ID, us []string) { @@ -243,7 +243,7 @@ func (t *Transport) AddPeer(id types.ID, us []string) { plog.Panicf("newURLs %+v should never fail: %+v", us, err) } fs := t.LeaderStats.Follower(id.String()) - t.peers[id] = startPeer(t, urls, t.ID, id, t.ClusterID, t.Raft, fs, t.ErrorC) + t.peers[id] = startPeer(t, urls, id, fs) addPeerToProber(t.prober, id.String(), us) plog.Infof("added peer %s", id)