diff --git a/etcdserver/etcdhttp/client.go b/etcdserver/etcdhttp/client.go index 18bf7ed02..29e1a783d 100644 --- a/etcdserver/etcdhttp/client.go +++ b/etcdserver/etcdhttp/client.go @@ -260,8 +260,13 @@ func (h *statsHandler) serveLeader(w http.ResponseWriter, r *http.Request) { if !allowMethod(w, r.Method, "GET") { return } + stats := h.stats.LeaderStats() + if stats == nil { + writeError(w, httptypes.NewHTTPError(http.StatusForbidden, "not current leader")) + return + } w.Header().Set("Content-Type", "application/json") - w.Write(h.stats.LeaderStats()) + w.Write(stats) } func serveVersion(w http.ResponseWriter, r *http.Request) { diff --git a/etcdserver/server.go b/etcdserver/server.go index 1f5de691f..ee4b48ee5 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -526,7 +526,10 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) { func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() } func (s *EtcdServer) LeaderStats() []byte { - // TODO(jonboulle): need to lock access to lstats, set it to nil when not leader, ... + lead := atomic.LoadUint64(&s.raftLead) + if lead != uint64(s.id) { + return nil + } return s.lstats.JSON() } diff --git a/rafthttp/sender.go b/rafthttp/sender.go index 4b16ddbe7..d4e0f4e7e 100644 --- a/rafthttp/sender.go +++ b/rafthttp/sender.go @@ -77,7 +77,7 @@ func NewSender(tr http.RoundTripper, u string, id types.ID, cid types.ID, p Proc shouldstop: shouldstop, batcher: NewBatcher(100, appRespBatchMs*time.Millisecond), propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond), - q: make(chan []byte, senderBufSize), + q: make(chan *raftpb.Message, senderBufSize), } s.wg.Add(connPerSender) for i := 0; i < connPerSender; i++ { @@ -98,7 +98,7 @@ type sender struct { strmCln *streamClient batcher *Batcher propBatcher *ProposalBatcher - q chan []byte + q chan *raftpb.Message strmSrvMu sync.Mutex strmSrv *streamServer @@ -184,9 +184,8 @@ func (s *sender) Send(m raftpb.Message) error { func (s *sender) send(m raftpb.Message) error { // TODO: don't block. we should be able to have 1000s // of messages out at a time. - data := pbutil.MustMarshal(&m) select { - case s.q <- data: + case s.q <- &m: return nil default: log.Printf("sender: dropping %s because maximal number %d of sender buffer entries to %s has been reached", @@ -267,9 +266,9 @@ func (s *sender) tryStream(m raftpb.Message) bool { func (s *sender) handle() { defer s.wg.Done() - for d := range s.q { + for m := range s.q { start := time.Now() - err := s.post(d) + err := s.post(pbutil.MustMarshal(m)) end := time.Now() s.mu.Lock() @@ -282,14 +281,18 @@ func (s *sender) handle() { log.Printf("sender: the connection with %s becomes inactive", s.id) s.active = false } - s.fs.Fail() + if m.Type == raftpb.MsgApp { + s.fs.Fail() + } } else { if !s.active { log.Printf("sender: the connection with %s becomes active", s.id) s.active = true s.errored = nil } - s.fs.Succ(end.Sub(start)) + if m.Type == raftpb.MsgApp { + s.fs.Succ(end.Sub(start)) + } } s.mu.Unlock() } diff --git a/rafthttp/sender_test.go b/rafthttp/sender_test.go index 2181c4ca6..67a85f6d4 100644 --- a/rafthttp/sender_test.go +++ b/rafthttp/sender_test.go @@ -36,7 +36,7 @@ func TestSenderSend(t *testing.T) { fs := &stats.FollowerStats{} s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil) - if err := s.Send(raftpb.Message{}); err != nil { + if err := s.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil { t.Fatalf("unexpect send error: %v", err) } s.Stop() @@ -88,7 +88,7 @@ func TestSenderSendFailed(t *testing.T) { fs := &stats.FollowerStats{} s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil) - if err := s.Send(raftpb.Message{}); err != nil { + if err := s.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil { t.Fatalf("unexpect Send error: %v", err) } s.Stop()