mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
rafthttp: pretty print message drop info
This commit is contained in:
parent
cd629c9b44
commit
0de0e4b77c
@ -161,8 +161,11 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
|
||||
if isMsgSnap(m) {
|
||||
p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
|
||||
}
|
||||
// TODO: log start and end of message dropping
|
||||
plog.Warningf("dropping %s to %s since %s's sending buffer is full", m.Type, p.id, name)
|
||||
if status.isActive() {
|
||||
plog.Warningf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
|
||||
} else {
|
||||
plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
|
||||
}
|
||||
}
|
||||
case mm := <-p.recvc:
|
||||
if err := r.Process(context.TODO(), mm); err != nil {
|
||||
|
@ -65,3 +65,9 @@ func (s *peerStatus) deactivate(failure failureType, reason string) {
|
||||
s.failureMap[failure] = reason
|
||||
plog.Errorf(logline)
|
||||
}
|
||||
|
||||
func (s *peerStatus) isActive() bool {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.active
|
||||
}
|
||||
|
@ -23,14 +23,17 @@ import (
|
||||
|
||||
type remote struct {
|
||||
id types.ID
|
||||
status *peerStatus
|
||||
pipeline *pipeline
|
||||
}
|
||||
|
||||
func startRemote(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r Raft, errorc chan error) *remote {
|
||||
picker := newURLPicker(urls)
|
||||
status := newPeerStatus(to)
|
||||
return &remote{
|
||||
id: to,
|
||||
pipeline: newPipeline(tr, picker, local, to, cid, newPeerStatus(to), nil, r, errorc),
|
||||
status: status,
|
||||
pipeline: newPipeline(tr, picker, local, to, cid, status, nil, r, errorc),
|
||||
}
|
||||
}
|
||||
|
||||
@ -38,8 +41,11 @@ func (g *remote) Send(m raftpb.Message) {
|
||||
select {
|
||||
case g.pipeline.msgc <- m:
|
||||
default:
|
||||
// TODO: log start and end of message dropping
|
||||
plog.Warningf("dropping %s to %s since sending buffer is full", m.Type, g.id)
|
||||
if g.status.isActive() {
|
||||
plog.Warningf("dropped %s to %s since sending buffer is full", m.Type, g.id)
|
||||
} else {
|
||||
plog.Debugf("dropped %s to %s since sending buffer is full", m.Type, g.id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -359,9 +359,11 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
|
||||
select {
|
||||
case recvc <- m:
|
||||
default:
|
||||
// TODO: log start and end of message dropping
|
||||
plog.Warningf("dropping %s from %x because receiving buffer is full",
|
||||
m.Type, m.From)
|
||||
if cr.status.isActive() {
|
||||
plog.Warningf("dropped %s from %s since receiving buffer is full", m.Type, m.From)
|
||||
} else {
|
||||
plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, m.From)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user