diff --git a/etcdserver/sendhub.go b/etcdserver/sendhub.go index ccacdc686..ad1fa18fe 100644 --- a/etcdserver/sendhub.go +++ b/etcdserver/sendhub.go @@ -70,18 +70,11 @@ func (h *sendHub) Send(msgs []raftpb.Message) { continue } - // TODO: don't block. we should be able to have 1000s - // of messages out at a time. - data, err := m.Marshal() - if err != nil { - log.Println("sender: dropping message:", err) - return // drop bad message - } if m.Type == raftpb.MsgApp { - h.ss.SendAppendReq(len(data)) + h.ss.SendAppendReq(m.Size()) } - s.Send(data) + s.Send(m) } } diff --git a/etcdserver/sendhub_test.go b/etcdserver/sendhub_test.go index 5d5f6017d..c5bbdb924 100644 --- a/etcdserver/sendhub_test.go +++ b/etcdserver/sendhub_test.go @@ -24,6 +24,7 @@ import ( "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" ) func TestSendHubInitSenders(t *testing.T) { @@ -98,7 +99,7 @@ func TestSendHubShouldStop(t *testing.T) { t.Fatalf("received unexpected shouldstop notification") case <-time.After(10 * time.Millisecond): } - h.senders[1].Send([]byte("somedata")) + h.senders[1].Send(raftpb.Message{}) testutil.ForceGosched() select { diff --git a/rafthttp/sender.go b/rafthttp/sender.go index 203a6c5aa..36d8ceb92 100644 --- a/rafthttp/sender.go +++ b/rafthttp/sender.go @@ -25,7 +25,9 @@ import ( "time" "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" ) const ( @@ -37,7 +39,7 @@ type Sender interface { Update(u string) // Send sends the data to the remote node. It is always non-blocking. // It may be fail to send data if it returns nil error. - Send(data []byte) error + Send(m raftpb.Message) error // Stop performs any necessary finalization and terminates the Sender // elegantly. Stop() @@ -77,7 +79,10 @@ func (s *sender) Update(u string) { } // TODO (xiangli): reasonable retry logic -func (s *sender) Send(data []byte) error { +func (s *sender) Send(m raftpb.Message) error { + // TODO: don't block. we should be able to have 1000s + // of messages out at a time. + data := pbutil.MustMarshal(&m) select { case s.q <- data: return nil diff --git a/rafthttp/sender_test.go b/rafthttp/sender_test.go index 6e86a4f0c..a908d3838 100644 --- a/rafthttp/sender_test.go +++ b/rafthttp/sender_test.go @@ -26,6 +26,7 @@ import ( "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft/raftpb" ) // TestSenderSend tests that send func could post data using roundtripper @@ -35,7 +36,7 @@ func TestSenderSend(t *testing.T) { fs := &stats.FollowerStats{} s := NewSender(tr, "http://10.0.0.1", types.ID(1), fs, nil) - if err := s.Send([]byte("some data")); err != nil { + if err := s.Send(raftpb.Message{}); err != nil { t.Fatalf("unexpect send error: %v", err) } s.Stop() @@ -58,7 +59,7 @@ func TestSenderExceedMaximalServing(t *testing.T) { // keep the sender busy and make the buffer full // nothing can go out as we block the sender for i := 0; i < connPerSender+senderBufSize; i++ { - if err := s.Send([]byte("some data")); err != nil { + if err := s.Send(raftpb.Message{}); err != nil { t.Errorf("send err = %v, want nil", err) } // force the sender to grab data @@ -66,7 +67,7 @@ func TestSenderExceedMaximalServing(t *testing.T) { } // try to send a data when we are sure the buffer is full - if err := s.Send([]byte("some data")); err == nil { + if err := s.Send(raftpb.Message{}); err == nil { t.Errorf("unexpect send success") } @@ -75,7 +76,7 @@ func TestSenderExceedMaximalServing(t *testing.T) { testutil.ForceGosched() // It could send new data after previous ones succeed - if err := s.Send([]byte("some data")); err != nil { + if err := s.Send(raftpb.Message{}); err != nil { t.Errorf("send err = %v, want nil", err) } s.Stop() @@ -87,7 +88,7 @@ func TestSenderSendFailed(t *testing.T) { fs := &stats.FollowerStats{} s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil) - if err := s.Send([]byte("some data")); err != nil { + if err := s.Send(raftpb.Message{}); err != nil { t.Fatalf("unexpect Send error: %v", err) } s.Stop()