From 0de0e4b77cc1301d97a40df06e0f533cb85d8073 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 11 Jun 2015 13:21:13 -0700 Subject: [PATCH] rafthttp: pretty print message drop info --- rafthttp/peer.go | 7 +++++-- rafthttp/peer_status.go | 6 ++++++ rafthttp/remote.go | 12 +++++++++--- rafthttp/stream.go | 8 +++++--- 4 files changed, 25 insertions(+), 8 deletions(-) diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 962be0574..9447d2a20 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -161,8 +161,11 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r if isMsgSnap(m) { p.r.ReportSnapshot(m.To, raft.SnapshotFailure) } - // TODO: log start and end of message dropping - plog.Warningf("dropping %s to %s since %s's sending buffer is full", m.Type, p.id, name) + if status.isActive() { + plog.Warningf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name) + } else { + plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name) + } } case mm := <-p.recvc: if err := r.Process(context.TODO(), mm); err != nil { diff --git a/rafthttp/peer_status.go b/rafthttp/peer_status.go index 4e818462e..c7b8e108f 100644 --- a/rafthttp/peer_status.go +++ b/rafthttp/peer_status.go @@ -65,3 +65,9 @@ func (s *peerStatus) deactivate(failure failureType, reason string) { s.failureMap[failure] = reason plog.Errorf(logline) } + +func (s *peerStatus) isActive() bool { + s.mu.Lock() + defer s.mu.Unlock() + return s.active +} diff --git a/rafthttp/remote.go b/rafthttp/remote.go index 6995f0e2f..a00a8b096 100644 --- a/rafthttp/remote.go +++ b/rafthttp/remote.go @@ -23,14 +23,17 @@ import ( type remote struct { id types.ID + status *peerStatus pipeline *pipeline } func startRemote(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, errorc chan error) *remote { picker := newURLPicker(urls) + status := newPeerStatus(to) return &remote{ id: to, - pipeline: newPipeline(tr, picker, local, to, cid, newPeerStatus(to), nil, r, errorc), + status: status, + pipeline: newPipeline(tr, picker, local, to, cid, status, nil, r, errorc), } } @@ -38,8 +41,11 @@ func (g *remote) Send(m raftpb.Message) { select { case g.pipeline.msgc <- m: default: - // TODO: log start and end of message dropping - plog.Warningf("dropping %s to %s since sending buffer is full", m.Type, g.id) + if g.status.isActive() { + plog.Warningf("dropped %s to %s since sending buffer is full", m.Type, g.id) + } else { + plog.Debugf("dropped %s to %s since sending buffer is full", m.Type, g.id) + } } } diff --git a/rafthttp/stream.go b/rafthttp/stream.go index d25581b58..f9410dd32 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -359,9 +359,11 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { select { case recvc <- m: default: - // TODO: log start and end of message dropping - plog.Warningf("dropping %s from %x because receiving buffer is full", - m.Type, m.From) + if cr.status.isActive() { + plog.Warningf("dropped %s from %s since receiving buffer is full", m.Type, m.From) + } else { + plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, m.From) + } } } }