diff --git a/rafthttp/http.go b/rafthttp/http.go index 70baedf33..ce221a5de 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -125,13 +125,13 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var t streamType switch path.Dir(r.URL.Path) { - case path.Join(RaftStreamPrefix, string(streamTypeMsgApp)): - t = streamTypeMsgApp - case path.Join(RaftStreamPrefix, string(streamTypeMessage)): - t = streamTypeMessage // backward compatibility case RaftStreamPrefix: t = streamTypeMsgApp + case path.Join(RaftStreamPrefix, string(streamTypeMsgApp)): + t = streamTypeMsgAppV2 + case path.Join(RaftStreamPrefix, string(streamTypeMessage)): + t = streamTypeMessage default: log.Printf("rafthttp: ignored unexpected streaming request path %s", r.URL.Path) http.Error(w, "invalid path", http.StatusNotFound) diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go index b152431ef..95f0a78b4 100644 --- a/rafthttp/http_test.go +++ b/rafthttp/http_test.go @@ -165,7 +165,7 @@ func TestServeRaftStreamPrefix(t *testing.T) { }, { RaftStreamPrefix + "/msgapp/1", - streamTypeMsgApp, + streamTypeMsgAppV2, }, // backward compatibility { diff --git a/rafthttp/msgappv2.go b/rafthttp/msgappv2.go new file mode 100644 index 000000000..7c4395af6 --- /dev/null +++ b/rafthttp/msgappv2.go @@ -0,0 +1,192 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package rafthttp + +import ( + "encoding/binary" + "fmt" + "io" + "time" + + "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/pbutil" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" +) + +const ( + msgTypeLinkHeartbeat uint8 = 0 + msgTypeAppEntries uint8 = 1 + msgTypeApp uint8 = 2 +) + +// msgappv2 stream sends three types of message: linkHeartbeatMessage, +// AppEntries and MsgApp. AppEntries is the MsgApp that is sent in +// replicate state in raft, whose index and term are fully predictable. +// +// Data format of linkHeartbeatMessage: +// | offset | bytes | description | +// +--------+-------+-------------+ +// | 0 | 1 | \x00 | +// +// Data format of AppEntries: +// | offset | bytes | description | +// +--------+-------+-------------+ +// | 0 | 1 | \x01 | +// | 1 | 8 | number of entries | +// | 9 | 8 | length of first entry | +// | 17 | n1 | first entry | +// ... +// | x | 8 | length of k-th entry data | +// | x+8 | nk | k-th entry data | +// | x+8+nk | 8 | commit index | +// +// Data format of MsgApp: +// | offset | bytes | description | +// +--------+-------+-------------+ +// | 0 | 1 | \x02 | +// | 1 | 8 | length of encoded message | +// | 9 | n | encoded message | +type msgAppV2Encoder struct { + w io.Writer + fs *stats.FollowerStats + + term uint64 + index uint64 +} + +func (enc *msgAppV2Encoder) encode(m raftpb.Message) error { + start := time.Now() + switch { + case isLinkHeartbeatMessage(m): + return binary.Write(enc.w, binary.BigEndian, msgTypeLinkHeartbeat) + case enc.index == m.Index && enc.term == m.LogTerm && m.LogTerm == m.Term: + if err := binary.Write(enc.w, binary.BigEndian, msgTypeAppEntries); err != nil { + return err + } + // write number of entries + l := len(m.Entries) + if err := binary.Write(enc.w, binary.BigEndian, uint64(l)); err != nil { + return err + } + for i := 0; i < l; i++ { + size := m.Entries[i].Size() + if err := binary.Write(enc.w, binary.BigEndian, uint64(size)); err != 
nil { + return err + } + if _, err := enc.w.Write(pbutil.MustMarshal(&m.Entries[i])); err != nil { + return err + } + enc.index++ + } + // write commit index + if err := binary.Write(enc.w, binary.BigEndian, m.Commit); err != nil { + return err + } + default: + if err := binary.Write(enc.w, binary.BigEndian, msgTypeApp); err != nil { + return err + } + // write size of message + if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil { + return err + } + // write message + if _, err := enc.w.Write(pbutil.MustMarshal(&m)); err != nil { + return err + } + + enc.term = m.Term + enc.index = m.Index + if l := len(m.Entries); l > 0 { + enc.index = m.Entries[l-1].Index + } + } + enc.fs.Succ(time.Since(start)) + return nil +} + +type msgAppV2Decoder struct { + r io.Reader + local, remote types.ID + + term uint64 + index uint64 +} + +func (dec *msgAppV2Decoder) decode() (raftpb.Message, error) { + var ( + m raftpb.Message + typ uint8 + ) + if err := binary.Read(dec.r, binary.BigEndian, &typ); err != nil { + return m, err + } + switch typ { + case msgTypeLinkHeartbeat: + return linkHeartbeatMessage, nil + case msgTypeAppEntries: + m = raftpb.Message{ + Type: raftpb.MsgApp, + From: uint64(dec.remote), + To: uint64(dec.local), + Term: dec.term, + LogTerm: dec.term, + Index: dec.index, + } + + // decode entries; NOTE(review): the entry count and each entry size are read from the stream and used to allocate without a bound check + var l uint64 + if err := binary.Read(dec.r, binary.BigEndian, &l); err != nil { + return m, err + } + m.Entries = make([]raftpb.Entry, int(l)) + for i := 0; i < int(l); i++ { + var size uint64 + if err := binary.Read(dec.r, binary.BigEndian, &size); err != nil { + return m, err + } + buf := make([]byte, int(size)) + if _, err := io.ReadFull(dec.r, buf); err != nil { + return m, err + } + dec.index++ + pbutil.MustUnmarshal(&m.Entries[i], buf) + } + // decode commit index + if err := binary.Read(dec.r, binary.BigEndian, &m.Commit); err != nil { + return m, err + } + case msgTypeApp: + var size uint64 + if err := binary.Read(dec.r, binary.BigEndian, 
&size); err != nil { + return m, err + } + buf := make([]byte, int(size)) + if _, err := io.ReadFull(dec.r, buf); err != nil { + return m, err + } + pbutil.MustUnmarshal(&m, buf) + + dec.term = m.Term + dec.index = m.Index + if l := len(m.Entries); l > 0 { + dec.index = m.Entries[l-1].Index + } + default: + return m, fmt.Errorf("failed to parse type %d in msgappv2 stream", typ) + } + return m, nil +} diff --git a/rafthttp/msgappv2_test.go b/rafthttp/msgappv2_test.go new file mode 100644 index 000000000..66f003880 --- /dev/null +++ b/rafthttp/msgappv2_test.go @@ -0,0 +1,123 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package rafthttp + +import ( + "bytes" + "reflect" + "testing" + + "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" +) + +func TestMsgAppV2(t *testing.T) { + tests := []raftpb.Message{ + linkHeartbeatMessage, + { + Type: raftpb.MsgApp, + From: 1, + To: 2, + Term: 1, + LogTerm: 1, + Index: 0, + Entries: []raftpb.Entry{ + {Term: 1, Index: 1, Data: []byte("some data")}, + {Term: 1, Index: 2, Data: []byte("some data")}, + {Term: 1, Index: 3, Data: []byte("some data")}, + }, + }, + // consecutive MsgApp + { + Type: raftpb.MsgApp, + From: 1, + To: 2, + Term: 1, + LogTerm: 1, + Index: 3, + Entries: []raftpb.Entry{ + {Term: 1, Index: 4, Data: []byte("some data")}, + }, + }, + linkHeartbeatMessage, + // consecutive MsgApp after linkHeartbeatMessage + { + Type: raftpb.MsgApp, + From: 1, + To: 2, + Term: 1, + LogTerm: 1, + Index: 4, + Entries: []raftpb.Entry{ + {Term: 1, Index: 5, Data: []byte("some data")}, + }, + }, + // MsgApp with higher term + { + Type: raftpb.MsgApp, + From: 1, + To: 2, + Term: 3, + LogTerm: 1, + Index: 5, + Entries: []raftpb.Entry{ + {Term: 3, Index: 6, Data: []byte("some data")}, + }, + }, + linkHeartbeatMessage, + // consecutive MsgApp whose LogTerm differs from Term + { + Type: raftpb.MsgApp, + From: 1, + To: 2, + Term: 3, + LogTerm: 2, + Index: 6, + Entries: []raftpb.Entry{ + {Term: 3, Index: 7, Data: []byte("some data")}, + }, + }, + // consecutive empty MsgApp + { + Type: raftpb.MsgApp, + From: 1, + To: 2, + Term: 3, + LogTerm: 2, + Index: 7, + Entries: nil, + }, + linkHeartbeatMessage, + } + b := &bytes.Buffer{} + enc := &msgAppV2Encoder{w: b, fs: &stats.FollowerStats{}} + dec := &msgAppV2Decoder{r: b, local: types.ID(2), remote: types.ID(1)} + + for i, tt := range tests { + if err := enc.encode(tt); err != nil { + t.Errorf("#%d: unexpected encode message error: %v", i, err) + continue + } + m, err := dec.decode() + if err != nil { + t.Errorf("#%d: unexpected decode message error: %v", i, err) + continue + 
} + if !reflect.DeepEqual(m, tt) { + t.Errorf("#%d: message = %+v, want %+v", i, m, tt) + } + } +} diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 399b90670..e475e05c0 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -48,6 +48,7 @@ const ( maxPendingProposals = 4096 streamApp = "streamMsgApp" + streamAppV2 = "streamMsgAppV2" streamMsg = "streamMsg" pipelineMsg = "pipeline" ) @@ -55,6 +56,7 @@ const ( var ( bufSizeMap = map[string]int{ streamApp: streamBufSize, + streamAppV2: streamBufSize, streamMsg: streamBufSize, pipelineMsg: pipelineBufSize, } @@ -147,7 +149,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r go func() { var paused bool - msgAppReader := startStreamReader(tr, picker, streamTypeMsgApp, local, to, cid, p.recvc, p.propc) + msgAppReader := startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, p.recvc, p.propc) reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc, p.propc) for { select { @@ -212,7 +214,7 @@ func (p *peer) Update(urls types.URLs) { func (p *peer) attachOutgoingConn(conn *outgoingConn) { var ok bool switch conn.t { - case streamTypeMsgApp: + case streamTypeMsgApp, streamTypeMsgAppV2: ok = p.msgAppWriter.attach(conn) case streamTypeMessage: ok = p.writer.attach(conn) diff --git a/rafthttp/stream.go b/rafthttp/stream.go index a8e213c84..1ca25bab8 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -30,15 +30,30 @@ import ( "github.com/coreos/etcd/raft/raftpb" ) -type streamType string - const ( - streamTypeMessage streamType = "message" - streamTypeMsgApp streamType = "msgapp" + streamTypeMessage streamType = "message" + streamTypeMsgAppV2 streamType = "msgappv2" + streamTypeMsgApp streamType = "msgapp" streamBufSize = 4096 ) +type streamType string + +func (t streamType) endpoint() string { + switch t { + case streamTypeMsgApp: // for backward compatibility of v2.0: the original msgapp stream keeps the bare stream prefix + return RaftStreamPrefix + case streamTypeMsgAppV2: + return 
path.Join(RaftStreamPrefix, "msgapp") + case streamTypeMessage: + return path.Join(RaftStreamPrefix, "message") + default: + log.Panicf("rafthttp: unhandled stream type %v", t) + return "" + } +} + var ( // linkHeartbeatMessage is a special message used as heartbeat message in // link layer. It never conflicts with messages from raft because raft @@ -146,6 +161,8 @@ func (cw *streamWriter) run() { log.Panicf("rafthttp: unexpected parse term %s error: %v", conn.termStr, err) } enc = &msgAppEncoder{w: conn.Writer, fs: cw.fs} + case streamTypeMsgAppV2: + enc = &msgAppV2Encoder{w: conn.Writer, fs: cw.fs} case streamTypeMessage: enc = &messageEncoder{w: conn.Writer} default: @@ -263,6 +280,8 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser) error { switch cr.t { case streamTypeMsgApp: dec = &msgAppDecoder{r: rc, local: cr.from, remote: cr.to, term: cr.msgAppTerm} + case streamTypeMsgAppV2: + dec = &msgAppV2Decoder{r: rc, local: cr.from, remote: cr.to} case streamTypeMessage: dec = &messageDecoder{r: rc} default: @@ -329,15 +348,7 @@ func (cr *streamReader) dial() (io.ReadCloser, error) { cr.mu.Unlock() uu := u - switch cr.t { - case streamTypeMsgApp: - // for backward compatibility of v2.0 - uu.Path = path.Join(RaftStreamPrefix, cr.from.String()) - case streamTypeMessage: - uu.Path = path.Join(RaftStreamPrefix, string(streamTypeMessage), cr.from.String()) - default: - log.Panicf("rafthttp: unhandled stream type %v", cr.t) - } + uu.Path = path.Join(cr.t.endpoint(), cr.from.String()) req, err := http.NewRequest("GET", uu.String(), nil) if err != nil { cr.picker.unreachable(u) diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index dba5ab8b8..66fe6fef3 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -2,6 +2,7 @@ package rafthttp import ( "errors" + "fmt" "net/http" "net/http/httptest" "reflect" @@ -81,7 +82,7 @@ func TestStreamReaderDialRequest(t *testing.T) { } func TestStreamReaderDialRequest(t *testing.T) { - for i, tt 
:= range []streamType{streamTypeMsgApp, streamTypeMessage} { + for i, tt := range []streamType{streamTypeMsgApp, streamTypeMessage, streamTypeMsgAppV2} { tr := &roundTripperRecorder{} sr := &streamReader{ tr: tr, @@ -95,13 +96,7 @@ func TestStreamReaderDialRequest(t *testing.T) { sr.dial() req := tr.Request() - var wurl string - switch tt { - case streamTypeMsgApp: - wurl = "http://localhost:7001/raft/stream/1" - case streamTypeMessage: - wurl = "http://localhost:7001/raft/stream/message/1" - } + wurl := fmt.Sprintf("http://localhost:7001" + tt.endpoint() + "/1") if req.URL.String() != wurl { t.Errorf("#%d: url = %s, want %s", i, req.URL.String(), wurl) } @@ -191,6 +186,12 @@ func TestStream(t *testing.T) { msgapp, recvc, }, + { + streamTypeMsgAppV2, + 0, + msgapp, + recvc, + }, } for i, tt := range tests { h := &fakeStreamHandler{t: tt.t} diff --git a/rafthttp/transport_bench_test.go b/rafthttp/transport_bench_test.go index 452d11594..f5d14119b 100644 --- a/rafthttp/transport_bench_test.go +++ b/rafthttp/transport_bench_test.go @@ -29,16 +29,24 @@ import ( ) func BenchmarkSendingMsgApp(b *testing.B) { - r := &countRaft{} - ss := &stats.ServerStats{} - ss.Initialize() - tr := NewTransporter(&http.Transport{}, types.ID(1), types.ID(1), r, nil, ss, stats.NewLeaderStats("1")) + // member 1 + tr := NewTransporter(&http.Transport{}, types.ID(1), types.ID(1), &fakeRaft{}, nil, newServerStats(), stats.NewLeaderStats("1")) srv := httptest.NewServer(tr.Handler()) defer srv.Close() - tr.AddPeer(types.ID(1), []string{srv.URL}) + + // member 2 + r := &countRaft{} + tr2 := NewTransporter(&http.Transport{}, types.ID(2), types.ID(1), r, nil, newServerStats(), stats.NewLeaderStats("2")) + srv2 := httptest.NewServer(tr2.Handler()) + defer srv2.Close() + + tr.AddPeer(types.ID(2), []string{srv2.URL}) defer tr.Stop() - // wait for underlying stream created - time.Sleep(time.Second) + tr2.AddPeer(types.ID(1), []string{srv.URL}) + defer tr2.Stop() + if 
!waitStreamWorking(tr.(*transport).Get(types.ID(2)).(*peer)) { + b.Fatalf("stream from 1 to 2 is not in work as expected") + } b.ReportAllocs() b.SetBytes(64) @@ -46,7 +54,20 @@ func BenchmarkSendingMsgApp(b *testing.B) { b.ResetTimer() data := make([]byte, 64) for i := 0; i < b.N; i++ { - tr.Send([]raftpb.Message{{Type: raftpb.MsgApp, To: 1, Entries: []raftpb.Entry{{Data: data}}}}) + tr.Send([]raftpb.Message{ + { + Type: raftpb.MsgApp, + From: 1, + To: 2, + Index: uint64(i), + Entries: []raftpb.Entry{ + { + Index: uint64(i + 1), + Data: data, + }, + }, + }, + }) } // wait until all messages are received by the target raft for r.count() != b.N {