From 0d88e0d111ac29fd2371ce6ed65e23604937b57d Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 2 Apr 2015 21:55:13 -0700 Subject: [PATCH] rafthttp: introduce msgappv2 stream format msgappv2 stream is used to send all MsgApp, and replaces the functionality of msgapp stream. Compared to v1, it has several advantanges: 1. The output message is exactly the same with the input one, which cannot be done in v1. 2. It uses one connection to stream persistently, which prevents message reorder and saves the time to request stream. 3. It transmits 10 addiontional bytes in the procedure of committing one proposal, which is trivia for idle time. 4. It transmits less bytes when committing mutliple proposals or keep committing proposals. --- rafthttp/http.go | 8 +- rafthttp/http_test.go | 2 +- rafthttp/msgappv2.go | 192 +++++++++++++++++++++++++++++++ rafthttp/msgappv2_test.go | 123 ++++++++++++++++++++ rafthttp/peer.go | 6 +- rafthttp/stream.go | 37 +++--- rafthttp/stream_test.go | 17 +-- rafthttp/transport_bench_test.go | 37 ++++-- 8 files changed, 386 insertions(+), 36 deletions(-) create mode 100644 rafthttp/msgappv2.go create mode 100644 rafthttp/msgappv2_test.go diff --git a/rafthttp/http.go b/rafthttp/http.go index 70baedf33..ce221a5de 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -125,13 +125,13 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var t streamType switch path.Dir(r.URL.Path) { - case path.Join(RaftStreamPrefix, string(streamTypeMsgApp)): - t = streamTypeMsgApp - case path.Join(RaftStreamPrefix, string(streamTypeMessage)): - t = streamTypeMessage // backward compatibility case RaftStreamPrefix: t = streamTypeMsgApp + case path.Join(RaftStreamPrefix, string(streamTypeMsgApp)): + t = streamTypeMsgAppV2 + case path.Join(RaftStreamPrefix, string(streamTypeMessage)): + t = streamTypeMessage default: log.Printf("rafthttp: ignored unexpected streaming request path %s", r.URL.Path) http.Error(w, "invalid path", http.StatusNotFound) diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go index b152431ef..95f0a78b4 100644 --- a/rafthttp/http_test.go +++ b/rafthttp/http_test.go @@ -165,7 +165,7 @@ func TestServeRaftStreamPrefix(t *testing.T) { }, { RaftStreamPrefix + "/msgapp/1", - streamTypeMsgApp, + streamTypeMsgAppV2, }, // backward compatibility { diff --git a/rafthttp/msgappv2.go b/rafthttp/msgappv2.go new file mode 100644 index 000000000..7c4395af6 --- /dev/null +++ b/rafthttp/msgappv2.go @@ -0,0 +1,192 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafthttp + +import ( + "encoding/binary" + "fmt" + "io" + "time" + + "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/pbutil" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" +) + +const ( + msgTypeLinkHeartbeat uint8 = 0 + msgTypeAppEntries uint8 = 1 + msgTypeApp uint8 = 2 +) + +// msgappv2 stream sends three types of message: linkHeartbeatMessage, +// AppEntries and MsgApp. AppEntries is the MsgApp that is sent in +// replicate state in raft, whose index and term are fully predicatable. +// +// Data format of linkHeartbeatMessage: +// | offset | bytes | description | +// +--------+-------+-------------+ +// | 0 | 1 | \x00 | +// +// Data format of AppEntries: +// | offset | bytes | description | +// +--------+-------+-------------+ +// | 0 | 1 | \x01 | +// | 1 | 8 | length of entries | +// | 9 | 8 | length of first entry | +// | 17 | n1 | first entry | +// ... +// | x | 8 | length of k-th entry data | +// | x+8 | nk | k-th entry data | +// | x+8+nk | 8 | commit index | +// +// Data format of MsgApp: +// | offset | bytes | description | +// +--------+-------+-------------+ +// | 0 | 1 | \x01 | +// | 1 | 8 | length of encoded message | +// | 9 | n | encoded message | +type msgAppV2Encoder struct { + w io.Writer + fs *stats.FollowerStats + + term uint64 + index uint64 +} + +func (enc *msgAppV2Encoder) encode(m raftpb.Message) error { + start := time.Now() + switch { + case isLinkHeartbeatMessage(m): + return binary.Write(enc.w, binary.BigEndian, msgTypeLinkHeartbeat) + case enc.index == m.Index && enc.term == m.LogTerm && m.LogTerm == m.Term: + if err := binary.Write(enc.w, binary.BigEndian, msgTypeAppEntries); err != nil { + return err + } + // write length of entries + l := len(m.Entries) + if err := binary.Write(enc.w, binary.BigEndian, uint64(l)); err != nil { + return err + } + for i := 0; i < l; i++ { + size := m.Entries[i].Size() + if err := binary.Write(enc.w, binary.BigEndian, uint64(size)); err != nil { + return err + } + if _, err := enc.w.Write(pbutil.MustMarshal(&m.Entries[i])); err != nil { + return err + } + enc.index++ + } + // write commit index + if err := binary.Write(enc.w, binary.BigEndian, m.Commit); err != nil { + return err + } + default: + if err := binary.Write(enc.w, binary.BigEndian, msgTypeApp); err != nil { + return err + } + // write size of message + if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil { + return err + } + // write message + if _, err := enc.w.Write(pbutil.MustMarshal(&m)); err != nil { + return err + } + + enc.term = m.Term + enc.index = m.Index + if l := len(m.Entries); l > 0 { + enc.index = m.Entries[l-1].Index + } + } + enc.fs.Succ(time.Since(start)) + return nil +} + +type msgAppV2Decoder struct { + r io.Reader + local, remote types.ID + + term uint64 + index uint64 +} + +func (dec *msgAppV2Decoder) decode() (raftpb.Message, error) { + var ( + m raftpb.Message + typ uint8 + ) + if err := binary.Read(dec.r, binary.BigEndian, &typ); err != nil { + return m, err + } + switch typ { + case msgTypeLinkHeartbeat: + return linkHeartbeatMessage, nil + case msgTypeAppEntries: + m = raftpb.Message{ + Type: raftpb.MsgApp, + From: uint64(dec.remote), + To: uint64(dec.local), + Term: dec.term, + LogTerm: dec.term, + Index: dec.index, + } + + // decode entries + var l uint64 + if err := binary.Read(dec.r, binary.BigEndian, &l); err != nil { + return m, err + } + m.Entries = make([]raftpb.Entry, int(l)) + for i := 0; i < int(l); i++ { + var size uint64 + if err := binary.Read(dec.r, binary.BigEndian, &size); err != nil { + return m, err + } + buf := make([]byte, int(size)) + if _, err := io.ReadFull(dec.r, buf); err != nil { + return m, err + } + dec.index++ + pbutil.MustUnmarshal(&m.Entries[i], buf) + } + // decode commit index + if err := binary.Read(dec.r, binary.BigEndian, &m.Commit); err != nil { + return m, err + } + case msgTypeApp: + var size uint64 + if err := binary.Read(dec.r, binary.BigEndian, &size); err != nil { + return m, err + } + buf := make([]byte, int(size)) + if _, err := io.ReadFull(dec.r, buf); err != nil { + return m, err + } + pbutil.MustUnmarshal(&m, buf) + + dec.term = m.Term + dec.index = m.Index + if l := len(m.Entries); l > 0 { + dec.index = m.Entries[l-1].Index + } + default: + return m, fmt.Errorf("failed to parse type %d in msgappv2 stream", typ) + } + return m, nil +} diff --git a/rafthttp/msgappv2_test.go b/rafthttp/msgappv2_test.go new file mode 100644 index 000000000..66f003880 --- /dev/null +++ b/rafthttp/msgappv2_test.go @@ -0,0 +1,123 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rafthttp + +import ( + "bytes" + "reflect" + "testing" + + "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" +) + +func TestMsgAppV2(t *testing.T) { + tests := []raftpb.Message{ + linkHeartbeatMessage, + { + Type: raftpb.MsgApp, + From: 1, + To: 2, + Term: 1, + LogTerm: 1, + Index: 0, + Entries: []raftpb.Entry{ + {Term: 1, Index: 1, Data: []byte("some data")}, + {Term: 1, Index: 2, Data: []byte("some data")}, + {Term: 1, Index: 3, Data: []byte("some data")}, + }, + }, + // consecutive MsgApp + { + Type: raftpb.MsgApp, + From: 1, + To: 2, + Term: 1, + LogTerm: 1, + Index: 3, + Entries: []raftpb.Entry{ + {Term: 1, Index: 4, Data: []byte("some data")}, + }, + }, + linkHeartbeatMessage, + // consecutive MsgApp after linkHeartbeatMessage + { + Type: raftpb.MsgApp, + From: 1, + To: 2, + Term: 1, + LogTerm: 1, + Index: 4, + Entries: []raftpb.Entry{ + {Term: 1, Index: 5, Data: []byte("some data")}, + }, + }, + // MsgApp with higher term + { + Type: raftpb.MsgApp, + From: 1, + To: 2, + Term: 3, + LogTerm: 1, + Index: 5, + Entries: []raftpb.Entry{ + {Term: 3, Index: 6, Data: []byte("some data")}, + }, + }, + linkHeartbeatMessage, + // consecutive MsgApp + { + Type: raftpb.MsgApp, + From: 1, + To: 2, + Term: 3, + LogTerm: 2, + Index: 6, + Entries: []raftpb.Entry{ + {Term: 3, Index: 7, Data: []byte("some data")}, + }, + }, + // consecutive empty MsgApp + { + Type: raftpb.MsgApp, + From: 1, + To: 2, + Term: 3, + LogTerm: 2, + Index: 7, + Entries: nil, + }, + linkHeartbeatMessage, + } + b := &bytes.Buffer{} + enc := &msgAppV2Encoder{w: b, fs: &stats.FollowerStats{}} + dec := &msgAppV2Decoder{r: b, local: types.ID(2), remote: types.ID(1)} + + for i, tt := range tests { + if err := enc.encode(tt); err != nil { + t.Errorf("#%d: unexpected encode message error: %v", i, err) + continue + } + m, err := dec.decode() + if err != nil { + t.Errorf("#%d: unexpected decode message error: %v", i, err) + continue + } + if !reflect.DeepEqual(m, tt) { + t.Errorf("#%d: message = %+v, want %+v", i, m, tt) + } + } +} diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 399b90670..e475e05c0 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -48,6 +48,7 @@ const ( maxPendingProposals = 4096 streamApp = "streamMsgApp" + streamAppV2 = "streamMsgAppV2" streamMsg = "streamMsg" pipelineMsg = "pipeline" ) @@ -55,6 +56,7 @@ const ( var ( bufSizeMap = map[string]int{ streamApp: streamBufSize, + streamAppV2: streamBufSize, streamMsg: streamBufSize, pipelineMsg: pipelineBufSize, } @@ -147,7 +149,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r go func() { var paused bool - msgAppReader := startStreamReader(tr, picker, streamTypeMsgApp, local, to, cid, p.recvc, p.propc) + msgAppReader := startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, p.recvc, p.propc) reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc, p.propc) for { select { @@ -212,7 +214,7 @@ func (p *peer) Update(urls types.URLs) { func (p *peer) attachOutgoingConn(conn *outgoingConn) { var ok bool switch conn.t { - case streamTypeMsgApp: + case streamTypeMsgApp, streamTypeMsgAppV2: ok = p.msgAppWriter.attach(conn) case streamTypeMessage: ok = p.writer.attach(conn) diff --git a/rafthttp/stream.go b/rafthttp/stream.go index a8e213c84..1ca25bab8 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -30,15 +30,30 @@ import ( "github.com/coreos/etcd/raft/raftpb" ) -type streamType string - const ( - streamTypeMessage streamType = "message" - streamTypeMsgApp streamType = "msgapp" + streamTypeMessage streamType = "message" + streamTypeMsgAppV2 streamType = "msgappv2" + streamTypeMsgApp streamType = "msgapp" streamBufSize = 4096 ) +type streamType string + +func (t streamType) endpoint() string { + switch t { + case streamTypeMsgApp: // for backward compatibility of v2.0 + return RaftStreamPrefix + case streamTypeMsgAppV2: + return path.Join(RaftStreamPrefix, "msgapp") + case streamTypeMessage: + return path.Join(RaftStreamPrefix, "message") + default: + log.Panicf("rafthttp: unhandled stream type %v", t) + return "" + } +} + var ( // linkHeartbeatMessage is a special message used as heartbeat message in // link layer. It never conflicts with messages from raft because raft @@ -146,6 +161,8 @@ func (cw *streamWriter) run() { log.Panicf("rafthttp: unexpected parse term %s error: %v", conn.termStr, err) } enc = &msgAppEncoder{w: conn.Writer, fs: cw.fs} + case streamTypeMsgAppV2: + enc = &msgAppV2Encoder{w: conn.Writer, fs: cw.fs} case streamTypeMessage: enc = &messageEncoder{w: conn.Writer} default: @@ -263,6 +280,8 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser) error { switch cr.t { case streamTypeMsgApp: dec = &msgAppDecoder{r: rc, local: cr.from, remote: cr.to, term: cr.msgAppTerm} + case streamTypeMsgAppV2: + dec = &msgAppV2Decoder{r: rc, local: cr.from, remote: cr.to} case streamTypeMessage: dec = &messageDecoder{r: rc} default: @@ -329,15 +348,7 @@ func (cr *streamReader) dial() (io.ReadCloser, error) { cr.mu.Unlock() uu := u - switch cr.t { - case streamTypeMsgApp: - // for backward compatibility of v2.0 - uu.Path = path.Join(RaftStreamPrefix, cr.from.String()) - case streamTypeMessage: - uu.Path = path.Join(RaftStreamPrefix, string(streamTypeMessage), cr.from.String()) - default: - log.Panicf("rafthttp: unhandled stream type %v", cr.t) - } + uu.Path = path.Join(cr.t.endpoint(), cr.from.String()) req, err := http.NewRequest("GET", uu.String(), nil) if err != nil { cr.picker.unreachable(u) diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index dba5ab8b8..66fe6fef3 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -2,6 +2,7 @@ package rafthttp import ( "errors" + "fmt" "net/http" "net/http/httptest" "reflect" @@ -81,7 +82,7 @@ func TestStreamWriterAttachBadOutgoingConn(t *testing.T) { } func TestStreamReaderDialRequest(t *testing.T) { - for i, tt := range []streamType{streamTypeMsgApp, streamTypeMessage} { + for i, tt := range []streamType{streamTypeMsgApp, streamTypeMessage, streamTypeMsgAppV2} { tr := &roundTripperRecorder{} sr := &streamReader{ tr: tr, @@ -95,13 +96,7 @@ func TestStreamReaderDialRequest(t *testing.T) { sr.dial() req := tr.Request() - var wurl string - switch tt { - case streamTypeMsgApp: - wurl = "http://localhost:7001/raft/stream/1" - case streamTypeMessage: - wurl = "http://localhost:7001/raft/stream/message/1" - } + wurl := fmt.Sprintf("http://localhost:7001" + tt.endpoint() + "/1") if req.URL.String() != wurl { t.Errorf("#%d: url = %s, want %s", i, req.URL.String(), wurl) } @@ -191,6 +186,12 @@ func TestStream(t *testing.T) { msgapp, recvc, }, + { + streamTypeMsgAppV2, + 0, + msgapp, + recvc, + }, } for i, tt := range tests { h := &fakeStreamHandler{t: tt.t} diff --git a/rafthttp/transport_bench_test.go b/rafthttp/transport_bench_test.go index 452d11594..f5d14119b 100644 --- a/rafthttp/transport_bench_test.go +++ b/rafthttp/transport_bench_test.go @@ -29,16 +29,24 @@ import ( ) func BenchmarkSendingMsgApp(b *testing.B) { - r := &countRaft{} - ss := &stats.ServerStats{} - ss.Initialize() - tr := NewTransporter(&http.Transport{}, types.ID(1), types.ID(1), r, nil, ss, stats.NewLeaderStats("1")) + // member 1 + tr := NewTransporter(&http.Transport{}, types.ID(1), types.ID(1), &fakeRaft{}, nil, newServerStats(), stats.NewLeaderStats("1")) srv := httptest.NewServer(tr.Handler()) defer srv.Close() - tr.AddPeer(types.ID(1), []string{srv.URL}) + + // member 2 + r := &countRaft{} + tr2 := NewTransporter(&http.Transport{}, types.ID(2), types.ID(1), r, nil, newServerStats(), stats.NewLeaderStats("2")) + srv2 := httptest.NewServer(tr2.Handler()) + defer srv2.Close() + + tr.AddPeer(types.ID(2), []string{srv2.URL}) defer tr.Stop() - // wait for underlying stream created - time.Sleep(time.Second) + tr2.AddPeer(types.ID(1), []string{srv.URL}) + defer tr2.Stop() + if !waitStreamWorking(tr.(*transport).Get(types.ID(2)).(*peer)) { + b.Fatalf("stream from 1 to 2 is not in work as expected") + } b.ReportAllocs() b.SetBytes(64) @@ -46,7 +54,20 @@ func BenchmarkSendingMsgApp(b *testing.B) { b.ResetTimer() data := make([]byte, 64) for i := 0; i < b.N; i++ { - tr.Send([]raftpb.Message{{Type: raftpb.MsgApp, To: 1, Entries: []raftpb.Entry{{Data: data}}}}) + tr.Send([]raftpb.Message{ + { + Type: raftpb.MsgApp, + From: 1, + To: 2, + Index: uint64(i), + Entries: []raftpb.Entry{ + { + Index: uint64(i + 1), + Data: data, + }, + }, + }, + }) } // wait until all messages are received by the target raft for r.count() != b.N {