From 51397a64237021b72d611bb40f18184c6599b935 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Sat, 7 Mar 2015 09:21:45 -0800 Subject: [PATCH] rafthttp: use go-routine for MsgProp processing MsgProp process is blocking when there is no leader, which blocks the peer loop totally. --- rafthttp/peer.go | 31 ++++++++++++++++++++++++++++-- rafthttp/stream.go | 10 ++++++++-- rafthttp/stream_test.go | 42 +++++++++++++++++++++++++++++------------ 3 files changed, 67 insertions(+), 16 deletions(-) diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 42abc4f00..ee8f14ba2 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -38,6 +38,13 @@ const ( ConnWriteTimeout = 5 * time.Second recvBufSize = 4096 + // maxPendingProposals holds the proposals during one leader election process. + // Generally one leader election takes at most 1 sec. It should have + // 0-2 election conflicts, and each one takes 0.5 sec. + // We assume the number of concurrent proposers is smaller than 4096. + // One client blocks on its proposal for at least 1 sec, so 4096 is enough + // to hold all proposals. + maxPendingProposals = 4096 streamApp = "streamMsgApp" streamMsg = "streamMsg" @@ -91,6 +98,7 @@ type peer struct { sendc chan raftpb.Message recvc chan raftpb.Message + propc chan raftpb.Message newURLsC chan types.URLs // for testing @@ -110,16 +118,34 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r pipeline: newPipeline(tr, picker, to, cid, fs, r, errorc), sendc: make(chan raftpb.Message), recvc: make(chan raftpb.Message, recvBufSize), + propc: make(chan raftpb.Message, maxPendingProposals), newURLsC: make(chan types.URLs), pausec: make(chan struct{}), resumec: make(chan struct{}), stopc: make(chan struct{}), done: make(chan struct{}), } + + // Use go-routine for process of MsgProp because it is + // blocking when there is no leader. + ctx, cancel := context.WithCancel(context.Background()) + go func() { + for { + select { + case mm := <-p.propc: + if err := r.Process(ctx, mm); err != nil { + log.Printf("peer: process raft message error: %v", err) + } + case <-p.stopc: + return + } + } + }() + go func() { var paused bool - msgAppReader := startStreamReader(tr, picker, streamTypeMsgApp, local, to, cid, p.recvc) - reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc) + msgAppReader := startStreamReader(tr, picker, streamTypeMsgApp, local, to, cid, p.recvc, p.propc) + reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc, p.propc) for { select { case m := <-p.sendc: @@ -147,6 +173,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r case <-p.resumec: paused = false case <-p.stopc: + cancel() p.msgAppWriter.stop() p.writer.stop() p.pipeline.stop() diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 509c2c724..9f0705b55 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -199,6 +199,7 @@ type streamReader struct { from, to types.ID cid types.ID recvc chan<- raftpb.Message + propc chan<- raftpb.Message mu sync.Mutex msgAppTerm uint64 @@ -208,7 +209,7 @@ type streamReader struct { done chan struct{} } -func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, from, to, cid types.ID, recvc chan<- raftpb.Message) *streamReader { +func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, from, to, cid types.ID, recvc chan<- raftpb.Message, propc chan<- raftpb.Message) *streamReader { r := &streamReader{ tr: tr, picker: picker, @@ -217,6 +218,7 @@ func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, fr to: to, cid: cid, recvc: recvc, + propc: propc, stopc: make(chan struct{}), done: make(chan struct{}), } @@ -271,8 +273,12 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser) error { case isLinkHeartbeatMessage(m): // do nothing for linkHeartbeatMessage default: + recvc := cr.recvc + if m.Type == raftpb.MsgProp { + recvc = cr.propc + } select { - case cr.recvc <- m: + case recvc <- m: default: log.Printf("rafthttp: dropping %s from %x because receive buffer is blocked", m.Type, m.From) diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index a8c284226..4f82b767c 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -6,6 +6,7 @@ import ( "net/http/httptest" "reflect" "testing" + "time" "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/testutil" @@ -154,28 +155,41 @@ func TestStreamReaderDialResult(t *testing.T) { // TestStream tests that streamReader and streamWriter can build stream to // send messages between each other. func TestStream(t *testing.T) { + recvc := make(chan raftpb.Message) + propc := make(chan raftpb.Message) + msgapp := raftpb.Message{ + Type: raftpb.MsgApp, + From: 2, + To: 1, + Term: 1, + LogTerm: 1, + Index: 3, + Entries: []raftpb.Entry{{Term: 1, Index: 4}}, + } + tests := []struct { t streamType term uint64 m raftpb.Message + wc chan raftpb.Message }{ { streamTypeMessage, 0, raftpb.Message{Type: raftpb.MsgProp, To: 2}, + propc, + }, + { + streamTypeMessage, + 0, + msgapp, + recvc, }, { streamTypeMsgApp, 1, - raftpb.Message{ - Type: raftpb.MsgApp, - From: 2, - To: 1, - Term: 1, - LogTerm: 1, - Index: 3, - Entries: []raftpb.Entry{{Term: 1, Index: 4}}, - }, + msgapp, + recvc, }, } for i, tt := range tests { @@ -187,16 +201,20 @@ func TestStream(t *testing.T) { defer sw.stop() h.sw = sw - recvc := make(chan raftpb.Message) picker := mustNewURLPicker(t, []string{srv.URL}) - sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), recvc) + sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), recvc, propc) defer sr.stop() if tt.t == streamTypeMsgApp { sr.updateMsgAppTerm(tt.term) } sw.msgc <- tt.m - m := <-recvc + var m raftpb.Message + select { + case m = <-tt.wc: + case <-time.After(time.Second): + t.Errorf("#%d: failed to receive message from the channel", i) + } if !reflect.DeepEqual(m, tt.m) { t.Errorf("#%d: message = %+v, want %+v", i, m, tt.m) }