mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
rafthttp: use pointers to avoid extra copies upon message encoding
This commit is contained in:
parent
8e9097d0c0
commit
fd5bc21522
@ -18,7 +18,7 @@ import "github.com/coreos/etcd/raft/raftpb"
|
|||||||
|
|
||||||
type encoder interface {
|
type encoder interface {
|
||||||
// encode encodes the given message to an output stream.
|
// encode encodes the given message to an output stream.
|
||||||
encode(m raftpb.Message) error
|
encode(m *raftpb.Message) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type decoder interface {
|
type decoder interface {
|
||||||
|
@ -28,11 +28,11 @@ type messageEncoder struct {
|
|||||||
w io.Writer
|
w io.Writer
|
||||||
}
|
}
|
||||||
|
|
||||||
func (enc *messageEncoder) encode(m raftpb.Message) error {
|
func (enc *messageEncoder) encode(m *raftpb.Message) error {
|
||||||
if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
|
if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err := enc.w.Write(pbutil.MustMarshal(&m))
|
_, err := enc.w.Write(pbutil.MustMarshal(m))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,7 +48,7 @@ func TestMessage(t *testing.T) {
|
|||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
b := &bytes.Buffer{}
|
b := &bytes.Buffer{}
|
||||||
enc := &messageEncoder{w: b}
|
enc := &messageEncoder{w: b}
|
||||||
if err := enc.encode(tt); err != nil {
|
if err := enc.encode(&tt); err != nil {
|
||||||
t.Errorf("#%d: unexpected encode message error: %v", i, err)
|
t.Errorf("#%d: unexpected encode message error: %v", i, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -82,7 +82,7 @@ func newMsgAppV2Encoder(w io.Writer, fs *stats.FollowerStats) *msgAppV2Encoder {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (enc *msgAppV2Encoder) encode(m raftpb.Message) error {
|
func (enc *msgAppV2Encoder) encode(m *raftpb.Message) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
switch {
|
switch {
|
||||||
case isLinkHeartbeatMessage(m):
|
case isLinkHeartbeatMessage(m):
|
||||||
@ -135,7 +135,7 @@ func (enc *msgAppV2Encoder) encode(m raftpb.Message) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// write message
|
// write message
|
||||||
if _, err := enc.w.Write(pbutil.MustMarshal(&m)); err != nil {
|
if _, err := enc.w.Write(pbutil.MustMarshal(m)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,7 +107,7 @@ func TestMsgAppV2(t *testing.T) {
|
|||||||
dec := newMsgAppV2Decoder(b, types.ID(2), types.ID(1))
|
dec := newMsgAppV2Decoder(b, types.ID(2), types.ID(1))
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
if err := enc.encode(tt); err != nil {
|
if err := enc.encode(&tt); err != nil {
|
||||||
t.Errorf("#%d: unexpected encode message error: %v", i, err)
|
t.Errorf("#%d: unexpected encode message error: %v", i, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -143,7 +143,7 @@ func createSnapBody(merged snap.Message) io.ReadCloser {
|
|||||||
buf := new(bytes.Buffer)
|
buf := new(bytes.Buffer)
|
||||||
enc := &messageEncoder{w: buf}
|
enc := &messageEncoder{w: buf}
|
||||||
// encode raft message
|
// encode raft message
|
||||||
if err := enc.encode(merged.Message); err != nil {
|
if err := enc.encode(&merged.Message); err != nil {
|
||||||
plog.Panicf("encode message error (%v)", err)
|
plog.Panicf("encode message error (%v)", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,7 +84,7 @@ var (
|
|||||||
linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat}
|
linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat}
|
||||||
)
|
)
|
||||||
|
|
||||||
func isLinkHeartbeatMessage(m raftpb.Message) bool {
|
func isLinkHeartbeatMessage(m *raftpb.Message) bool {
|
||||||
return m.Type == raftpb.MsgHeartbeat && m.From == 0 && m.To == 0
|
return m.Type == raftpb.MsgHeartbeat && m.From == 0 && m.To == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -146,7 +146,7 @@ func (cw *streamWriter) run() {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-heartbeatc:
|
case <-heartbeatc:
|
||||||
err := enc.encode(linkHeartbeatMessage)
|
err := enc.encode(&linkHeartbeatMessage)
|
||||||
unflushed += linkHeartbeatMessage.Size()
|
unflushed += linkHeartbeatMessage.Size()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
@ -163,7 +163,7 @@ func (cw *streamWriter) run() {
|
|||||||
heartbeatc, msgc = nil, nil
|
heartbeatc, msgc = nil, nil
|
||||||
|
|
||||||
case m := <-msgc:
|
case m := <-msgc:
|
||||||
err := enc.encode(m)
|
err := enc.encode(&m)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
unflushed += m.Size()
|
unflushed += m.Size()
|
||||||
|
|
||||||
@ -354,7 +354,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if isLinkHeartbeatMessage(m) {
|
if isLinkHeartbeatMessage(&m) {
|
||||||
// raft is not interested in link layer
|
// raft is not interested in link layer
|
||||||
// heartbeat message, so we should ignore
|
// heartbeat message, so we should ignore
|
||||||
// it.
|
// it.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user