Merge pull request #2911 from yichengq/rafthttp-plog

rafthttp: use leveled logger
This commit is contained in:
Yicheng Qin 2015-06-09 16:16:33 -07:00
commit 1403783326
6 changed files with 51 additions and 44 deletions

View File

@ -17,7 +17,6 @@ package rafthttp
import (
"errors"
"io/ioutil"
"log"
"net/http"
"path"
@ -77,7 +76,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
if err := checkVersionCompability(r.Header.Get("X-Server-From"), serverVersion(r.Header), minClusterVersion(r.Header)); err != nil {
log.Printf("rafthttp: request received was ignored (%v)", err)
plog.Errorf("request received was ignored (%v)", err)
http.Error(w, errIncompatibleVersion.Error(), http.StatusPreconditionFailed)
return
}
@ -87,7 +86,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
gcid := r.Header.Get("X-Etcd-Cluster-ID")
if gcid != wcid {
log.Printf("rafthttp: request ignored due to cluster ID mismatch got %s want %s", gcid, wcid)
plog.Errorf("request received was ignored (cluster ID mismatch got %s want %s)", gcid, wcid)
http.Error(w, errClusterIDMismatch.Error(), http.StatusPreconditionFailed)
return
}
@ -97,13 +96,13 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
limitedr := pioutil.NewLimitedBufferReader(r.Body, ConnReadLimitByte)
b, err := ioutil.ReadAll(limitedr)
if err != nil {
log.Println("rafthttp: error reading raft message:", err)
plog.Errorf("failed to read raft message (%v)", err)
http.Error(w, "error reading raft message", http.StatusBadRequest)
return
}
var m raftpb.Message
if err := m.Unmarshal(b); err != nil {
log.Println("rafthttp: error unmarshaling raft message:", err)
plog.Errorf("failed to unmarshal raft message (%v)", err)
http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
return
}
@ -112,7 +111,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case writerToResponse:
v.WriteTo(w)
default:
log.Printf("rafthttp: error processing raft message: %v", err)
plog.Warningf("failed to process raft message (%v)", err)
http.Error(w, "error processing raft message", http.StatusInternalServerError)
}
return
@ -139,7 +138,7 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Server-Version", version.Version)
if err := checkVersionCompability(r.Header.Get("X-Server-From"), serverVersion(r.Header), minClusterVersion(r.Header)); err != nil {
log.Printf("rafthttp: request received was ignored (%v)", err)
plog.Errorf("request received was ignored (%v)", err)
http.Error(w, errIncompatibleVersion.Error(), http.StatusPreconditionFailed)
return
}
@ -148,7 +147,7 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-Etcd-Cluster-ID", wcid)
if gcid := r.Header.Get("X-Etcd-Cluster-ID"); gcid != wcid {
log.Printf("rafthttp: streaming request ignored due to cluster ID mismatch got %s want %s", gcid, wcid)
plog.Errorf("streaming request ignored (cluster ID mismatch got %s want %s)", gcid, wcid)
http.Error(w, errClusterIDMismatch.Error(), http.StatusPreconditionFailed)
return
}
@ -163,7 +162,7 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case path.Join(RaftStreamPrefix, string(streamTypeMessage)):
t = streamTypeMessage
default:
log.Printf("rafthttp: ignored unexpected streaming request path %s", r.URL.Path)
plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path)
http.Error(w, "invalid path", http.StatusNotFound)
return
}
@ -171,25 +170,30 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
fromStr := path.Base(r.URL.Path)
from, err := types.IDFromString(fromStr)
if err != nil {
log.Printf("rafthttp: failed to parse from %s into ID", fromStr)
plog.Errorf("failed to parse from %s into ID (%v)", fromStr, err)
http.Error(w, "invalid from", http.StatusNotFound)
return
}
if h.r.IsIDRemoved(uint64(from)) {
log.Printf("rafthttp: reject the stream from peer %s since it was removed", from)
plog.Warningf("rejected the stream from peer %s since it was removed", from)
http.Error(w, "removed member", http.StatusGone)
return
}
p := h.peerGetter.Get(from)
if p == nil {
log.Printf("rafthttp: fail to find sender %s", from)
// This may happen in following cases:
// 1. user starts a remote peer that belongs to a different cluster
// with the same cluster ID.
// 2. local etcd falls behind of the cluster, and cannot recognize
// the members that joined after its current progress.
plog.Errorf("failed to find member %s in cluster %s", from, wcid)
http.Error(w, "error sender not found", http.StatusNotFound)
return
}
wto := h.id.String()
if gto := r.Header.Get("X-Raft-To"); gto != wto {
log.Printf("rafthttp: streaming request ignored due to ID mismatch got %s want %s", gto, wto)
plog.Errorf("streaming request ignored (ID mismatch got %s want %s)", gto, wto)
http.Error(w, "to field mismatch", http.StatusPreconditionFailed)
return
}

View File

@ -15,7 +15,6 @@
package rafthttp
import (
"log"
"net/http"
"time"
@ -135,7 +134,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
select {
case mm := <-p.propc:
if err := r.Process(ctx, mm); err != nil {
log.Printf("peer: process raft message error: %v", err)
plog.Warningf("failed to process raft message (%v)", err)
}
case <-p.stopc:
return
@ -161,11 +160,12 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
if isMsgSnap(m) {
p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
}
log.Printf("peer: dropping %s to %s since %s's sending buffer is full", m.Type, p.id, name)
// TODO: log start and end of message dropping
plog.Warningf("dropping %s to %s since %s's sending buffer is full", m.Type, p.id, name)
}
case mm := <-p.recvc:
if err := r.Process(context.TODO(), mm); err != nil {
log.Printf("peer: process raft message error: %v", err)
plog.Warningf("failed to process raft message (%v)", err)
}
case urls := <-p.newURLsC:
picker.update(urls)
@ -213,7 +213,7 @@ func (p *peer) attachOutgoingConn(conn *outgoingConn) {
case streamTypeMessage:
ok = p.writer.attach(conn)
default:
log.Panicf("rafthttp: unhandled stream type %s", conn.t)
plog.Panicf("unhandled stream type %s", conn.t)
}
if !ok {
conn.Close()

View File

@ -19,7 +19,6 @@ import (
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"strings"
"sync"
@ -111,11 +110,11 @@ func (p *pipeline) handle() {
reportSentFailure(pipelineMsg, m)
if p.errored == nil || p.errored.Error() != err.Error() {
log.Printf("pipeline: error posting to %s: %v", p.to, err)
plog.Errorf("failed to post to %s (%v)", p.to, err)
p.errored = err
}
if p.active {
log.Printf("pipeline: the connection with %s became inactive", p.to)
plog.Infof("the connection with %s became inactive", p.to)
p.active = false
}
if m.Type == raftpb.MsgApp && p.fs != nil {
@ -127,7 +126,7 @@ func (p *pipeline) handle() {
}
} else {
if !p.active {
log.Printf("pipeline: the connection with %s became active", p.to)
plog.Infof("the connection with %s became active", p.to)
p.active = true
p.errored = nil
}
@ -196,10 +195,10 @@ func (p *pipeline) post(data []byte) (err error) {
case http.StatusPreconditionFailed:
switch strings.TrimSuffix(string(b), "\n") {
case errIncompatibleVersion.Error():
log.Printf("rafthttp: request sent was ignored by peer %s (server version incompatible)", p.to)
plog.Errorf("request sent was ignored by peer %s (server version incompatible)", p.to)
return errIncompatibleVersion
case errClusterIDMismatch.Error():
log.Printf("rafthttp: request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
p.to, resp.Header.Get("X-Etcd-Cluster-ID"), p.cid)
return errClusterIDMismatch
default:

View File

@ -15,7 +15,6 @@
package rafthttp
import (
"log"
"net/http"
"github.com/coreos/etcd/pkg/types"
@ -39,7 +38,8 @@ func (g *remote) Send(m raftpb.Message) {
select {
case g.pipeline.msgc <- m:
default:
log.Printf("remote: dropping %s to %s since sending buffer is full", m.Type, g.id)
// TODO: log start and end of message dropping
plog.Warningf("dropping %s to %s since sending buffer is full", m.Type, g.id)
}
}

View File

@ -18,7 +18,6 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"path"
@ -63,7 +62,7 @@ func (t streamType) endpoint() string {
case streamTypeMessage:
return path.Join(RaftStreamPrefix, "message")
default:
log.Panicf("rafthttp: unhandled stream type %v", t)
plog.Panicf("unhandled stream type %v", t)
return ""
}
}
@ -134,7 +133,7 @@ func (cw *streamWriter) run() {
if err := enc.encode(linkHeartbeatMessage); err != nil {
reportSentFailure(string(t), linkHeartbeatMessage)
log.Printf("rafthttp: failed to heartbeat on stream %s (%v)", t, err)
plog.Errorf("failed to heartbeat on stream %s (%v)", t, err)
cw.close()
heartbeatc, msgc = nil, nil
continue
@ -156,7 +155,7 @@ func (cw *streamWriter) run() {
if err := enc.encode(m); err != nil {
reportSentFailure(string(t), m)
log.Printf("rafthttp: failed to send message on stream %s (%v)", t, err)
plog.Errorf("failed to send message on stream %s (%v)", t, err)
cw.close()
heartbeatc, msgc = nil, nil
cw.r.ReportUnreachable(m.To)
@ -172,7 +171,7 @@ func (cw *streamWriter) run() {
var err error
msgAppTerm, err = strconv.ParseUint(conn.termStr, 10, 64)
if err != nil {
log.Panicf("rafthttp: could not parse term %s to uint (%v)", conn.termStr, err)
plog.Panicf("could not parse term %s to uint (%v)", conn.termStr, err)
}
enc = &msgAppEncoder{w: conn.Writer, fs: cw.fs}
case streamTypeMsgAppV2:
@ -180,7 +179,7 @@ func (cw *streamWriter) run() {
case streamTypeMessage:
enc = &messageEncoder{w: conn.Writer}
default:
log.Panicf("rafthttp: unhandled stream type %s", conn.t)
plog.Panicf("unhandled stream type %s", conn.t)
}
flusher = conn.Flusher
cw.mu.Lock()
@ -280,7 +279,9 @@ func (cr *streamReader) run() {
}
if err != nil {
if err != errUnsupportedStreamType {
log.Printf("rafthttp: failed to dial stream %s (%v)", t, err)
// TODO: log start and end of the stream, and print
// error in backoff way
plog.Errorf("failed to dial stream %s (%v)", t, err)
}
} else {
err := cr.decodeLoop(rc, t)
@ -293,7 +294,7 @@ func (cr *streamReader) run() {
// heartbeat on the idle stream, so it is expected to time out.
case t == streamTypeMsgApp && isNetworkTimeoutError(err):
default:
log.Printf("rafthttp: failed to read message on stream %s (%v)", t, err)
plog.Errorf("failed to read message on stream %s (%v)", t, err)
}
}
select {
@ -318,7 +319,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
case streamTypeMessage:
dec = &messageDecoder{r: rc}
default:
log.Panicf("rafthttp: unhandled stream type %s", t)
plog.Panicf("unhandled stream type %s", t)
}
cr.closer = rc
cr.mu.Unlock()
@ -341,7 +342,8 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
select {
case recvc <- m:
default:
log.Printf("rafthttp: dropping %s from %x because receiving buffer is full",
// TODO: log start and end of message dropping
plog.Warningf("dropping %s from %x because receiving buffer is full",
m.Type, m.From)
}
}
@ -442,10 +444,10 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
switch strings.TrimSuffix(string(b), "\n") {
case errIncompatibleVersion.Error():
log.Printf("rafthttp: request sent was ignored by peer %s (server version incompatible)", cr.to)
plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.to)
return nil, errIncompatibleVersion
case errClusterIDMismatch.Error():
log.Printf("rafthttp: request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
cr.to, resp.Header.Get("X-Etcd-Cluster-ID"), cr.cid)
return nil, errClusterIDMismatch
default:

View File

@ -15,10 +15,10 @@
package rafthttp
import (
"log"
"net/http"
"sync"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/types"
@ -26,6 +26,8 @@ import (
"github.com/coreos/etcd/raft/raftpb"
)
var plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "snap")
type Raft interface {
Process(ctx context.Context, m raftpb.Message) error
IsIDRemoved(id uint64) bool
@ -150,7 +152,7 @@ func (t *transport) Send(msgs []raftpb.Message) {
continue
}
log.Printf("rafthttp: ignored message %s (sent to unknown receiver %s)", m.Type, to)
plog.Debugf("ignored message %s (sent to unknown peer %s)", m.Type, to)
}
}
@ -174,7 +176,7 @@ func (t *transport) AddRemote(id types.ID, us []string) {
}
urls, err := types.NewURLs(us)
if err != nil {
log.Panicf("newURLs %+v should never fail: %+v", us, err)
plog.Panicf("newURLs %+v should never fail: %+v", us, err)
}
t.remotes[id] = startRemote(t.roundTripper, urls, t.id, id, t.clusterID, t.raft, t.errorc)
}
@ -187,7 +189,7 @@ func (t *transport) AddPeer(id types.ID, us []string) {
}
urls, err := types.NewURLs(us)
if err != nil {
log.Panicf("newURLs %+v should never fail: %+v", us, err)
plog.Panicf("newURLs %+v should never fail: %+v", us, err)
}
fs := t.leaderStats.Follower(id.String())
t.peers[id] = startPeer(t.roundTripper, urls, t.id, id, t.clusterID, t.raft, fs, t.errorc)
@ -212,7 +214,7 @@ func (t *transport) removePeer(id types.ID) {
if peer, ok := t.peers[id]; ok {
peer.Stop()
} else {
log.Panicf("rafthttp: unexpected removal of unknown peer '%d'", id)
plog.Panicf("unexpected removal of unknown peer '%d'", id)
}
delete(t.peers, id)
delete(t.leaderStats.Followers, id.String())
@ -227,7 +229,7 @@ func (t *transport) UpdatePeer(id types.ID, us []string) {
}
urls, err := types.NewURLs(us)
if err != nil {
log.Panicf("newURLs %+v should never fail: %+v", us, err)
plog.Panicf("newURLs %+v should never fail: %+v", us, err)
}
t.peers[id].Update(urls)
}