Merge pull request #2459 from yichengq/335

rafthttp: use dedicated go-routine for MsgProp process
This commit is contained in:
Yicheng Qin 2015-03-09 14:17:28 -07:00
commit 4e525e63a4
3 changed files with 67 additions and 16 deletions

View File

@ -38,6 +38,13 @@ const (
ConnWriteTimeout = 5 * time.Second ConnWriteTimeout = 5 * time.Second
recvBufSize = 4096 recvBufSize = 4096
// maxPendingProposals holds the proposals during one leader election process.
// Generally one leader election takes at most 1 sec. It should have
// 0-2 election conflicts, and each one takes 0.5 sec.
// We assume the number of concurrent proposers is smaller than 4096.
// One client blocks on its proposal for at least 1 sec, so 4096 is enough
// to hold all proposals.
maxPendingProposals = 4096
streamApp = "streamMsgApp" streamApp = "streamMsgApp"
streamMsg = "streamMsg" streamMsg = "streamMsg"
@ -91,6 +98,7 @@ type peer struct {
sendc chan raftpb.Message sendc chan raftpb.Message
recvc chan raftpb.Message recvc chan raftpb.Message
propc chan raftpb.Message
newURLsC chan types.URLs newURLsC chan types.URLs
// for testing // for testing
@ -110,16 +118,34 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
pipeline: newPipeline(tr, picker, to, cid, fs, r, errorc), pipeline: newPipeline(tr, picker, to, cid, fs, r, errorc),
sendc: make(chan raftpb.Message), sendc: make(chan raftpb.Message),
recvc: make(chan raftpb.Message, recvBufSize), recvc: make(chan raftpb.Message, recvBufSize),
propc: make(chan raftpb.Message, maxPendingProposals),
newURLsC: make(chan types.URLs), newURLsC: make(chan types.URLs),
pausec: make(chan struct{}), pausec: make(chan struct{}),
resumec: make(chan struct{}), resumec: make(chan struct{}),
stopc: make(chan struct{}), stopc: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
} }
// Use go-routine for process of MsgProp because it is
// blocking when there is no leader.
ctx, cancel := context.WithCancel(context.Background())
go func() {
for {
select {
case mm := <-p.propc:
if err := r.Process(ctx, mm); err != nil {
log.Printf("peer: process raft message error: %v", err)
}
case <-p.stopc:
return
}
}
}()
go func() { go func() {
var paused bool var paused bool
msgAppReader := startStreamReader(tr, picker, streamTypeMsgApp, local, to, cid, p.recvc) msgAppReader := startStreamReader(tr, picker, streamTypeMsgApp, local, to, cid, p.recvc, p.propc)
reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc) reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc, p.propc)
for { for {
select { select {
case m := <-p.sendc: case m := <-p.sendc:
@ -147,6 +173,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
case <-p.resumec: case <-p.resumec:
paused = false paused = false
case <-p.stopc: case <-p.stopc:
cancel()
p.msgAppWriter.stop() p.msgAppWriter.stop()
p.writer.stop() p.writer.stop()
p.pipeline.stop() p.pipeline.stop()

View File

@ -199,6 +199,7 @@ type streamReader struct {
from, to types.ID from, to types.ID
cid types.ID cid types.ID
recvc chan<- raftpb.Message recvc chan<- raftpb.Message
propc chan<- raftpb.Message
mu sync.Mutex mu sync.Mutex
msgAppTerm uint64 msgAppTerm uint64
@ -208,7 +209,7 @@ type streamReader struct {
done chan struct{} done chan struct{}
} }
func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, from, to, cid types.ID, recvc chan<- raftpb.Message) *streamReader { func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, from, to, cid types.ID, recvc chan<- raftpb.Message, propc chan<- raftpb.Message) *streamReader {
r := &streamReader{ r := &streamReader{
tr: tr, tr: tr,
picker: picker, picker: picker,
@ -217,6 +218,7 @@ func startStreamReader(tr http.RoundTripper, picker *urlPicker, t streamType, fr
to: to, to: to,
cid: cid, cid: cid,
recvc: recvc, recvc: recvc,
propc: propc,
stopc: make(chan struct{}), stopc: make(chan struct{}),
done: make(chan struct{}), done: make(chan struct{}),
} }
@ -271,8 +273,12 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser) error {
case isLinkHeartbeatMessage(m): case isLinkHeartbeatMessage(m):
// do nothing for linkHeartbeatMessage // do nothing for linkHeartbeatMessage
default: default:
recvc := cr.recvc
if m.Type == raftpb.MsgProp {
recvc = cr.propc
}
select { select {
case cr.recvc <- m: case recvc <- m:
default: default:
log.Printf("rafthttp: dropping %s from %x because receive buffer is blocked", log.Printf("rafthttp: dropping %s from %x because receive buffer is blocked",
m.Type, m.From) m.Type, m.From)

View File

@ -6,6 +6,7 @@ import (
"net/http/httptest" "net/http/httptest"
"reflect" "reflect"
"testing" "testing"
"time"
"github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/testutil"
@ -154,28 +155,41 @@ func TestStreamReaderDialResult(t *testing.T) {
// TestStream tests that streamReader and streamWriter can build stream to // TestStream tests that streamReader and streamWriter can build stream to
// send messages between each other. // send messages between each other.
func TestStream(t *testing.T) { func TestStream(t *testing.T) {
recvc := make(chan raftpb.Message)
propc := make(chan raftpb.Message)
msgapp := raftpb.Message{
Type: raftpb.MsgApp,
From: 2,
To: 1,
Term: 1,
LogTerm: 1,
Index: 3,
Entries: []raftpb.Entry{{Term: 1, Index: 4}},
}
tests := []struct { tests := []struct {
t streamType t streamType
term uint64 term uint64
m raftpb.Message m raftpb.Message
wc chan raftpb.Message
}{ }{
{ {
streamTypeMessage, streamTypeMessage,
0, 0,
raftpb.Message{Type: raftpb.MsgProp, To: 2}, raftpb.Message{Type: raftpb.MsgProp, To: 2},
propc,
},
{
streamTypeMessage,
0,
msgapp,
recvc,
}, },
{ {
streamTypeMsgApp, streamTypeMsgApp,
1, 1,
raftpb.Message{ msgapp,
Type: raftpb.MsgApp, recvc,
From: 2,
To: 1,
Term: 1,
LogTerm: 1,
Index: 3,
Entries: []raftpb.Entry{{Term: 1, Index: 4}},
},
}, },
} }
for i, tt := range tests { for i, tt := range tests {
@ -187,16 +201,20 @@ func TestStream(t *testing.T) {
defer sw.stop() defer sw.stop()
h.sw = sw h.sw = sw
recvc := make(chan raftpb.Message)
picker := mustNewURLPicker(t, []string{srv.URL}) picker := mustNewURLPicker(t, []string{srv.URL})
sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), recvc) sr := startStreamReader(&http.Transport{}, picker, tt.t, types.ID(1), types.ID(2), types.ID(1), recvc, propc)
defer sr.stop() defer sr.stop()
if tt.t == streamTypeMsgApp { if tt.t == streamTypeMsgApp {
sr.updateMsgAppTerm(tt.term) sr.updateMsgAppTerm(tt.term)
} }
sw.msgc <- tt.m sw.msgc <- tt.m
m := <-recvc var m raftpb.Message
select {
case m = <-tt.wc:
case <-time.After(time.Second):
t.Errorf("#%d: failed to receive message from the channel", i)
}
if !reflect.DeepEqual(m, tt.m) { if !reflect.DeepEqual(m, tt.m) {
t.Errorf("#%d: message = %+v, want %+v", i, m, tt.m) t.Errorf("#%d: message = %+v, want %+v", i, m, tt.m)
} }