diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 55feac1c9..832f11e71 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -91,6 +91,7 @@ type Peer interface { type peer struct { // id of the remote raft peer node id types.ID + r Raft msgAppWriter *streamWriter writer *streamWriter @@ -113,6 +114,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r picker := newURLPicker(urls) p := &peer{ id: to, + r: r, msgAppWriter: startStreamWriter(fs, r), writer: startStreamWriter(fs, r), pipeline: newPipeline(tr, picker, to, cid, fs, r, errorc), @@ -156,6 +158,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r select { case writec <- m: default: + p.r.ReportUnreachable(m.To) log.Printf("peer: dropping %s to %s since %s with %d-size buffer is blocked", m.Type, p.id, name, bufSizeMap[name]) } diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 9f0705b55..2f2a03b26 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -116,6 +116,8 @@ func (cw *streamWriter) run() { if m.Term > msgAppTerm { cw.resetCloser() heartbeatc, msgc = nil, nil + // TODO: report to raft at peer level + cw.r.ReportUnreachable(m.To) } continue }