From 97579e2e1d1b34408f695b3490333d072d24d893 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 7 Mar 2015 21:00:13 -0800 Subject: [PATCH] raft: introduce logger interface --- raft/log.go | 20 +++---- raft/log_unstable.go | 14 ++--- raft/logger.go | 126 ++++++++++++++++++++++++++++++++++++++++ raft/node.go | 7 +-- raft/raft.go | 71 +++++++++++----------- raft/raft_paper_test.go | 22 ++++--- raft/rafttest/node.go | 2 +- raft/status.go | 3 +- raft/storage.go | 9 ++- 9 files changed, 195 insertions(+), 79 deletions(-) create mode 100644 raft/logger.go diff --git a/raft/log.go b/raft/log.go index 14a081400..bd7edea5b 100644 --- a/raft/log.go +++ b/raft/log.go @@ -77,7 +77,7 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry switch { case ci == 0: case ci <= l.committed: - log.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) + raftLogger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) default: offset := index + 1 l.append(ents[ci-offset:]...) @@ -93,7 +93,7 @@ func (l *raftLog) append(ents ...pb.Entry) uint64 { return l.lastIndex() } if after := ents[0].Index - 1; after < l.committed { - log.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed) + raftLogger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed) } l.unstable.truncateAndAppend(ents) return l.lastIndex() @@ -114,7 +114,7 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 { for _, ne := range ents { if !l.matchTerm(ne.Index, ne.Term) { if ne.Index <= l.lastIndex() { - log.Printf("raftlog: found conflict at index %d [existing term: %d, conflicting term: %d]", + raftLogger.Infof("raftlog: found conflict at index %d [existing term: %d, conflicting term: %d]", ne.Index, l.term(ne.Index), ne.Term) } return ne.Index @@ -174,7 +174,7 @@ func (l *raftLog) commitTo(tocommit uint64) { // never decrease commit if l.committed < tocommit { if l.lastIndex() < tocommit { - log.Panicf("tocommit(%d) is out of range [lastIndex(%d)]", tocommit, l.lastIndex()) + raftLogger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]", tocommit, l.lastIndex()) } l.committed = tocommit } @@ -185,7 +185,7 @@ func (l *raftLog) appliedTo(i uint64) { return } if l.committed < i || i < l.applied { - log.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed) + raftLogger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed) } l.applied = i } @@ -248,7 +248,7 @@ func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { } func (l *raftLog) restore(s pb.Snapshot) { - log.Printf("raftlog: log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term) + raftLogger.Infof("raftlog: log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term) l.committed = s.Metadata.Index l.unstable.restore(s) } @@ -264,9 +264,9 @@ func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry { storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset)) if err == ErrCompacted { // This should never fail because it has been checked before. - log.Panicf("entries[%d:%d) from storage is out of bound", lo, min(hi, l.unstable.offset)) + raftLogger.Panicf("entries[%d:%d) from storage is out of bound", lo, min(hi, l.unstable.offset)) } else if err == ErrUnavailable { - log.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset)) + raftLogger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset)) } else if err != nil { panic(err) // TODO(bdarnell) } @@ -287,10 +287,10 @@ func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry { // l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries) func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) { if lo > hi { - log.Panicf("raft: invalid slice %d > %d", lo, hi) + raftLogger.Panicf("raft: invalid slice %d > %d", lo, hi) } length := l.lastIndex() - l.firstIndex() + 1 if lo < l.firstIndex() || hi > l.firstIndex()+length { - log.Panicf("raft: slice[%d,%d) out of bound [%d,%d]", lo, hi, l.firstIndex(), l.lastIndex()) + raftLogger.Panicf("raft: slice[%d,%d) out of bound [%d,%d]", lo, hi, l.firstIndex(), l.lastIndex()) } } diff --git a/raft/log_unstable.go b/raft/log_unstable.go index b2aab5975..4b6e42b30 100644 --- a/raft/log_unstable.go +++ b/raft/log_unstable.go @@ -14,11 +14,7 @@ package raft -import ( - "log" - - pb "github.com/coreos/etcd/raft/raftpb" -) +import pb "github.com/coreos/etcd/raft/raftpb" // unstable.entris[i] has raft log position i+unstable.offset. // Note that unstable.offset may be less than the highest log @@ -110,7 +106,7 @@ func (u *unstable) truncateAndAppend(ents []pb.Entry) { // directly append u.entries = append(u.entries, ents...) case after < u.offset: - log.Printf("raftlog: replace the unstable entries from index %d", after+1) + raftLogger.Infof("raftlog: replace the unstable entries from index %d", after+1) // The log is being truncated to before our current offset // portion, so set the offset and replace the entries u.offset = after + 1 @@ -118,7 +114,7 @@ func (u *unstable) truncateAndAppend(ents []pb.Entry) { default: // truncate to after and copy to u.entries // then append - log.Printf("raftlog: truncate the unstable entries to index %d", after) + raftLogger.Infof("raftlog: truncate the unstable entries to index %d", after) u.entries = append([]pb.Entry{}, u.slice(u.offset, after+1)...) u.entries = append(u.entries, ents...) } @@ -132,10 +128,10 @@ func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry { // u.offset <= lo <= hi <= u.offset+len(u.offset) func (u *unstable) mustCheckOutOfBounds(lo, hi uint64) { if lo > hi { - log.Panicf("raft: invalid unstable.slice %d > %d", lo, hi) + raftLogger.Panicf("raft: invalid unstable.slice %d > %d", lo, hi) } upper := u.offset + uint64(len(u.entries)) if lo < u.offset || hi > upper { - log.Panicf("raft: unstable.slice[%d,%d) out of bound [%d,%d]", lo, hi, u.offset, upper) + raftLogger.Panicf("raft: unstable.slice[%d,%d) out of bound [%d,%d]", lo, hi, u.offset, upper) } } diff --git a/raft/logger.go b/raft/logger.go new file mode 100644 index 000000000..82d52e524 --- /dev/null +++ b/raft/logger.go @@ -0,0 +1,126 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package raft + +import ( + "fmt" + "io/ioutil" + "log" + "os" +) + +type Logger interface { + Debug(v ...interface{}) + Debugf(format string, v ...interface{}) + + Error(v ...interface{}) + Errorf(format string, v ...interface{}) + + Info(v ...interface{}) + Infof(format string, v ...interface{}) + + Warning(v ...interface{}) + Warningf(format string, v ...interface{}) + + Fatal(v ...interface{}) + Fatalf(format string, v ...interface{}) + + Panic(v ...interface{}) + Panicf(format string, v ...interface{}) +} + +func SetLogger(l Logger) { raftLogger = l } + +var ( + defaultLogger = &DefaultLogger{Logger: log.New(os.Stderr, "", 0)} + discardLogger = &DefaultLogger{Logger: log.New(ioutil.Discard, "", 0)} + raftLogger = Logger(defaultLogger) +) + +const ( + calldepth = 2 +) + +// DefaultLogger is a defualt implementation of the Logger interface. +type DefaultLogger struct { + *log.Logger + debug bool +} + +func (l *DefaultLogger) EnableTimestamps() { + l.SetFlags(l.Flags() | log.Ldate | log.Ltime) +} + +func (l *DefaultLogger) EnableDebug() { + l.debug = true +} + +func (l *DefaultLogger) Debug(v ...interface{}) { + if l.debug { + l.Output(calldepth, header("DEBUG", fmt.Sprint(v...))) + } +} + +func (l *DefaultLogger) Debugf(format string, v ...interface{}) { + if l.debug { + l.Output(calldepth, header("DEBUG", fmt.Sprintf(format, v...))) + } +} + +func (l *DefaultLogger) Info(v ...interface{}) { + l.Output(calldepth, header("INFO", fmt.Sprint(v...))) +} + +func (l *DefaultLogger) Infof(format string, v ...interface{}) { + l.Output(calldepth, header("INFO", fmt.Sprintf(format, v...))) +} + +func (l *DefaultLogger) Error(v ...interface{}) { + l.Output(calldepth, header("ERROR", fmt.Sprint(v...))) +} + +func (l *DefaultLogger) Errorf(format string, v ...interface{}) { + l.Output(calldepth, header("ERROR", fmt.Sprintf(format, v...))) +} + +func (l *DefaultLogger) Warning(v ...interface{}) { + l.Output(calldepth, header("WARN", fmt.Sprint(v...))) +} + +func (l *DefaultLogger) Warningf(format string, v ...interface{}) { + l.Output(calldepth, header("WARN", fmt.Sprintf(format, v...))) +} + +func (l *DefaultLogger) Fatal(v ...interface{}) { + l.Output(calldepth, header("FATAL", fmt.Sprint(v...))) + os.Exit(1) +} + +func (l *DefaultLogger) Fatalf(format string, v ...interface{}) { + l.Output(calldepth, header("FATAL", fmt.Sprintf(format, v...))) + os.Exit(1) +} + +func (l *DefaultLogger) Panic(v ...interface{}) { + l.Logger.Panic(v) +} + +func (l *DefaultLogger) Panicf(format string, v ...interface{}) { + l.Logger.Panicf(format, v) +} + +func header(lvl, msg string) string { + return fmt.Sprintf("%s: %s", lvl, msg) +} diff --git a/raft/node.go b/raft/node.go index 00e25bdce..c9761577f 100644 --- a/raft/node.go +++ b/raft/node.go @@ -16,7 +16,6 @@ package raft import ( "errors" - "log" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" pb "github.com/coreos/etcd/raft/raftpb" @@ -263,13 +262,13 @@ func (n *node) run(r *raft) { if lead != r.lead { if r.hasLeader() { if lead == None { - log.Printf("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term) + raftLogger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term) } else { - log.Printf("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term) + raftLogger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term) } propc = n.propc } else { - log.Printf("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term) + raftLogger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term) propc = nil } lead = r.lead diff --git a/raft/raft.go b/raft/raft.go index c1f9aebce..279e8a16a 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -17,7 +17,6 @@ package raft import ( "errors" "fmt" - "log" "math/rand" "sort" "strings" @@ -229,7 +228,7 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n)) } - log.Printf("raft: newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]", + raftLogger.Infof("raft: newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]", r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm()) return r } @@ -286,10 +285,10 @@ func (r *raft) sendAppend(to uint64) { } m.Snapshot = snapshot sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term - log.Printf("raft: %x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", + raftLogger.Infof("raft: %x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr) pr.setPendingSnapshot(sindex) - log.Printf("raft: %x paused sending replication messages to %x [%s]", r.id, to, pr) + raftLogger.Infof("raft: %x paused sending replication messages to %x [%s]", r.id, to, pr) } else { m.Type = pb.MsgApp m.Index = pr.Next - 1 @@ -411,7 +410,7 @@ func (r *raft) becomeFollower(term uint64, lead uint64) { r.tick = r.tickElection r.lead = lead r.state = StateFollower - log.Printf("raft: %x became follower at term %d", r.id, r.Term) + raftLogger.Infof("raft: %x became follower at term %d", r.id, r.Term) } func (r *raft) becomeCandidate() { @@ -424,7 +423,7 @@ func (r *raft) becomeCandidate() { r.tick = r.tickElection r.Vote = r.id r.state = StateCandidate - log.Printf("raft: %x became candidate at term %d", r.id, r.Term) + raftLogger.Infof("raft: %x became candidate at term %d", r.id, r.Term) } func (r *raft) becomeLeader() { @@ -447,7 +446,7 @@ func (r *raft) becomeLeader() { r.pendingConf = true } r.appendEntry(pb.Entry{Data: nil}) - log.Printf("raft: %x became leader at term %d", r.id, r.Term) + raftLogger.Infof("raft: %x became leader at term %d", r.id, r.Term) } func (r *raft) campaign() { @@ -460,7 +459,7 @@ func (r *raft) campaign() { if i == r.id { continue } - log.Printf("raft: %x [logterm: %d, index: %d] sent vote request to %x at term %d", + raftLogger.Infof("raft: %x [logterm: %d, index: %d] sent vote request to %x at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), i, r.Term) r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()}) } @@ -468,9 +467,9 @@ func (r *raft) campaign() { func (r *raft) poll(id uint64, v bool) (granted int) { if v { - log.Printf("raft: %x received vote from %x at term %d", r.id, id, r.Term) + raftLogger.Infof("raft: %x received vote from %x at term %d", r.id, id, r.Term) } else { - log.Printf("raft: %x received vote rejection from %x at term %d", r.id, id, r.Term) + raftLogger.Infof("raft: %x received vote rejection from %x at term %d", r.id, id, r.Term) } if _, ok := r.votes[id]; !ok { r.votes[id] = v @@ -485,7 +484,7 @@ func (r *raft) poll(id uint64, v bool) (granted int) { func (r *raft) Step(m pb.Message) error { if m.Type == pb.MsgHup { - log.Printf("raft: %x is starting a new election at term %d", r.id, r.Term) + raftLogger.Infof("raft: %x is starting a new election at term %d", r.id, r.Term) r.campaign() r.Commit = r.raftLog.committed return nil @@ -499,12 +498,12 @@ func (r *raft) Step(m pb.Message) error { if m.Type == pb.MsgVote { lead = None } - log.Printf("raft: %x [term: %d] received a %s message with higher term from %x [term: %d]", + raftLogger.Infof("raft: %x [term: %d] received a %s message with higher term from %x [term: %d]", r.id, r.Term, m.Type, m.From, m.Term) r.becomeFollower(m.Term, lead) case m.Term < r.Term: // ignore - log.Printf("raft: %x [term: %d] ignored a %s message with lower term from %x [term: %d]", + raftLogger.Infof("raft: %x [term: %d] ignored a %s message with lower term from %x [term: %d]", r.id, r.Term, m.Type, m.From, m.Term) return nil } @@ -523,7 +522,7 @@ func stepLeader(r *raft, m pb.Message) { r.bcastHeartbeat() case pb.MsgProp: if len(m.Entries) == 0 { - log.Panicf("raft: %x stepped empty MsgProp", r.id) + raftLogger.Panicf("raft: %x stepped empty MsgProp", r.id) } for i, e := range m.Entries { if e.Type == pb.EntryConfChange { @@ -538,20 +537,20 @@ func stepLeader(r *raft, m pb.Message) { case pb.MsgAppResp: if pr.isUnreachable() { pr.reachable() - log.Printf("raft: %x received msgAppResp from %x and changed it to be reachable [%s]", r.id, m.From, pr) + raftLogger.Infof("raft: %x received msgAppResp from %x and changed it to be reachable [%s]", r.id, m.From, pr) } if m.Reject { - log.Printf("raft: %x received msgApp rejection(lastindex: %d) from %x for index %d", + raftLogger.Infof("raft: %x received msgApp rejection(lastindex: %d) from %x for index %d", r.id, m.RejectHint, m.From, m.Index) if pr.maybeDecrTo(m.Index, m.RejectHint) { - log.Printf("raft: %x decreased progress of %x to [%s]", r.id, m.From, pr) + raftLogger.Infof("raft: %x decreased progress of %x to [%s]", r.id, m.From, pr) r.sendAppend(m.From) } } else { oldWait := pr.shouldWait() pr.update(m.Index) if r.prs[m.From].maybeSnapshotAbort() { - log.Printf("raft: %x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr) + raftLogger.Infof("raft: %x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr) } if r.maybeCommit() { r.bcastAppend() @@ -564,13 +563,13 @@ func stepLeader(r *raft, m pb.Message) { case pb.MsgHeartbeatResp: if pr.isUnreachable() { pr.reachable() - log.Printf("raft: %x received msgHeartbeatResp from %x and changed it to be reachable [%s]", r.id, m.From, pr) + raftLogger.Infof("raft: %x received msgHeartbeatResp from %x and changed it to be reachable [%s]", r.id, m.From, pr) } if pr.Match < r.raftLog.lastIndex() { r.sendAppend(m.From) } case pb.MsgVote: - log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d", + raftLogger.Infof("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term) r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true}) case pb.MsgSnapStatus: @@ -579,10 +578,10 @@ func stepLeader(r *raft, m pb.Message) { } if m.Reject { pr.snapshotFail() - log.Printf("raft: %x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr) + raftLogger.Infof("raft: %x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr) } else { pr.snapshotFinish() - log.Printf("raft: %x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr) + raftLogger.Infof("raft: %x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr) // wait for the msgAppResp from the remote node before sending // out the next msgApp pr.waitSet(r.electionTimeout) @@ -590,7 +589,7 @@ func stepLeader(r *raft, m pb.Message) { case pb.MsgUnreachable: if !pr.isUnreachable() { pr.unreachable() - log.Printf("raft: %x failed to send message to %x and changed it to be unreachable [%s]", r.id, m.From, pr) + raftLogger.Infof("raft: %x failed to send message to %x and changed it to be unreachable [%s]", r.id, m.From, pr) } } } @@ -598,7 +597,7 @@ func stepLeader(r *raft, m pb.Message) { func stepCandidate(r *raft, m pb.Message) { switch m.Type { case pb.MsgProp: - log.Printf("raft: %x no leader at term %d; dropping proposal", r.id, r.Term) + raftLogger.Infof("raft: %x no leader at term %d; dropping proposal", r.id, r.Term) return case pb.MsgApp: r.becomeFollower(r.Term, m.From) @@ -610,12 +609,12 @@ func stepCandidate(r *raft, m pb.Message) { r.becomeFollower(m.Term, m.From) r.handleSnapshot(m) case pb.MsgVote: - log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x", + raftLogger.Infof("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term) r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true}) case pb.MsgVoteResp: gr := r.poll(m.From, !m.Reject) - log.Printf("raft: %x [q:%d] has received %d votes and %d vote rejections", r.id, r.q(), gr, len(r.votes)-gr) + raftLogger.Infof("raft: %x [q:%d] has received %d votes and %d vote rejections", r.id, r.q(), gr, len(r.votes)-gr) switch r.q() { case gr: r.becomeLeader() @@ -630,7 +629,7 @@ func stepFollower(r *raft, m pb.Message) { switch m.Type { case pb.MsgProp: if r.lead == None { - log.Printf("raft: %x no leader at term %d; dropping proposal", r.id, r.Term) + raftLogger.Infof("raft: %x no leader at term %d; dropping proposal", r.id, r.Term) return } m.To = r.lead @@ -649,12 +648,12 @@ func stepFollower(r *raft, m pb.Message) { case pb.MsgVote: if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) { r.elapsed = 0 - log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %d", + raftLogger.Infof("raft: %x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term) r.Vote = m.From r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp}) } else { - log.Printf("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d", + raftLogger.Infof("raft: %x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term) r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true}) } @@ -665,7 +664,7 @@ func (r *raft) handleAppendEntries(m pb.Message) { if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) } else { - log.Printf("raft: %x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x", + raftLogger.Infof("raft: %x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x", r.id, r.raftLog.term(m.Index), m.Index, m.LogTerm, m.Index, m.From) r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()}) } @@ -679,11 +678,11 @@ func (r *raft) handleHeartbeat(m pb.Message) { func (r *raft) handleSnapshot(m pb.Message) { sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term if r.restore(m.Snapshot) { - log.Printf("raft: %x [commit: %d] restored snapshot [index: %d, term: %d]", + raftLogger.Infof("raft: %x [commit: %d] restored snapshot [index: %d, term: %d]", r.id, r.Commit, sindex, sterm) r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}) } else { - log.Printf("raft: %x [commit: %d] ignored snapshot [index: %d, term: %d]", + raftLogger.Infof("raft: %x [commit: %d] ignored snapshot [index: %d, term: %d]", r.id, r.Commit, sindex, sterm) r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) } @@ -696,13 +695,13 @@ func (r *raft) restore(s pb.Snapshot) bool { return false } if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) { - log.Printf("raft: %x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]", + raftLogger.Infof("raft: %x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]", r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) r.raftLog.commitTo(s.Metadata.Index) return false } - log.Printf("raft: %x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]", + raftLogger.Infof("raft: %x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]", r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) r.raftLog.restore(s) @@ -715,7 +714,7 @@ func (r *raft) restore(s pb.Snapshot) bool { match = 0 } r.setProgress(n, match, next) - log.Printf("raft: %x restored progress of %x [%s]", r.id, n, r.prs[n]) + raftLogger.Infof("raft: %x restored progress of %x [%s]", r.id, n, r.prs[n]) } return true } @@ -759,7 +758,7 @@ func (r *raft) delProgress(id uint64) { func (r *raft) loadState(state pb.HardState) { if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() { - log.Panicf("raft: %x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex()) + raftLogger.Panicf("raft: %x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex()) } r.raftLog.committed = state.Commit r.Term = state.Term diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 9bb83c268..1a29b57ff 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -28,12 +28,10 @@ package raft import ( "fmt" - "io/ioutil" - "log" - "os" + "testing" + "reflect" "sort" - "testing" pb "github.com/coreos/etcd/raft/raftpb" ) @@ -294,13 +292,13 @@ func TestCandidateFallback(t *testing.T) { } func TestFollowerElectionTimeoutRandomized(t *testing.T) { - log.SetOutput(ioutil.Discard) - defer log.SetOutput(os.Stderr) + SetLogger(discardLogger) + defer SetLogger(defaultLogger) testNonleaderElectionTimeoutRandomized(t, StateFollower) } func TestCandidateElectionTimeoutRandomized(t *testing.T) { - log.SetOutput(ioutil.Discard) - defer log.SetOutput(os.Stderr) + SetLogger(discardLogger) + defer SetLogger(defaultLogger) testNonleaderElectionTimeoutRandomized(t, StateCandidate) } @@ -335,13 +333,13 @@ func testNonleaderElectionTimeoutRandomized(t *testing.T, state StateType) { } func TestFollowersElectioinTimeoutNonconflict(t *testing.T) { - log.SetOutput(ioutil.Discard) - defer log.SetOutput(os.Stderr) + SetLogger(discardLogger) + defer SetLogger(defaultLogger) testNonleadersElectionTimeoutNonconflict(t, StateFollower) } func TestCandidatesElectionTimeoutNonconflict(t *testing.T) { - log.SetOutput(ioutil.Discard) - defer log.SetOutput(os.Stderr) + SetLogger(discardLogger) + defer SetLogger(defaultLogger) testNonleadersElectionTimeoutNonconflict(t, StateCandidate) } diff --git a/raft/rafttest/node.go b/raft/rafttest/node.go index 6071b3df0..6e43aab15 100644 --- a/raft/rafttest/node.go +++ b/raft/rafttest/node.go @@ -59,7 +59,7 @@ func (n *node) start() { n.Step(context.TODO(), m) case <-n.stopc: n.Stop() - log.Printf("raft.%d: stop", n.id) + raftLogger.Infof("raft.%d: stop", n.id) n.Node = nil close(n.stopc) return diff --git a/raft/status.go b/raft/status.go index 0616b40f6..7f6b08f6d 100644 --- a/raft/status.go +++ b/raft/status.go @@ -16,7 +16,6 @@ package raft import ( "fmt" - "log" pb "github.com/coreos/etcd/raft/raftpb" ) @@ -70,7 +69,7 @@ func (s Status) MarshalJSON() ([]byte, error) { func (s Status) String() string { b, err := s.MarshalJSON() if err != nil { - log.Panicf("unexpected error: %v", err) + raftLogger.Panicf("unexpected error: %v", err) } return string(b) } diff --git a/raft/storage.go b/raft/storage.go index c37dee669..c4cfe7f20 100644 --- a/raft/storage.go +++ b/raft/storage.go @@ -16,7 +16,6 @@ package raft import ( "errors" - "log" "sync" pb "github.com/coreos/etcd/raft/raftpb" @@ -101,7 +100,7 @@ func (ms *MemoryStorage) Entries(lo, hi uint64) ([]pb.Entry, error) { return nil, ErrCompacted } if hi > ms.lastIndex()+1 { - log.Panicf("entries's hi(%d) is out of bound lastindex(%d)", hi, ms.lastIndex()) + raftLogger.Panicf("entries's hi(%d) is out of bound lastindex(%d)", hi, ms.lastIndex()) } // only contains dummy entries. if len(ms.ents) == 1 { @@ -175,7 +174,7 @@ func (ms *MemoryStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) offset := ms.ents[0].Index if i > ms.lastIndex() { - log.Panicf("snapshot %d is out of bound lastindex(%d)", i, ms.lastIndex()) + raftLogger.Panicf("snapshot %d is out of bound lastindex(%d)", i, ms.lastIndex()) } ms.snapshot.Metadata.Index = i @@ -196,7 +195,7 @@ func (ms *MemoryStorage) Compact(compactIndex uint64) error { return ErrCompacted } if compactIndex > ms.lastIndex() { - log.Panicf("compact %d is out of bound lastindex(%d)", compactIndex, ms.lastIndex()) + raftLogger.Panicf("compact %d is out of bound lastindex(%d)", compactIndex, ms.lastIndex()) } i := compactIndex - offset @@ -237,7 +236,7 @@ func (ms *MemoryStorage) Append(entries []pb.Entry) error { case uint64(len(ms.ents)) == offset: ms.ents = append(ms.ents, entries...) default: - log.Panicf("missing log entry [last: %d, append at: %d]", + raftLogger.Panicf("missing log entry [last: %d, append at: %d]", ms.lastIndex(), entries[0].Index) } return nil