mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
rafthttp: msgApp{Reader/Writer} -> msgAppV2{Reader/Writer}
To make what it serves more clear.
This commit is contained in:
parent
5060b2f322
commit
b61eaf3335
@ -148,7 +148,7 @@ func newServerStats() *stats.ServerStats {
|
|||||||
func waitStreamWorking(p *peer) bool {
|
func waitStreamWorking(p *peer) bool {
|
||||||
for i := 0; i < 1000; i++ {
|
for i := 0; i < 1000; i++ {
|
||||||
time.Sleep(time.Millisecond)
|
time.Sleep(time.Millisecond)
|
||||||
if _, ok := p.msgAppWriter.writec(); !ok {
|
if _, ok := p.msgAppV2Writer.writec(); !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if _, ok := p.writer.writec(); !ok {
|
if _, ok := p.writer.writec(); !ok {
|
||||||
|
@ -91,11 +91,11 @@ type peer struct {
|
|||||||
|
|
||||||
status *peerStatus
|
status *peerStatus
|
||||||
|
|
||||||
msgAppWriter *streamWriter
|
msgAppV2Writer *streamWriter
|
||||||
writer *streamWriter
|
writer *streamWriter
|
||||||
pipeline *pipeline
|
pipeline *pipeline
|
||||||
snapSender *snapshotSender // snapshot sender to send v3 snapshot messages
|
snapSender *snapshotSender // snapshot sender to send v3 snapshot messages
|
||||||
msgAppReader *streamReader
|
msgAppV2Reader *streamReader
|
||||||
|
|
||||||
sendc chan raftpb.Message
|
sendc chan raftpb.Message
|
||||||
recvc chan raftpb.Message
|
recvc chan raftpb.Message
|
||||||
@ -114,22 +114,22 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t
|
|||||||
picker := newURLPicker(urls)
|
picker := newURLPicker(urls)
|
||||||
status := newPeerStatus(to)
|
status := newPeerStatus(to)
|
||||||
p := &peer{
|
p := &peer{
|
||||||
id: to,
|
id: to,
|
||||||
r: r,
|
r: r,
|
||||||
v3demo: v3demo,
|
v3demo: v3demo,
|
||||||
status: status,
|
status: status,
|
||||||
msgAppWriter: startStreamWriter(to, status, fs, r),
|
msgAppV2Writer: startStreamWriter(to, status, fs, r),
|
||||||
writer: startStreamWriter(to, status, fs, r),
|
writer: startStreamWriter(to, status, fs, r),
|
||||||
pipeline: newPipeline(pipelineRt, picker, local, to, cid, status, fs, r, errorc),
|
pipeline: newPipeline(pipelineRt, picker, local, to, cid, status, fs, r, errorc),
|
||||||
snapSender: newSnapshotSender(pipelineRt, picker, local, to, cid, status, snapst, r, errorc),
|
snapSender: newSnapshotSender(pipelineRt, picker, local, to, cid, status, snapst, r, errorc),
|
||||||
sendc: make(chan raftpb.Message),
|
sendc: make(chan raftpb.Message),
|
||||||
recvc: make(chan raftpb.Message, recvBufSize),
|
recvc: make(chan raftpb.Message, recvBufSize),
|
||||||
propc: make(chan raftpb.Message, maxPendingProposals),
|
propc: make(chan raftpb.Message, maxPendingProposals),
|
||||||
newURLsC: make(chan types.URLs),
|
newURLsC: make(chan types.URLs),
|
||||||
pausec: make(chan struct{}),
|
pausec: make(chan struct{}),
|
||||||
resumec: make(chan struct{}),
|
resumec: make(chan struct{}),
|
||||||
stopc: make(chan struct{}),
|
stopc: make(chan struct{}),
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use go-routine for process of MsgProp because it is
|
// Use go-routine for process of MsgProp because it is
|
||||||
@ -148,7 +148,7 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
p.msgAppReader = startStreamReader(streamRt, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc)
|
p.msgAppV2Reader = startStreamReader(streamRt, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc)
|
||||||
reader := startStreamReader(streamRt, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc)
|
reader := startStreamReader(streamRt, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc)
|
||||||
go func() {
|
go func() {
|
||||||
var paused bool
|
var paused bool
|
||||||
@ -188,11 +188,11 @@ func startPeer(streamRt, pipelineRt http.RoundTripper, urls types.URLs, local, t
|
|||||||
paused = false
|
paused = false
|
||||||
case <-p.stopc:
|
case <-p.stopc:
|
||||||
cancel()
|
cancel()
|
||||||
p.msgAppWriter.stop()
|
p.msgAppV2Writer.stop()
|
||||||
p.writer.stop()
|
p.writer.stop()
|
||||||
p.pipeline.stop()
|
p.pipeline.stop()
|
||||||
p.snapSender.stop()
|
p.snapSender.stop()
|
||||||
p.msgAppReader.stop()
|
p.msgAppV2Reader.stop()
|
||||||
reader.stop()
|
reader.stop()
|
||||||
close(p.done)
|
close(p.done)
|
||||||
return
|
return
|
||||||
@ -221,7 +221,7 @@ func (p *peer) attachOutgoingConn(conn *outgoingConn) {
|
|||||||
var ok bool
|
var ok bool
|
||||||
switch conn.t {
|
switch conn.t {
|
||||||
case streamTypeMsgAppV2:
|
case streamTypeMsgAppV2:
|
||||||
ok = p.msgAppWriter.attach(conn)
|
ok = p.msgAppV2Writer.attach(conn)
|
||||||
case streamTypeMessage:
|
case streamTypeMessage:
|
||||||
ok = p.writer.attach(conn)
|
ok = p.writer.attach(conn)
|
||||||
default:
|
default:
|
||||||
@ -264,7 +264,7 @@ func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked stri
|
|||||||
// stream for a long time, only use one of the N pipelines to send MsgSnap.
|
// stream for a long time, only use one of the N pipelines to send MsgSnap.
|
||||||
if isMsgSnap(m) {
|
if isMsgSnap(m) {
|
||||||
return p.pipeline.msgc, pipelineMsg
|
return p.pipeline.msgc, pipelineMsg
|
||||||
} else if writec, ok = p.msgAppWriter.writec(); ok && isMsgApp(m) {
|
} else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) {
|
||||||
return writec, streamAppV2
|
return writec, streamAppV2
|
||||||
} else if writec, ok = p.writer.writec(); ok {
|
} else if writec, ok = p.writer.writec(); ok {
|
||||||
return writec, streamMsg
|
return writec, streamMsg
|
||||||
|
@ -75,9 +75,9 @@ func TestPeerPick(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
peer := &peer{
|
peer := &peer{
|
||||||
msgAppWriter: &streamWriter{working: tt.msgappWorking},
|
msgAppV2Writer: &streamWriter{working: tt.msgappWorking},
|
||||||
writer: &streamWriter{working: tt.messageWorking},
|
writer: &streamWriter{working: tt.messageWorking},
|
||||||
pipeline: &pipeline{},
|
pipeline: &pipeline{},
|
||||||
}
|
}
|
||||||
_, picked := peer.pick(tt.m)
|
_, picked := peer.pick(tt.m)
|
||||||
if picked != tt.wpicked {
|
if picked != tt.wpicked {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user