From a7a4233f0b04e439e6d7357d74ae234e9bbbd597 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 1 Jun 2015 17:18:37 -0700 Subject: [PATCH] rafhttp: clean up logging messages --- rafthttp/peer.go | 12 +----------- rafthttp/remote.go | 2 +- rafthttp/stream.go | 21 ++++++++++++--------- rafthttp/transport.go | 2 +- 4 files changed, 15 insertions(+), 22 deletions(-) diff --git a/rafthttp/peer.go b/rafthttp/peer.go index e12792ac3..062196c42 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -53,15 +53,6 @@ const ( pipelineMsg = "pipeline" ) -var ( - bufSizeMap = map[string]int{ - streamApp: streamBufSize, - streamAppV2: streamBufSize, - streamMsg: streamBufSize, - pipelineMsg: pipelineBufSize, - } -) - type Peer interface { // Send sends the message to the remote peer. The function is non-blocking // and has no promise that the message will be received by the remote. @@ -170,8 +161,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r if isMsgSnap(m) { p.r.ReportSnapshot(m.To, raft.SnapshotFailure) } - log.Printf("peer: dropping %s to %s since %s with %d-size buffer is blocked", - m.Type, p.id, name, bufSizeMap[name]) + log.Printf("peer: dropping %s to %s since %s's sending buffer is full", m.Type, p.id, name) } case mm := <-p.recvc: if err := r.Process(context.TODO(), mm); err != nil { diff --git a/rafthttp/remote.go b/rafthttp/remote.go index 396e6b03f..030d7b20b 100644 --- a/rafthttp/remote.go +++ b/rafthttp/remote.go @@ -39,7 +39,7 @@ func (g *remote) Send(m raftpb.Message) { select { case g.pipeline.msgc <- m: default: - log.Printf("remote: dropping %s to %s since pipeline with %d-size buffer is blocked", m.Type, g.id, pipelineBufSize) + log.Printf("remote: dropping %s to %s since sending buffer is full", m.Type, g.id) } } diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 7ca0a3831..12776f786 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -132,7 +132,7 @@ func (cw *streamWriter) run() { if err := enc.encode(linkHeartbeatMessage); err != nil { reportSentFailure(string(t), linkHeartbeatMessage) - log.Printf("rafthttp: failed to heartbeat on stream %s due to %v. waiting for a new stream to be established.", t, err) + log.Printf("rafthttp: failed to heartbeat on stream %s (%v)", t, err) cw.close() heartbeatc, msgc = nil, nil continue @@ -154,7 +154,7 @@ func (cw *streamWriter) run() { if err := enc.encode(m); err != nil { reportSentFailure(string(t), m) - log.Printf("rafthttp: failed to send message on stream %s due to %v. waiting for a new stream to be established.", t, err) + log.Printf("rafthttp: failed to send message on stream %s (%v)", t, err) cw.close() heartbeatc, msgc = nil, nil cw.r.ReportUnreachable(m.To) @@ -170,7 +170,7 @@ func (cw *streamWriter) run() { var err error msgAppTerm, err = strconv.ParseUint(conn.termStr, 10, 64) if err != nil { - log.Panicf("rafthttp: unexpected parse term %s error: %v", conn.termStr, err) + log.Panicf("rafthttp: could not parse term %s to uint (%v)", conn.termStr, err) } enc = &msgAppEncoder{w: conn.Writer, fs: cw.fs} case streamTypeMsgAppV2: @@ -278,7 +278,7 @@ func (cr *streamReader) run() { } if err != nil { if err != errUnsupportedStreamType { - log.Printf("rafthttp: roundtripping error: %v", err) + log.Printf("rafthttp: failed to dial stream %s (%v)", t, err) } } else { err := cr.decodeLoop(rc, t) @@ -291,7 +291,7 @@ func (cr *streamReader) run() { // heartbeat on the idle stream, so it is expected to time out. case t == streamTypeMsgApp && isNetworkTimeoutError(err): default: - log.Printf("rafthttp: failed to read message on stream %s due to %v", t, err) + log.Printf("rafthttp: failed to read message on stream %s (%v)", t, err) } } select { @@ -339,7 +339,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { select { case recvc <- m: default: - log.Printf("rafthttp: dropping %s from %x because receive buffer is blocked", + log.Printf("rafthttp: dropping %s from %x because receiving buffer is full", m.Type, m.From) } } @@ -384,23 +384,26 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { uu := u uu.Path = path.Join(t.endpoint(), cr.from.String()) + req, err := http.NewRequest("GET", uu.String(), nil) if err != nil { cr.picker.unreachable(u) - return nil, fmt.Errorf("new request to %s error: %v", u, err) + return nil, fmt.Errorf("failed to make http request to %s (%v)", u, err) } req.Header.Set("X-Etcd-Cluster-ID", cr.cid.String()) req.Header.Set("X-Raft-To", cr.to.String()) if t == streamTypeMsgApp { req.Header.Set("X-Raft-Term", strconv.FormatUint(term, 10)) } + cr.mu.Lock() cr.req = req cr.mu.Unlock() + resp, err := cr.tr.RoundTrip(req) if err != nil { cr.picker.unreachable(u) - return nil, fmt.Errorf("error roundtripping to %s: %v", req.URL, err) + return nil, err } rv := serverVersion(resp.Header) @@ -423,7 +426,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { return resp.Body, nil case http.StatusNotFound: resp.Body.Close() - return nil, fmt.Errorf("local member has not been added to the peer list of member %s", cr.to) + return nil, fmt.Errorf("remote member %s could not recognize local member", cr.to) case http.StatusPreconditionFailed: resp.Body.Close() log.Printf("rafthttp: request sent was ignored due to cluster ID mismatch (remote[%s]:%s, local:%s)", diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 6a78f7145..8383c29e3 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -150,7 +150,7 @@ func (t *transport) Send(msgs []raftpb.Message) { continue } - log.Printf("etcdserver: send message to unknown receiver %s", to) + log.Printf("rafthttp: ignored message %s (sent to unknown receiver %s)", m.Type, to) } }