From f1e995b0702fee96fbcc99338a618961ec78d1ee Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 2 Jun 2015 15:33:58 -0700 Subject: [PATCH] rafthttp: use leveled logger --- rafthttp/http.go | 30 +++++++++++++++++------------- rafthttp/peer.go | 10 +++++----- rafthttp/pipeline.go | 11 +++++------ rafthttp/remote.go | 4 ++-- rafthttp/stream.go | 26 ++++++++++++++------------ rafthttp/transport.go | 14 ++++++++------ 6 files changed, 51 insertions(+), 44 deletions(-) diff --git a/rafthttp/http.go b/rafthttp/http.go index 67242d02a..8720fb22c 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -17,7 +17,6 @@ package rafthttp import ( "errors" "io/ioutil" - "log" "net/http" "path" @@ -77,7 +76,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } if err := checkVersionCompability(r.Header.Get("X-Server-From"), serverVersion(r.Header), minClusterVersion(r.Header)); err != nil { - log.Printf("rafthttp: request received was ignored (%v)", err) + plog.Errorf("request received was ignored (%v)", err) http.Error(w, errIncompatibleVersion.Error(), http.StatusPreconditionFailed) return } @@ -87,7 +86,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { gcid := r.Header.Get("X-Etcd-Cluster-ID") if gcid != wcid { - log.Printf("rafthttp: request ignored due to cluster ID mismatch got %s want %s", gcid, wcid) + plog.Errorf("request received was ignored (cluster ID mismatch got %s want %s)", gcid, wcid) http.Error(w, errClusterIDMismatch.Error(), http.StatusPreconditionFailed) return } @@ -97,13 +96,13 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { limitedr := pioutil.NewLimitedBufferReader(r.Body, ConnReadLimitByte) b, err := ioutil.ReadAll(limitedr) if err != nil { - log.Println("rafthttp: error reading raft message:", err) + plog.Errorf("failed to read raft message (%v)", err) http.Error(w, "error reading raft message", http.StatusBadRequest) return } var m raftpb.Message if err := m.Unmarshal(b); err 
!= nil { - log.Println("rafthttp: error unmarshaling raft message:", err) + plog.Errorf("failed to unmarshal raft message (%v)", err) http.Error(w, "error unmarshaling raft message", http.StatusBadRequest) return } @@ -112,7 +111,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { case writerToResponse: v.WriteTo(w) default: - log.Printf("rafthttp: error processing raft message: %v", err) + plog.Warningf("failed to process raft message (%v)", err) http.Error(w, "error processing raft message", http.StatusInternalServerError) } return @@ -139,7 +138,7 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Server-Version", version.Version) if err := checkVersionCompability(r.Header.Get("X-Server-From"), serverVersion(r.Header), minClusterVersion(r.Header)); err != nil { - log.Printf("rafthttp: request received was ignored (%v)", err) + plog.Errorf("request received was ignored (%v)", err) http.Error(w, errIncompatibleVersion.Error(), http.StatusPreconditionFailed) return } @@ -148,7 +147,7 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Etcd-Cluster-ID", wcid) if gcid := r.Header.Get("X-Etcd-Cluster-ID"); gcid != wcid { - log.Printf("rafthttp: streaming request ignored due to cluster ID mismatch got %s want %s", gcid, wcid) + plog.Errorf("streaming request ignored (cluster ID mismatch got %s want %s)", gcid, wcid) http.Error(w, errClusterIDMismatch.Error(), http.StatusPreconditionFailed) return } @@ -163,7 +162,7 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { case path.Join(RaftStreamPrefix, string(streamTypeMessage)): t = streamTypeMessage default: - log.Printf("rafthttp: ignored unexpected streaming request path %s", r.URL.Path) + plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path) http.Error(w, "invalid path", http.StatusNotFound) return } @@ -171,25 +170,30 @@ func (h *streamHandler) ServeHTTP(w 
http.ResponseWriter, r *http.Request) { fromStr := path.Base(r.URL.Path) from, err := types.IDFromString(fromStr) if err != nil { - log.Printf("rafthttp: failed to parse from %s into ID", fromStr) + plog.Errorf("failed to parse from %s into ID (%v)", fromStr, err) http.Error(w, "invalid from", http.StatusNotFound) return } if h.r.IsIDRemoved(uint64(from)) { - log.Printf("rafthttp: reject the stream from peer %s since it was removed", from) + plog.Warningf("rejected the stream from peer %s since it was removed", from) http.Error(w, "removed member", http.StatusGone) return } p := h.peerGetter.Get(from) if p == nil { - log.Printf("rafthttp: fail to find sender %s", from) + // This may happen in following cases: + // 1. user starts a remote peer that belongs to a different cluster + // with the same cluster ID. + // 2. local etcd falls behind of the cluster, and cannot recognize + // the members that joined after its current progress. + plog.Errorf("failed to find member %s in cluster %s", from, wcid) http.Error(w, "error sender not found", http.StatusNotFound) return } wto := h.id.String() if gto := r.Header.Get("X-Raft-To"); gto != wto { - log.Printf("rafthttp: streaming request ignored due to ID mismatch got %s want %s", gto, wto) + plog.Errorf("streaming request ignored (ID mismatch got %s want %s)", gto, wto) http.Error(w, "to field mismatch", http.StatusPreconditionFailed) return } diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 799469fe7..735ced42e 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -15,7 +15,6 @@ package rafthttp import ( - "log" "net/http" "time" @@ -135,7 +134,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r select { case mm := <-p.propc: if err := r.Process(ctx, mm); err != nil { - log.Printf("peer: process raft message error: %v", err) + plog.Warningf("failed to process raft message (%v)", err) } case <-p.stopc: return @@ -161,11 +160,12 @@ func startPeer(tr http.RoundTripper, urls 
types.URLs, local, to, cid types.ID, r if isMsgSnap(m) { p.r.ReportSnapshot(m.To, raft.SnapshotFailure) } - log.Printf("peer: dropping %s to %s since %s's sending buffer is full", m.Type, p.id, name) + // TODO: log start and end of message dropping + plog.Warningf("dropping %s to %s since %s's sending buffer is full", m.Type, p.id, name) } case mm := <-p.recvc: if err := r.Process(context.TODO(), mm); err != nil { - log.Printf("peer: process raft message error: %v", err) + plog.Warningf("failed to process raft message (%v)", err) } case urls := <-p.newURLsC: picker.update(urls) @@ -213,7 +213,7 @@ func (p *peer) attachOutgoingConn(conn *outgoingConn) { case streamTypeMessage: ok = p.writer.attach(conn) default: - log.Panicf("rafthttp: unhandled stream type %s", conn.t) + plog.Panicf("unhandled stream type %s", conn.t) } if !ok { conn.Close() diff --git a/rafthttp/pipeline.go b/rafthttp/pipeline.go index a4a9123b0..0315e0a9c 100644 --- a/rafthttp/pipeline.go +++ b/rafthttp/pipeline.go @@ -18,7 +18,6 @@ import ( "bytes" "fmt" "io/ioutil" - "log" "net/http" "strings" "sync" @@ -98,11 +97,11 @@ func (p *pipeline) handle() { reportSentFailure(pipelineMsg, m) if p.errored == nil || p.errored.Error() != err.Error() { - log.Printf("pipeline: error posting to %s: %v", p.to, err) + plog.Errorf("failed to post to %s (%v)", p.to, err) p.errored = err } if p.active { - log.Printf("pipeline: the connection with %s became inactive", p.to) + plog.Infof("the connection with %s became inactive", p.to) p.active = false } if m.Type == raftpb.MsgApp && p.fs != nil { @@ -114,7 +113,7 @@ func (p *pipeline) handle() { } } else { if !p.active { - log.Printf("pipeline: the connection with %s became active", p.to) + plog.Infof("the connection with %s became active", p.to) p.active = true p.errored = nil } @@ -162,10 +161,10 @@ func (p *pipeline) post(data []byte) error { case http.StatusPreconditionFailed: switch strings.TrimSuffix(string(b), "\n") { case errIncompatibleVersion.Error(): - 
log.Printf("rafthttp: request sent was ignored by peer %s (server version incompatible)", p.to) + plog.Errorf("request sent was ignored by peer %s (server version incompatible)", p.to) return errIncompatibleVersion case errClusterIDMismatch.Error(): - log.Printf("rafthttp: request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)", + plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)", p.to, resp.Header.Get("X-Etcd-Cluster-ID"), p.cid) return errClusterIDMismatch default: diff --git a/rafthttp/remote.go b/rafthttp/remote.go index d085a07bf..8438cb765 100644 --- a/rafthttp/remote.go +++ b/rafthttp/remote.go @@ -15,7 +15,6 @@ package rafthttp import ( - "log" "net/http" "github.com/coreos/etcd/pkg/types" @@ -39,7 +38,8 @@ func (g *remote) Send(m raftpb.Message) { select { case g.pipeline.msgc <- m: default: - log.Printf("remote: dropping %s to %s since sending buffer is full", m.Type, g.id) + // TODO: log start and end of message dropping + plog.Warningf("dropping %s to %s since sending buffer is full", m.Type, g.id) } } diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 5db0d5ec5..f3a96fa47 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -18,7 +18,6 @@ import ( "fmt" "io" "io/ioutil" - "log" "net" "net/http" "path" @@ -63,7 +62,7 @@ func (t streamType) endpoint() string { case streamTypeMessage: return path.Join(RaftStreamPrefix, "message") default: - log.Panicf("rafthttp: unhandled stream type %v", t) + plog.Panicf("unhandled stream type %v", t) return "" } } @@ -134,7 +133,7 @@ func (cw *streamWriter) run() { if err := enc.encode(linkHeartbeatMessage); err != nil { reportSentFailure(string(t), linkHeartbeatMessage) - log.Printf("rafthttp: failed to heartbeat on stream %s (%v)", t, err) + plog.Errorf("failed to heartbeat on stream %s (%v)", t, err) cw.close() heartbeatc, msgc = nil, nil continue @@ -156,7 +155,7 @@ func (cw *streamWriter) run() { if err := enc.encode(m); err != nil { 
reportSentFailure(string(t), m) - log.Printf("rafthttp: failed to send message on stream %s (%v)", t, err) + plog.Errorf("failed to send message on stream %s (%v)", t, err) cw.close() heartbeatc, msgc = nil, nil cw.r.ReportUnreachable(m.To) @@ -172,7 +171,7 @@ func (cw *streamWriter) run() { var err error msgAppTerm, err = strconv.ParseUint(conn.termStr, 10, 64) if err != nil { - log.Panicf("rafthttp: could not parse term %s to uint (%v)", conn.termStr, err) + plog.Panicf("could not parse term %s to uint (%v)", conn.termStr, err) } enc = &msgAppEncoder{w: conn.Writer, fs: cw.fs} case streamTypeMsgAppV2: @@ -180,7 +179,7 @@ func (cw *streamWriter) run() { case streamTypeMessage: enc = &messageEncoder{w: conn.Writer} default: - log.Panicf("rafthttp: unhandled stream type %s", conn.t) + plog.Panicf("unhandled stream type %s", conn.t) } flusher = conn.Flusher cw.mu.Lock() @@ -280,7 +279,9 @@ func (cr *streamReader) run() { } if err != nil { if err != errUnsupportedStreamType { - log.Printf("rafthttp: failed to dial stream %s (%v)", t, err) + // TODO: log start and end of the stream, and print + // error in backoff way + plog.Errorf("failed to dial stream %s (%v)", t, err) } } else { err := cr.decodeLoop(rc, t) @@ -293,7 +294,7 @@ func (cr *streamReader) run() { // heartbeat on the idle stream, so it is expected to time out. 
case t == streamTypeMsgApp && isNetworkTimeoutError(err): default: - log.Printf("rafthttp: failed to read message on stream %s (%v)", t, err) + plog.Errorf("failed to read message on stream %s (%v)", t, err) } } select { @@ -318,7 +319,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { case streamTypeMessage: dec = &messageDecoder{r: rc} default: - log.Panicf("rafthttp: unhandled stream type %s", t) + plog.Panicf("unhandled stream type %s", t) } cr.closer = rc cr.mu.Unlock() @@ -341,7 +342,8 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { select { case recvc <- m: default: - log.Printf("rafthttp: dropping %s from %x because receiving buffer is full", + // TODO: log start and end of message dropping + plog.Warningf("dropping %s from %x because receiving buffer is full", m.Type, m.From) } } @@ -442,10 +444,10 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { switch strings.TrimSuffix(string(b), "\n") { case errIncompatibleVersion.Error(): - log.Printf("rafthttp: request sent was ignored by peer %s (server version incompatible)", cr.to) + plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.to) return nil, errIncompatibleVersion case errClusterIDMismatch.Error(): - log.Printf("rafthttp: request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)", + plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)", cr.to, resp.Header.Get("X-Etcd-Cluster-ID"), cr.cid) return nil, errClusterIDMismatch default: diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 8383c29e3..ea9b055bd 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -15,10 +15,10 @@ package rafthttp import ( - "log" "net/http" "sync" + "github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/etcdserver/stats" 
"github.com/coreos/etcd/pkg/types" @@ -26,6 +26,8 @@ import ( "github.com/coreos/etcd/raft/raftpb" ) +var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "rafthttp") + type Raft interface { Process(ctx context.Context, m raftpb.Message) error IsIDRemoved(id uint64) bool @@ -150,7 +152,7 @@ func (t *transport) Send(msgs []raftpb.Message) { continue } - log.Printf("rafthttp: ignored message %s (sent to unknown receiver %s)", m.Type, to) + plog.Debugf("ignored message %s (sent to unknown peer %s)", m.Type, to) } } @@ -174,7 +176,7 @@ func (t *transport) AddRemote(id types.ID, us []string) { } urls, err := types.NewURLs(us) if err != nil { - log.Panicf("newURLs %+v should never fail: %+v", us, err) + plog.Panicf("newURLs %+v should never fail: %+v", us, err) } t.remotes[id] = startRemote(t.roundTripper, urls, t.id, id, t.clusterID, t.raft, t.errorc) } @@ -187,7 +189,7 @@ func (t *transport) AddPeer(id types.ID, us []string) { } urls, err := types.NewURLs(us) if err != nil { - log.Panicf("newURLs %+v should never fail: %+v", us, err) + plog.Panicf("newURLs %+v should never fail: %+v", us, err) } fs := t.leaderStats.Follower(id.String()) t.peers[id] = startPeer(t.roundTripper, urls, t.id, id, t.clusterID, t.raft, fs, t.errorc) @@ -212,7 +214,7 @@ func (t *transport) removePeer(id types.ID) { if peer, ok := t.peers[id]; ok { peer.Stop() } else { - log.Panicf("rafthttp: unexpected removal of unknown peer '%d'", id) + plog.Panicf("unexpected removal of unknown peer '%d'", id) } delete(t.peers, id) delete(t.leaderStats.Followers, id.String()) @@ -227,7 +229,7 @@ func (t *transport) UpdatePeer(id types.ID, us []string) { } urls, err := types.NewURLs(us) if err != nil { - log.Panicf("newURLs %+v should never fail: %+v", us, err) + plog.Panicf("newURLs %+v should never fail: %+v", us, err) } t.peers[id].Update(urls) }