From 2a3cacb60cc88b9cbd25322bfa31389760cf0317 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 17 Feb 2016 21:50:54 -0800 Subject: [PATCH] rafthttp: remove unncessary go routine --- rafthttp/peer.go | 123 ++++++++++++++++++----------------------------- 1 file changed, 48 insertions(+), 75 deletions(-) diff --git a/rafthttp/peer.go b/rafthttp/peer.go index f292d45de..8c804b4d7 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -15,6 +15,7 @@ package rafthttp import ( + "sync" "time" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" @@ -104,18 +105,17 @@ type peer struct { pipeline *pipeline snapSender *snapshotSender // snapshot sender to send v3 snapshot messages msgAppV2Reader *streamReader + msgAppReader *streamReader - sendc chan raftpb.Message - recvc chan raftpb.Message - propc chan raftpb.Message - newURLsC chan types.URLs + sendc chan raftpb.Message + recvc chan raftpb.Message + propc chan raftpb.Message - // for testing - pausec chan struct{} - resumec chan struct{} + mu sync.Mutex + paused bool - stopc chan struct{} - done chan struct{} + cancel context.CancelFunc // cancel pending works in go routine created by peer. + stopc chan struct{} } func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer { @@ -134,16 +134,11 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r sendc: make(chan raftpb.Message), recvc: make(chan raftpb.Message, recvBufSize), propc: make(chan raftpb.Message, maxPendingProposals), - newURLsC: make(chan types.URLs), - pausec: make(chan struct{}), - resumec: make(chan struct{}), stopc: make(chan struct{}), - done: make(chan struct{}), } - // Use go-routine for process of MsgProp because it is - // blocking when there is no leader. ctx, cancel := context.WithCancel(context.Background()) + p.cancel = cancel go func() { for { select { @@ -151,6 +146,10 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r if err := r.Process(ctx, mm); err != nil { plog.Warningf("failed to process raft message (%v)", err) } + case mm := <-p.recvc: + if err := r.Process(ctx, mm); err != nil { + plog.Warningf("failed to process raft message (%v)", err) + } case <-p.stopc: return } @@ -158,59 +157,32 @@ func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r }() p.msgAppV2Reader = startStreamReader(transport, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc) - reader := startStreamReader(transport, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc) - go func() { - var paused bool - for { - select { - case m := <-p.sendc: - if paused { - continue - } - writec, name := p.pick(m) - select { - case writec <- m: - default: - p.r.ReportUnreachable(m.To) - if isMsgSnap(m) { - p.r.ReportSnapshot(m.To, raft.SnapshotFailure) - } - if status.isActive() { - plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name) - } - plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name) - } - case mm := <-p.recvc: - if err := r.Process(context.TODO(), mm); err != nil { - plog.Warningf("failed to process raft message (%v)", err) - } - case urls := <-p.newURLsC: - picker.update(urls) - case <-p.pausec: - paused = true - case <-p.resumec: - paused = false - case <-p.stopc: - cancel() - p.msgAppV2Writer.stop() - p.writer.stop() - p.pipeline.stop() - p.snapSender.stop() - p.msgAppV2Reader.stop() - reader.stop() - close(p.done) - return - } - } - }() + p.msgAppReader = startStreamReader(transport, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc) return p } func (p *peer) send(m raftpb.Message) { + p.mu.Lock() + paused := p.paused + p.mu.Unlock() + + if paused { + return + } + + writec, name := p.pick(m) select { - case p.sendc <- m: - case <-p.done: + case writec <- m: + default: + p.r.ReportUnreachable(m.To) + if isMsgSnap(m) { + p.r.ReportSnapshot(m.To, raft.SnapshotFailure) + } + if p.status.isActive() { + plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name) + } + plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name) } } @@ -219,10 +191,7 @@ func (p *peer) sendSnap(m snap.Message) { } func (p *peer) update(urls types.URLs) { - select { - case p.newURLsC <- urls: - case <-p.done: - } + p.picker.update(urls) } func (p *peer) attachOutgoingConn(conn *outgoingConn) { @@ -245,23 +214,27 @@ func (p *peer) activeSince() time.Time { return p.status.activeSince } // Pause pauses the peer. The peer will simply drops all incoming // messages without returning an error. func (p *peer) Pause() { - select { - case p.pausec <- struct{}{}: - case <-p.done: - } + p.mu.Lock() + defer p.mu.Unlock() + p.paused = true } // Resume resumes a paused peer. func (p *peer) Resume() { - select { - case p.resumec <- struct{}{}: - case <-p.done: - } + p.mu.Lock() + defer p.mu.Unlock() + p.paused = false } func (p *peer) stop() { close(p.stopc) - <-p.done + p.cancel() + p.msgAppV2Writer.stop() + p.writer.stop() + p.pipeline.stop() + p.snapSender.stop() + p.msgAppV2Reader.stop() + p.msgAppReader.stop() } // pick picks a chan for sending the given message. The picked chan and the picked chan