diff --git a/rafthttp/functional_test.go b/rafthttp/functional_test.go index 50ae4c061..e07c0ba73 100644 --- a/rafthttp/functional_test.go +++ b/rafthttp/functional_test.go @@ -148,7 +148,7 @@ func newServerStats() *stats.ServerStats { func waitStreamWorking(p *peer) bool { for i := 0; i < 1000; i++ { time.Sleep(time.Millisecond) - if _, ok := p.msgAppWriter.writec(); !ok { + if _, ok := p.msgAppV2Writer.writec(); !ok { continue } if _, ok := p.writer.writec(); !ok { diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 5b70228df..8a95d5031 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -91,11 +91,11 @@ type peer struct { status *peerStatus - msgAppWriter *streamWriter - writer *streamWriter - pipeline *pipeline - snapSender *snapshotSender // snapshot sender to send v3 snapshot messages - msgAppReader *streamReader + msgAppV2Writer *streamWriter + writer *streamWriter + pipeline *pipeline + snapSender *snapshotSender // snapshot sender to send v3 snapshot messages + msgAppV2Reader *streamReader sendc chan raftpb.Message recvc chan raftpb.Message @@ -114,22 +114,22 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t picker := newURLPicker(urls) status := newPeerStatus(to) p := &peer{ - id: to, - r: r, - v3demo: v3demo, - status: status, - msgAppWriter: startStreamWriter(to, status, fs, r), - writer: startStreamWriter(to, status, fs, r), - pipeline: newPipeline(pipelineRt, picker, local, to, cid, status, fs, r, errorc), - snapSender: newSnapshotSender(pipelineRt, picker, local, to, cid, status, snapst, r, errorc), - sendc: make(chan raftpb.Message), - recvc: make(chan raftpb.Message, recvBufSize), - propc: make(chan raftpb.Message, maxPendingProposals), - newURLsC: make(chan types.URLs), - pausec: make(chan struct{}), - resumec: make(chan struct{}), - stopc: make(chan struct{}), - done: make(chan struct{}), + id: to, + r: r, + v3demo: v3demo, + status: status, + msgAppV2Writer: startStreamWriter(to, status, fs, r), + writer: startStreamWriter(to, status, fs, r), + pipeline: newPipeline(pipelineRt, picker, local, to, cid, status, fs, r, errorc), + snapSender: newSnapshotSender(pipelineRt, picker, local, to, cid, status, snapst, r, errorc), + sendc: make(chan raftpb.Message), + recvc: make(chan raftpb.Message, recvBufSize), + propc: make(chan raftpb.Message, maxPendingProposals), + newURLsC: make(chan types.URLs), + pausec: make(chan struct{}), + resumec: make(chan struct{}), + stopc: make(chan struct{}), + done: make(chan struct{}), } // Use go-routine for process of MsgProp because it is @@ -148,7 +148,7 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t } }() - p.msgAppReader = startStreamReader(streamRt, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc) + p.msgAppV2Reader = startStreamReader(streamRt, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc) reader := startStreamReader(streamRt, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc) go func() { var paused bool @@ -188,11 +188,11 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t paused = false case <-p.stopc: cancel() - p.msgAppWriter.stop() + p.msgAppV2Writer.stop() p.writer.stop() p.pipeline.stop() p.snapSender.stop() - p.msgAppReader.stop() + p.msgAppV2Reader.stop() reader.stop() close(p.done) return @@ -221,7 +221,7 @@ func (p *peer) attachOutgoingConn(conn *outgoingConn) { var ok bool switch conn.t { case streamTypeMsgAppV2: - ok = p.msgAppWriter.attach(conn) + ok = p.msgAppV2Writer.attach(conn) case streamTypeMessage: ok = p.writer.attach(conn) default: @@ -264,7 +264,7 @@ func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked stri // stream for a long time, only use one of the N pipelines to send MsgSnap. if isMsgSnap(m) { return p.pipeline.msgc, pipelineMsg - } else if writec, ok = p.msgAppWriter.writec(); ok && isMsgApp(m) { + } else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) { return writec, streamAppV2 } else if writec, ok = p.writer.writec(); ok { return writec, streamMsg diff --git a/rafthttp/peer_test.go b/rafthttp/peer_test.go index 0ad2f82b3..1b5dd66df 100644 --- a/rafthttp/peer_test.go +++ b/rafthttp/peer_test.go @@ -75,9 +75,9 @@ func TestPeerPick(t *testing.T) { } for i, tt := range tests { peer := &peer{ - msgAppWriter: &streamWriter{working: tt.msgappWorking}, - writer: &streamWriter{working: tt.messageWorking}, - pipeline: &pipeline{}, + msgAppV2Writer: &streamWriter{working: tt.msgappWorking}, + writer: &streamWriter{working: tt.messageWorking}, + pipeline: &pipeline{}, } _, picked := peer.pick(tt.m) if picked != tt.wpicked {