diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 7acd39a82..4d91d5d61 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -143,29 +143,33 @@ func (cw *streamWriter) run() { select { case <-heartbeatc: start := time.Now() - if err := enc.encode(linkHeartbeatMessage); err != nil { - reportSentFailure(string(t), linkHeartbeatMessage) - - cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error()) - cw.close() - heartbeatc, msgc = nil, nil + err := enc.encode(linkHeartbeatMessage) + if err == nil { + flusher.Flush() + reportSentDuration(string(t), linkHeartbeatMessage, time.Since(start)) continue } - flusher.Flush() - reportSentDuration(string(t), linkHeartbeatMessage, time.Since(start)) + + reportSentFailure(string(t), linkHeartbeatMessage) + cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error()) + cw.close() + heartbeatc, msgc = nil, nil + case m := <-msgc: start := time.Now() - if err := enc.encode(m); err != nil { - reportSentFailure(string(t), m) - - cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error()) - cw.close() - heartbeatc, msgc = nil, nil - cw.r.ReportUnreachable(m.To) + err := enc.encode(m) + if err == nil { + flusher.Flush() + reportSentDuration(string(t), m, time.Since(start)) continue } - flusher.Flush() - reportSentDuration(string(t), m, time.Since(start)) + + reportSentFailure(string(t), m) + cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error()) + cw.close() + heartbeatc, msgc = nil, nil + cw.r.ReportUnreachable(m.To) + case conn := <-cw.connc: cw.close() t = conn.t