rafthttp: introduce msgappv2 stream format

msgappv2 stream is used to send all MsgApp, and replaces the
functionality of msgapp stream. Compared to v1, it has several
advantanges:
1. The output message is exactly the same with the input one, which
cannot be done in v1.
2. It uses one connection to stream persistently, which prevents message
reorder and saves the time to request stream.
3. It transmits 10 addiontional bytes in the procedure of committing one
proposal, which is trivia for idle time.
4. It transmits less bytes when committing mutliple proposals or keep
committing proposals.
This commit is contained in:
Yicheng Qin 2015-04-02 21:55:13 -07:00
parent a719f78046
commit 0d88e0d111
8 changed files with 386 additions and 36 deletions

View File

@ -125,13 +125,13 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var t streamType
switch path.Dir(r.URL.Path) {
case path.Join(RaftStreamPrefix, string(streamTypeMsgApp)):
t = streamTypeMsgApp
case path.Join(RaftStreamPrefix, string(streamTypeMessage)):
t = streamTypeMessage
// backward compatibility
case RaftStreamPrefix:
t = streamTypeMsgApp
case path.Join(RaftStreamPrefix, string(streamTypeMsgApp)):
t = streamTypeMsgAppV2
case path.Join(RaftStreamPrefix, string(streamTypeMessage)):
t = streamTypeMessage
default:
log.Printf("rafthttp: ignored unexpected streaming request path %s", r.URL.Path)
http.Error(w, "invalid path", http.StatusNotFound)

View File

@ -165,7 +165,7 @@ func TestServeRaftStreamPrefix(t *testing.T) {
},
{
RaftStreamPrefix + "/msgapp/1",
streamTypeMsgApp,
streamTypeMsgAppV2,
},
// backward compatibility
{

192
rafthttp/msgappv2.go Normal file
View File

@ -0,0 +1,192 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"encoding/binary"
"fmt"
"io"
"time"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
)
const (
msgTypeLinkHeartbeat uint8 = 0
msgTypeAppEntries uint8 = 1
msgTypeApp uint8 = 2
)
// msgappv2 stream sends three types of message: linkHeartbeatMessage,
// AppEntries and MsgApp. AppEntries is the MsgApp that is sent in
// replicate state in raft, whose index and term are fully predicatable.
//
// Data format of linkHeartbeatMessage:
// | offset | bytes | description |
// +--------+-------+-------------+
// | 0 | 1 | \x00 |
//
// Data format of AppEntries:
// | offset | bytes | description |
// +--------+-------+-------------+
// | 0 | 1 | \x01 |
// | 1 | 8 | length of entries |
// | 9 | 8 | length of first entry |
// | 17 | n1 | first entry |
// ...
// | x | 8 | length of k-th entry data |
// | x+8 | nk | k-th entry data |
// | x+8+nk | 8 | commit index |
//
// Data format of MsgApp:
// | offset | bytes | description |
// +--------+-------+-------------+
// | 0 | 1 | \x01 |
// | 1 | 8 | length of encoded message |
// | 9 | n | encoded message |
type msgAppV2Encoder struct {
w io.Writer
fs *stats.FollowerStats
term uint64
index uint64
}
func (enc *msgAppV2Encoder) encode(m raftpb.Message) error {
start := time.Now()
switch {
case isLinkHeartbeatMessage(m):
return binary.Write(enc.w, binary.BigEndian, msgTypeLinkHeartbeat)
case enc.index == m.Index && enc.term == m.LogTerm && m.LogTerm == m.Term:
if err := binary.Write(enc.w, binary.BigEndian, msgTypeAppEntries); err != nil {
return err
}
// write length of entries
l := len(m.Entries)
if err := binary.Write(enc.w, binary.BigEndian, uint64(l)); err != nil {
return err
}
for i := 0; i < l; i++ {
size := m.Entries[i].Size()
if err := binary.Write(enc.w, binary.BigEndian, uint64(size)); err != nil {
return err
}
if _, err := enc.w.Write(pbutil.MustMarshal(&m.Entries[i])); err != nil {
return err
}
enc.index++
}
// write commit index
if err := binary.Write(enc.w, binary.BigEndian, m.Commit); err != nil {
return err
}
default:
if err := binary.Write(enc.w, binary.BigEndian, msgTypeApp); err != nil {
return err
}
// write size of message
if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
return err
}
// write message
if _, err := enc.w.Write(pbutil.MustMarshal(&m)); err != nil {
return err
}
enc.term = m.Term
enc.index = m.Index
if l := len(m.Entries); l > 0 {
enc.index = m.Entries[l-1].Index
}
}
enc.fs.Succ(time.Since(start))
return nil
}
type msgAppV2Decoder struct {
r io.Reader
local, remote types.ID
term uint64
index uint64
}
func (dec *msgAppV2Decoder) decode() (raftpb.Message, error) {
var (
m raftpb.Message
typ uint8
)
if err := binary.Read(dec.r, binary.BigEndian, &typ); err != nil {
return m, err
}
switch typ {
case msgTypeLinkHeartbeat:
return linkHeartbeatMessage, nil
case msgTypeAppEntries:
m = raftpb.Message{
Type: raftpb.MsgApp,
From: uint64(dec.remote),
To: uint64(dec.local),
Term: dec.term,
LogTerm: dec.term,
Index: dec.index,
}
// decode entries
var l uint64
if err := binary.Read(dec.r, binary.BigEndian, &l); err != nil {
return m, err
}
m.Entries = make([]raftpb.Entry, int(l))
for i := 0; i < int(l); i++ {
var size uint64
if err := binary.Read(dec.r, binary.BigEndian, &size); err != nil {
return m, err
}
buf := make([]byte, int(size))
if _, err := io.ReadFull(dec.r, buf); err != nil {
return m, err
}
dec.index++
pbutil.MustUnmarshal(&m.Entries[i], buf)
}
// decode commit index
if err := binary.Read(dec.r, binary.BigEndian, &m.Commit); err != nil {
return m, err
}
case msgTypeApp:
var size uint64
if err := binary.Read(dec.r, binary.BigEndian, &size); err != nil {
return m, err
}
buf := make([]byte, int(size))
if _, err := io.ReadFull(dec.r, buf); err != nil {
return m, err
}
pbutil.MustUnmarshal(&m, buf)
dec.term = m.Term
dec.index = m.Index
if l := len(m.Entries); l > 0 {
dec.index = m.Entries[l-1].Index
}
default:
return m, fmt.Errorf("failed to parse type %d in msgappv2 stream", typ)
}
return m, nil
}

123
rafthttp/msgappv2_test.go Normal file
View File

@ -0,0 +1,123 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rafthttp
import (
"bytes"
"reflect"
"testing"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
)
func TestMsgAppV2(t *testing.T) {
tests := []raftpb.Message{
linkHeartbeatMessage,
{
Type: raftpb.MsgApp,
From: 1,
To: 2,
Term: 1,
LogTerm: 1,
Index: 0,
Entries: []raftpb.Entry{
{Term: 1, Index: 1, Data: []byte("some data")},
{Term: 1, Index: 2, Data: []byte("some data")},
{Term: 1, Index: 3, Data: []byte("some data")},
},
},
// consecutive MsgApp
{
Type: raftpb.MsgApp,
From: 1,
To: 2,
Term: 1,
LogTerm: 1,
Index: 3,
Entries: []raftpb.Entry{
{Term: 1, Index: 4, Data: []byte("some data")},
},
},
linkHeartbeatMessage,
// consecutive MsgApp after linkHeartbeatMessage
{
Type: raftpb.MsgApp,
From: 1,
To: 2,
Term: 1,
LogTerm: 1,
Index: 4,
Entries: []raftpb.Entry{
{Term: 1, Index: 5, Data: []byte("some data")},
},
},
// MsgApp with higher term
{
Type: raftpb.MsgApp,
From: 1,
To: 2,
Term: 3,
LogTerm: 1,
Index: 5,
Entries: []raftpb.Entry{
{Term: 3, Index: 6, Data: []byte("some data")},
},
},
linkHeartbeatMessage,
// consecutive MsgApp
{
Type: raftpb.MsgApp,
From: 1,
To: 2,
Term: 3,
LogTerm: 2,
Index: 6,
Entries: []raftpb.Entry{
{Term: 3, Index: 7, Data: []byte("some data")},
},
},
// consecutive empty MsgApp
{
Type: raftpb.MsgApp,
From: 1,
To: 2,
Term: 3,
LogTerm: 2,
Index: 7,
Entries: nil,
},
linkHeartbeatMessage,
}
b := &bytes.Buffer{}
enc := &msgAppV2Encoder{w: b, fs: &stats.FollowerStats{}}
dec := &msgAppV2Decoder{r: b, local: types.ID(2), remote: types.ID(1)}
for i, tt := range tests {
if err := enc.encode(tt); err != nil {
t.Errorf("#%d: unexpected encode message error: %v", i, err)
continue
}
m, err := dec.decode()
if err != nil {
t.Errorf("#%d: unexpected decode message error: %v", i, err)
continue
}
if !reflect.DeepEqual(m, tt) {
t.Errorf("#%d: message = %+v, want %+v", i, m, tt)
}
}
}

View File

@ -48,6 +48,7 @@ const (
maxPendingProposals = 4096
streamApp = "streamMsgApp"
streamAppV2 = "streamMsgAppV2"
streamMsg = "streamMsg"
pipelineMsg = "pipeline"
)
@ -55,6 +56,7 @@ const (
var (
bufSizeMap = map[string]int{
streamApp: streamBufSize,
streamAppV2: streamBufSize,
streamMsg: streamBufSize,
pipelineMsg: pipelineBufSize,
}
@ -147,7 +149,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
go func() {
var paused bool
msgAppReader := startStreamReader(tr, picker, streamTypeMsgApp, local, to, cid, p.recvc, p.propc)
msgAppReader := startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, p.recvc, p.propc)
reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc, p.propc)
for {
select {
@ -212,7 +214,7 @@ func (p *peer) Update(urls types.URLs) {
func (p *peer) attachOutgoingConn(conn *outgoingConn) {
var ok bool
switch conn.t {
case streamTypeMsgApp:
case streamTypeMsgApp, streamTypeMsgAppV2:
ok = p.msgAppWriter.attach(conn)
case streamTypeMessage:
ok = p.writer.attach(conn)

View File

@ -30,15 +30,30 @@ import (
"github.com/coreos/etcd/raft/raftpb"
)
type streamType string
const (
streamTypeMessage streamType = "message"
streamTypeMsgApp streamType = "msgapp"
streamTypeMessage streamType = "message"
streamTypeMsgAppV2 streamType = "msgappv2"
streamTypeMsgApp streamType = "msgapp"
streamBufSize = 4096
)
type streamType string
func (t streamType) endpoint() string {
switch t {
case streamTypeMsgApp: // for backward compatibility of v2.0
return RaftStreamPrefix
case streamTypeMsgAppV2:
return path.Join(RaftStreamPrefix, "msgapp")
case streamTypeMessage:
return path.Join(RaftStreamPrefix, "message")
default:
log.Panicf("rafthttp: unhandled stream type %v", t)
return ""
}
}
var (
// linkHeartbeatMessage is a special message used as heartbeat message in
// link layer. It never conflicts with messages from raft because raft
@ -146,6 +161,8 @@ func (cw *streamWriter) run() {
log.Panicf("rafthttp: unexpected parse term %s error: %v", conn.termStr, err)
}
enc = &msgAppEncoder{w: conn.Writer, fs: cw.fs}
case streamTypeMsgAppV2:
enc = &msgAppV2Encoder{w: conn.Writer, fs: cw.fs}
case streamTypeMessage:
enc = &messageEncoder{w: conn.Writer}
default:
@ -263,6 +280,8 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser) error {
switch cr.t {
case streamTypeMsgApp:
dec = &msgAppDecoder{r: rc, local: cr.from, remote: cr.to, term: cr.msgAppTerm}
case streamTypeMsgAppV2:
dec = &msgAppV2Decoder{r: rc, local: cr.from, remote: cr.to}
case streamTypeMessage:
dec = &messageDecoder{r: rc}
default:
@ -329,15 +348,7 @@ func (cr *streamReader) dial() (io.ReadCloser, error) {
cr.mu.Unlock()
uu := u
switch cr.t {
case streamTypeMsgApp:
// for backward compatibility of v2.0
uu.Path = path.Join(RaftStreamPrefix, cr.from.String())
case streamTypeMessage:
uu.Path = path.Join(RaftStreamPrefix, string(streamTypeMessage), cr.from.String())
default:
log.Panicf("rafthttp: unhandled stream type %v", cr.t)
}
uu.Path = path.Join(cr.t.endpoint(), cr.from.String())
req, err := http.NewRequest("GET", uu.String(), nil)
if err != nil {
cr.picker.unreachable(u)

View File

@ -2,6 +2,7 @@ package rafthttp
import (
"errors"
"fmt"
"net/http"
"net/http/httptest"
"reflect"
@ -81,7 +82,7 @@ func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
}
func TestStreamReaderDialRequest(t *testing.T) {
for i, tt := range []streamType{streamTypeMsgApp, streamTypeMessage} {
for i, tt := range []streamType{streamTypeMsgApp, streamTypeMessage, streamTypeMsgAppV2} {
tr := &roundTripperRecorder{}
sr := &streamReader{
tr: tr,
@ -95,13 +96,7 @@ func TestStreamReaderDialRequest(t *testing.T) {
sr.dial()
req := tr.Request()
var wurl string
switch tt {
case streamTypeMsgApp:
wurl = "http://localhost:7001/raft/stream/1"
case streamTypeMessage:
wurl = "http://localhost:7001/raft/stream/message/1"
}
wurl := fmt.Sprintf("http://localhost:7001" + tt.endpoint() + "/1")
if req.URL.String() != wurl {
t.Errorf("#%d: url = %s, want %s", i, req.URL.String(), wurl)
}
@ -191,6 +186,12 @@ func TestStream(t *testing.T) {
msgapp,
recvc,
},
{
streamTypeMsgAppV2,
0,
msgapp,
recvc,
},
}
for i, tt := range tests {
h := &fakeStreamHandler{t: tt.t}

View File

@ -29,16 +29,24 @@ import (
)
func BenchmarkSendingMsgApp(b *testing.B) {
r := &countRaft{}
ss := &stats.ServerStats{}
ss.Initialize()
tr := NewTransporter(&http.Transport{}, types.ID(1), types.ID(1), r, nil, ss, stats.NewLeaderStats("1"))
// member 1
tr := NewTransporter(&http.Transport{}, types.ID(1), types.ID(1), &fakeRaft{}, nil, newServerStats(), stats.NewLeaderStats("1"))
srv := httptest.NewServer(tr.Handler())
defer srv.Close()
tr.AddPeer(types.ID(1), []string{srv.URL})
// member 2
r := &countRaft{}
tr2 := NewTransporter(&http.Transport{}, types.ID(2), types.ID(1), r, nil, newServerStats(), stats.NewLeaderStats("2"))
srv2 := httptest.NewServer(tr2.Handler())
defer srv2.Close()
tr.AddPeer(types.ID(2), []string{srv2.URL})
defer tr.Stop()
// wait for underlying stream created
time.Sleep(time.Second)
tr2.AddPeer(types.ID(1), []string{srv.URL})
defer tr2.Stop()
if !waitStreamWorking(tr.(*transport).Get(types.ID(2)).(*peer)) {
b.Fatalf("stream from 1 to 2 is not in work as expected")
}
b.ReportAllocs()
b.SetBytes(64)
@ -46,7 +54,20 @@ func BenchmarkSendingMsgApp(b *testing.B) {
b.ResetTimer()
data := make([]byte, 64)
for i := 0; i < b.N; i++ {
tr.Send([]raftpb.Message{{Type: raftpb.MsgApp, To: 1, Entries: []raftpb.Entry{{Data: data}}}})
tr.Send([]raftpb.Message{
{
Type: raftpb.MsgApp,
From: 1,
To: 2,
Index: uint64(i),
Entries: []raftpb.Entry{
{
Index: uint64(i + 1),
Data: data,
},
},
},
})
}
// wait until all messages are received by the target raft
for r.count() != b.N {