diff --git a/raft/raft.go b/raft/raft.go index c62858369..dced6910c 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -298,10 +298,13 @@ func (r *raft) reset(term uint64) { r.pendingConf = false } -func (r *raft) appendEntry(e pb.Entry) { - e.Term = r.Term - e.Index = r.raftLog.lastIndex() + 1 - r.raftLog.append(e) +func (r *raft) appendEntry(es ...pb.Entry) { + li := r.raftLog.lastIndex() + for i := range es { + es[i].Term = r.Term + es[i].Index = li + 1 + uint64(i) + } + r.raftLog.append(es...) r.prs[r.id].update(r.raftLog.lastIndex()) r.maybeCommit() } @@ -444,17 +447,18 @@ func stepLeader(r *raft, m pb.Message) { case pb.MsgBeat: r.bcastHeartbeat() case pb.MsgProp: - if len(m.Entries) != 1 { - panic("unexpected length(entries) of a MsgProp") + if len(m.Entries) == 0 { + log.Panicf("raft: %x stepped empty MsgProp", r.id) } - e := m.Entries[0] - if e.Type == pb.EntryConfChange { - if r.pendingConf { - return + for i, e := range m.Entries { + if e.Type == pb.EntryConfChange { + if r.pendingConf { + m.Entries[i] = pb.Entry{Type: pb.EntryNormal} + } + r.pendingConf = true } - r.pendingConf = true } - r.appendEntry(e) + r.appendEntry(m.Entries...) r.bcastAppend() case pb.MsgAppResp: if m.Reject { diff --git a/raft/raft_test.go b/raft/raft_test.go index ca6a5eb6d..0b1bcb2c6 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1245,8 +1245,8 @@ func TestStepConfig(t *testing.T) { } // TestStepIgnoreConfig tests that if raft step the second msgProp in -// EntryConfChange type when the first one is uncommitted, the node will deny -// the proposal and keep its original state. +// EntryConfChange type when the first one is uncommitted, the node will set +// the proposal to noop and keep its original state. 
func TestStepIgnoreConfig(t *testing.T) { // a raft that cannot make progress r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage()) @@ -1256,8 +1256,9 @@ func TestStepIgnoreConfig(t *testing.T) { index := r.raftLog.lastIndex() pendingConf := r.pendingConf r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}}) - if g := r.raftLog.lastIndex(); g != index { - t.Errorf("index = %d, want %d", g, index) + wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}} + if ents := r.raftLog.entries(index + 1); !reflect.DeepEqual(ents, wents) { + t.Errorf("ents = %+v, want %+v", ents, wents) } if r.pendingConf != pendingConf { t.Errorf("pendingConf = %v, want %v", r.pendingConf, pendingConf) diff --git a/rafthttp/batcher.go b/rafthttp/batcher.go index e9c501e4a..7babcb9ba 100644 --- a/rafthttp/batcher.go +++ b/rafthttp/batcher.go @@ -6,6 +6,10 @@ import ( "github.com/coreos/etcd/raft/raftpb" ) +var ( + emptyMsgProp = raftpb.Message{Type: raftpb.MsgProp} +) + type Batcher struct { batchedN int batchedT time.Time @@ -39,3 +43,30 @@ func (b *Batcher) Reset(t time.Time) { func canBatch(m raftpb.Message) bool { return m.Type == raftpb.MsgAppResp && m.Reject == false } + +type ProposalBatcher struct { + *Batcher + raftpb.Message +} + +func NewProposalBatcher(n int, d time.Duration) *ProposalBatcher { + return &ProposalBatcher{ + Batcher: NewBatcher(n, d), + Message: emptyMsgProp, + } +} + +func (b *ProposalBatcher) Batch(m raftpb.Message) { + b.Message.From = m.From + b.Message.To = m.To + b.Message.Entries = append(b.Message.Entries, m.Entries...) 
+} + +func (b *ProposalBatcher) IsEmpty() bool { + return len(b.Message.Entries) == 0 +} + +func (b *ProposalBatcher) Reset(t time.Time) { + b.Batcher.Reset(t) + b.Message = emptyMsgProp +} diff --git a/rafthttp/sender.go b/rafthttp/sender.go index e3c558dbe..e18291607 100644 --- a/rafthttp/sender.go +++ b/rafthttp/sender.go @@ -39,6 +39,7 @@ const ( senderBufSize = 64 appRespBatchMs = 50 + propBatchMs = 10 ConnReadTimeout = 5 * time.Second ConnWriteTimeout = 5 * time.Second @@ -66,14 +67,15 @@ type Sender interface { func NewSender(tr http.RoundTripper, u string, cid types.ID, p Processor, fs *stats.FollowerStats, shouldstop chan struct{}) *sender { s := &sender{ - tr: tr, - u: u, - cid: cid, - p: p, - fs: fs, - shouldstop: shouldstop, - batcher: NewBatcher(100, appRespBatchMs*time.Millisecond), - q: make(chan []byte, senderBufSize), + tr: tr, + u: u, + cid: cid, + p: p, + fs: fs, + shouldstop: shouldstop, + batcher: NewBatcher(100, appRespBatchMs*time.Millisecond), + propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond), + q: make(chan []byte, senderBufSize), } s.wg.Add(connPerSender) for i := 0; i < connPerSender; i++ { @@ -90,11 +92,12 @@ type sender struct { fs *stats.FollowerStats shouldstop chan struct{} - strmCln *streamClient - batcher *Batcher - strmSrv *streamServer - strmSrvMu sync.Mutex - q chan []byte + strmCln *streamClient + batcher *Batcher + propBatcher *ProposalBatcher + strmSrv *streamServer + strmSrvMu sync.Mutex + q chan []byte paused bool mu sync.RWMutex @@ -136,16 +139,37 @@ func (s *sender) Send(m raftpb.Message) error { s.initStream(types.ID(m.From), types.ID(m.To), m.Term) s.batcher.Reset(time.Now()) } - if canBatch(m) && s.hasStreamClient() { - if s.batcher.ShouldBatch(time.Now()) { - return nil - } - } - if canUseStream(m) { - if ok := s.tryStream(m); ok { - return nil + + var err error + switch { + case isProposal(m): + s.propBatcher.Batch(m) + case canBatch(m) && s.hasStreamClient(): + if 
!s.batcher.ShouldBatch(time.Now()) { + err = s.send(m) + } + case canUseStream(m): + if ok := s.tryStream(m); !ok { + err = s.send(m) + } + default: + err = s.send(m) + } + // send out batched MsgProp if needed + // TODO: it is triggered by every outgoing send now, and it needs + // a cleaner solution. Either use a separate goroutine to trigger it + // or use streaming. + if !s.propBatcher.IsEmpty() { + t := time.Now() + if !s.propBatcher.ShouldBatch(t) { + s.send(s.propBatcher.Message) + s.propBatcher.Reset(t) } } + return err +} + +func (s *sender) send(m raftpb.Message) error { // TODO: don't block. we should be able to have 1000s // of messages out at a time. data := pbutil.MustMarshal(&m) @@ -282,3 +306,5 @@ func (s *sender) post(data []byte) error { return fmt.Errorf("unhandled status %s", http.StatusText(resp.StatusCode)) } } + +func isProposal(m raftpb.Message) bool { return m.Type == raftpb.MsgProp }