diff --git a/raft/log.go b/raft/log.go index 280d03112..e171d36ea 100644 --- a/raft/log.go +++ b/raft/log.go @@ -36,16 +36,19 @@ type raftLog struct { // been instructed to apply to its state machine. // Invariant: applied <= committed applied uint64 + + logger Logger } // newLog returns log using the given storage. It recovers the log to the state // that it just commits and applies the latest snapshot. -func newLog(storage Storage) *raftLog { +func newLog(storage Storage, logger Logger) *raftLog { if storage == nil { log.Panic("storage must not be nil") } log := &raftLog{ storage: storage, + logger: logger, } firstIndex, err := storage.FirstIndex() if err != nil { @@ -56,6 +59,7 @@ func newLog(storage Storage) *raftLog { panic(err) // TODO(bdarnell) } log.unstable.offset = lastIndex + 1 + log.unstable.logger = logger // Initialize our committed and applied pointers to the time of the last compaction. log.committed = firstIndex - 1 log.applied = firstIndex - 1 @@ -76,7 +80,7 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry switch { case ci == 0: case ci <= l.committed: - raftLogger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) + l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) default: offset := index + 1 l.append(ents[ci-offset:]...) @@ -92,7 +96,7 @@ func (l *raftLog) append(ents ...pb.Entry) uint64 { return l.lastIndex() } if after := ents[0].Index - 1; after < l.committed { - raftLogger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed) + l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed) } l.unstable.truncateAndAppend(ents) return l.lastIndex() @@ -113,8 +117,8 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 { for _, ne := range ents { if !l.matchTerm(ne.Index, ne.Term) { if ne.Index <= l.lastIndex() { - raftLogger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]", - ne.Index, zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term) + l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]", + ne.Index, l.zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term) } return ne.Index } @@ -137,7 +141,7 @@ func (l *raftLog) nextEnts() (ents []pb.Entry) { if l.committed+1 > off { ents, err := l.slice(off, l.committed+1, noLimit) if err != nil { - raftLogger.Panicf("unexpected error when getting unapplied entries (%v)", err) + l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err) } return ents } @@ -177,7 +181,7 @@ func (l *raftLog) commitTo(tocommit uint64) { // never decrease commit if l.committed < tocommit { if l.lastIndex() < tocommit { - raftLogger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]", tocommit, l.lastIndex()) + l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]", tocommit, l.lastIndex()) } l.committed = tocommit } @@ -188,7 +192,7 @@ func (l *raftLog) appliedTo(i uint64) { return } if l.committed < i || i < l.applied { - raftLogger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed) + l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed) } l.applied = i } @@ -200,7 +204,7 @@ func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) } func (l *raftLog) lastTerm() uint64 { t, err := l.term(l.lastIndex()) if err != nil { - raftLogger.Panicf("unexpected error when getting the last term (%v)", err) + l.logger.Panicf("unexpected error when getting the last term (%v)", err) } return t } @@ -266,7 +270,7 @@ func (l *raftLog) matchTerm(i, term uint64) bool { } func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { - if maxIndex > l.committed && zeroTermOnErrCompacted(l.term(maxIndex)) == term { + if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term { l.commitTo(maxIndex) return true } @@ -274,7 +278,7 @@ func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { } func (l *raftLog) restore(s pb.Snapshot) { - raftLogger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term) + l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term) l.committed = s.Metadata.Index l.unstable.restore(s) } @@ -294,7 +298,7 @@ func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) { if err == ErrCompacted { return nil, err } else if err == ErrUnavailable { - raftLogger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset)) + l.logger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset)) } else if err != nil { panic(err) // TODO(bdarnell) } @@ -321,7 +325,7 @@ func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) { // l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries) func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error { if lo > hi { - raftLogger.Panicf("invalid slice %d > %d", lo, hi) + l.logger.Panicf("invalid slice %d > %d", lo, hi) } fi := l.firstIndex() if lo < fi { @@ -330,18 +334,18 @@ func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error { length := l.lastIndex() - fi + 1 if lo < fi || hi > fi+length { - raftLogger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex()) + l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex()) } return nil } -func zeroTermOnErrCompacted(t uint64, err error) uint64 { +func (l *raftLog) zeroTermOnErrCompacted(t uint64, err error) uint64 { if err == nil { return t } if err == ErrCompacted { return 0 } - raftLogger.Panicf("unexpected error (%v)", err) + l.logger.Panicf("unexpected error (%v)", err) return 0 } diff --git a/raft/log_test.go b/raft/log_test.go index bd9d928aa..1b0590075 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -46,7 +46,7 @@ func TestFindConflict(t *testing.T) { } for i, tt := range tests { - raftLog := newLog(NewMemoryStorage()) + raftLog := newLog(NewMemoryStorage(), raftLogger) raftLog.append(previousEnts...) gconflict := raftLog.findConflict(tt.ents) @@ -58,7 +58,7 @@ func TestFindConflict(t *testing.T) { func TestIsUpToDate(t *testing.T) { previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}} - raftLog := newLog(NewMemoryStorage()) + raftLog := newLog(NewMemoryStorage(), raftLogger) raftLog.append(previousEnts...) tests := []struct { lastIndex uint64 @@ -126,7 +126,7 @@ func TestAppend(t *testing.T) { for i, tt := range tests { storage := NewMemoryStorage() storage.Append(previousEnts) - raftLog := newLog(storage) + raftLog := newLog(storage, raftLogger) index := raftLog.append(tt.ents...) if index != tt.windex { @@ -237,7 +237,7 @@ func TestLogMaybeAppend(t *testing.T) { } for i, tt := range tests { - raftLog := newLog(NewMemoryStorage()) + raftLog := newLog(NewMemoryStorage(), raftLogger) raftLog.append(previousEnts...) raftLog.committed = commit func() { @@ -285,7 +285,7 @@ func TestCompactionSideEffects(t *testing.T) { for i = 1; i <= unstableIndex; i++ { storage.Append([]pb.Entry{{Term: uint64(i), Index: uint64(i)}}) } - raftLog := newLog(storage) + raftLog := newLog(storage, raftLogger) for i = unstableIndex; i < lastIndex; i++ { raftLog.append(pb.Entry{Term: uint64(i + 1), Index: uint64(i + 1)}) } @@ -359,7 +359,7 @@ func TestNextEnts(t *testing.T) { for i, tt := range tests { storage := NewMemoryStorage() storage.ApplySnapshot(snap) - raftLog := newLog(storage) + raftLog := newLog(storage, raftLogger) raftLog.append(ents...) raftLog.maybeCommit(5, 1) raftLog.appliedTo(tt.applied) @@ -389,7 +389,7 @@ func TestUnstableEnts(t *testing.T) { storage.Append(previousEnts[:tt.unstable-1]) // append unstable entries to raftlog - raftLog := newLog(storage) + raftLog := newLog(storage, raftLogger) raftLog.append(previousEnts[tt.unstable-1:]...) ents := raftLog.unstableEntries() @@ -427,7 +427,7 @@ func TestCommitTo(t *testing.T) { } } }() - raftLog := newLog(NewMemoryStorage()) + raftLog := newLog(NewMemoryStorage(), raftLogger) raftLog.append(previousEnts...) raftLog.committed = commit raftLog.commitTo(tt.commit) @@ -450,7 +450,7 @@ func TestStableTo(t *testing.T) { {3, 1, 1}, // bad index } for i, tt := range tests { - raftLog := newLog(NewMemoryStorage()) + raftLog := newLog(NewMemoryStorage(), raftLogger) raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...) raftLog.stableTo(tt.stablei, tt.stablet) if raftLog.unstable.offset != tt.wunstable { @@ -487,7 +487,7 @@ func TestStableToWithSnap(t *testing.T) { for i, tt := range tests { s := NewMemoryStorage() s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}}) - raftLog := newLog(s) + raftLog := newLog(s, raftLogger) raftLog.append(tt.newEnts...) raftLog.stableTo(tt.stablei, tt.stablet) if raftLog.unstable.offset != tt.wunstable { @@ -525,7 +525,7 @@ func TestCompaction(t *testing.T) { for i := uint64(1); i <= tt.lastIndex; i++ { storage.Append([]pb.Entry{{Index: i}}) } - raftLog := newLog(storage) + raftLog := newLog(storage, raftLogger) raftLog.maybeCommit(tt.lastIndex, 0) raftLog.appliedTo(raftLog.committed) @@ -551,7 +551,7 @@ func TestLogRestore(t *testing.T) { snap := pb.SnapshotMetadata{Index: index, Term: term} storage := NewMemoryStorage() storage.ApplySnapshot(pb.Snapshot{Metadata: snap}) - raftLog := newLog(storage) + raftLog := newLog(storage, raftLogger) if len(raftLog.allEntries()) != 0 { t.Errorf("len = %d, want 0", len(raftLog.allEntries())) @@ -563,7 +563,7 @@ func TestLogRestore(t *testing.T) { t.Errorf("committed = %d, want %d", raftLog.committed, index) } if raftLog.unstable.offset != index+1 { - t.Errorf("unstable = %d, want %d", raftLog.unstable, index+1) + t.Errorf("unstable = %d, want %d", raftLog.unstable.offset, index+1) } if mustTerm(raftLog.term(index)) != term { t.Errorf("term = %d, want %d", mustTerm(raftLog.term(index)), term) @@ -575,7 +575,7 @@ func TestIsOutOfBounds(t *testing.T) { num := uint64(100) storage := NewMemoryStorage() storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}}) - l := newLog(storage) + l := newLog(storage, raftLogger) for i := uint64(1); i <= num; i++ { l.append(pb.Entry{Index: i + offset}) } @@ -658,7 +658,7 @@ func TestTerm(t *testing.T) { storage := NewMemoryStorage() storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset, Term: 1}}) - l := newLog(storage) + l := newLog(storage, raftLogger) for i = 1; i < num; i++ { l.append(pb.Entry{Index: offset + i, Term: i}) } @@ -688,7 +688,7 @@ func TestTermWithUnstableSnapshot(t *testing.T) { storage := NewMemoryStorage() storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: storagesnapi, Term: 1}}) - l := newLog(storage) + l := newLog(storage, raftLogger) l.restore(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: unstablesnapi, Term: 1}}) tests := []struct { @@ -725,7 +725,7 @@ func TestSlice(t *testing.T) { for i = 1; i < num/2; i++ { storage.Append([]pb.Entry{{Index: offset + i, Term: offset + i}}) } - l := newLog(storage) + l := newLog(storage, raftLogger) for i = num / 2; i < num; i++ { l.append(pb.Entry{Index: offset + i, Term: offset + i}) } diff --git a/raft/log_unstable.go b/raft/log_unstable.go index f0cc0ca74..7a1b2353d 100644 --- a/raft/log_unstable.go +++ b/raft/log_unstable.go @@ -26,6 +26,8 @@ type unstable struct { // all entries that have not yet been written to storage. entries []pb.Entry offset uint64 + + logger Logger } // maybeFirstIndex returns the index of the first possible entry in entries @@ -106,7 +108,7 @@ func (u *unstable) truncateAndAppend(ents []pb.Entry) { // directly append u.entries = append(u.entries, ents...) case after < u.offset: - raftLogger.Infof("replace the unstable entries from index %d", after+1) + u.logger.Infof("replace the unstable entries from index %d", after+1) // The log is being truncated to before our current offset // portion, so set the offset and replace the entries u.offset = after + 1 @@ -114,7 +116,7 @@ func (u *unstable) truncateAndAppend(ents []pb.Entry) { default: // truncate to after and copy to u.entries // then append - raftLogger.Infof("truncate the unstable entries to index %d", after) + u.logger.Infof("truncate the unstable entries to index %d", after) u.entries = append([]pb.Entry{}, u.slice(u.offset, after+1)...) u.entries = append(u.entries, ents...) } @@ -128,10 +130,10 @@ func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry { // u.offset <= lo <= hi <= u.offset+len(u.offset) func (u *unstable) mustCheckOutOfBounds(lo, hi uint64) { if lo > hi { - raftLogger.Panicf("invalid unstable.slice %d > %d", lo, hi) + u.logger.Panicf("invalid unstable.slice %d > %d", lo, hi) } upper := u.offset + uint64(len(u.entries)) if lo < u.offset || hi > upper { - raftLogger.Panicf("unstable.slice[%d,%d) out of bound [%d,%d]", lo, hi, u.offset, upper) + u.logger.Panicf("unstable.slice[%d,%d) out of bound [%d,%d]", lo, hi, u.offset, upper) } } diff --git a/raft/log_unstable_test.go b/raft/log_unstable_test.go index 8aac9de9c..8012795ec 100644 --- a/raft/log_unstable_test.go +++ b/raft/log_unstable_test.go @@ -55,6 +55,7 @@ func TestUnstableMaybeFirstIndex(t *testing.T) { entries: tt.entries, offset: tt.offset, snapshot: tt.snap, + logger: raftLogger, } index, ok := u.maybeFirstIndex() if ok != tt.wok { @@ -101,6 +102,7 @@ func TestMaybeLastIndex(t *testing.T) { entries: tt.entries, offset: tt.offset, snapshot: tt.snap, + logger: raftLogger, } index, ok := u.maybeLastIndex() if ok != tt.wok { @@ -176,6 +178,7 @@ func TestUnstableMaybeTerm(t *testing.T) { entries: tt.entries, offset: tt.offset, snapshot: tt.snap, + logger: raftLogger, } term, ok := u.maybeTerm(tt.index) if ok != tt.wok { @@ -192,6 +195,7 @@ func TestUnstableRestore(t *testing.T) { entries: []pb.Entry{{Index: 5, Term: 1}}, offset: 5, snapshot: &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}}, + logger: raftLogger, } s := pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 6, Term: 2}} u.restore(s) @@ -280,6 +284,7 @@ func TestUnstableStableTo(t *testing.T) { entries: tt.entries, offset: tt.offset, snapshot: tt.snap, + logger: raftLogger, } u.stableTo(tt.index, tt.term) if u.offset != tt.woffset { @@ -336,6 +341,7 @@ func TestUnstableTruncateAndAppend(t *testing.T) { entries: tt.entries, offset: tt.offset, snapshot: tt.snap, + logger: raftLogger, } u.truncateAndAppend(tt.toappend) if u.offset != tt.woffset { diff --git a/raft/raft.go b/raft/raft.go index 0485182b5..e7f535feb 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -96,6 +96,11 @@ type Config struct { // buffer over TCP/UDP. Setting MaxInflightMsgs to avoid overflowing that sending buffer. // TODO (xiangli): feedback to application to limit the proposal rate? MaxInflightMsgs int + + // logger is the logger used for raft log. For multinode which + // can host multiple raft group, each raft group can have its + // own logger + Logger Logger } func (c *Config) validate() error { @@ -119,6 +124,10 @@ func (c *Config) validate() error { return errors.New("max inflight messages must be greater than 0") } + if c.Logger == nil { + c.Logger = raftLogger + } + return nil } @@ -152,13 +161,15 @@ type raft struct { rand *rand.Rand tick func() step stepFunc + + logger Logger } func newRaft(c *Config) *raft { if err := c.validate(); err != nil { panic(err.Error()) } - raftlog := newLog(c.Storage) + raftlog := newLog(c.Storage, c.Logger) hs, cs, err := c.Storage.InitialState() if err != nil { panic(err) // TODO(bdarnell) @@ -185,6 +196,7 @@ func newRaft(c *Config) *raft { prs: make(map[uint64]*Progress), electionTimeout: c.ElectionTick, heartbeatTimeout: c.HeartbeatTick, + logger: c.Logger, } r.rand = rand.New(rand.NewSource(int64(c.ID))) for _, p := range peers { @@ -203,7 +215,7 @@ func newRaft(c *Config) *raft { nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n)) } - raftLogger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]", + r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]", r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm()) return r } @@ -258,10 +270,10 @@ func (r *raft) sendAppend(to uint64) { } m.Snapshot = snapshot sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term - raftLogger.Infof("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", + r.logger.Infof("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]", r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr) pr.becomeSnapshot(sindex) - raftLogger.Infof("%x paused sending replication messages to %x [%s]", r.id, to, pr) + r.logger.Infof("%x paused sending replication messages to %x [%s]", r.id, to, pr) } else { m.Type = pb.MsgApp m.Index = pr.Next - 1 @@ -278,7 +290,7 @@ func (r *raft) sendAppend(to uint64) { case ProgressStateProbe: pr.pause() default: - raftLogger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State) + r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State) } } } @@ -391,7 +403,7 @@ func (r *raft) becomeFollower(term uint64, lead uint64) { r.tick = r.tickElection r.lead = lead r.state = StateFollower - raftLogger.Infof("%x became follower at term %d", r.id, r.Term) + r.logger.Infof("%x became follower at term %d", r.id, r.Term) } func (r *raft) becomeCandidate() { @@ -404,7 +416,7 @@ func (r *raft) becomeCandidate() { r.tick = r.tickElection r.Vote = r.id r.state = StateCandidate - raftLogger.Infof("%x became candidate at term %d", r.id, r.Term) + r.logger.Infof("%x became candidate at term %d", r.id, r.Term) } func (r *raft) becomeLeader() { @@ -419,7 +431,7 @@ func (r *raft) becomeLeader() { r.state = StateLeader ents, err := r.raftLog.entries(r.raftLog.committed+1, noLimit) if err != nil { - raftLogger.Panicf("unexpected error getting uncommitted entries (%v)", err) + r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err) } for _, e := range ents { @@ -432,7 +444,7 @@ func (r *raft) becomeLeader() { r.pendingConf = true } r.appendEntry(pb.Entry{Data: nil}) - raftLogger.Infof("%x became leader at term %d", r.id, r.Term) + r.logger.Infof("%x became leader at term %d", r.id, r.Term) } func (r *raft) campaign() { @@ -445,7 +457,7 @@ func (r *raft) campaign() { if i == r.id { continue } - raftLogger.Infof("%x [logterm: %d, index: %d] sent vote request to %x at term %d", + r.logger.Infof("%x [logterm: %d, index: %d] sent vote request to %x at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), i, r.Term) r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()}) } @@ -453,9 +465,9 @@ func (r *raft) campaign() { func (r *raft) poll(id uint64, v bool) (granted int) { if v { - raftLogger.Infof("%x received vote from %x at term %d", r.id, id, r.Term) + r.logger.Infof("%x received vote from %x at term %d", r.id, id, r.Term) } else { - raftLogger.Infof("%x received vote rejection from %x at term %d", r.id, id, r.Term) + r.logger.Infof("%x received vote rejection from %x at term %d", r.id, id, r.Term) } if _, ok := r.votes[id]; !ok { r.votes[id] = v @@ -470,7 +482,7 @@ func (r *raft) poll(id uint64, v bool) (granted int) { func (r *raft) Step(m pb.Message) error { if m.Type == pb.MsgHup { - raftLogger.Infof("%x is starting a new election at term %d", r.id, r.Term) + r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term) r.campaign() r.Commit = r.raftLog.committed return nil @@ -484,12 +496,12 @@ func (r *raft) Step(m pb.Message) error { if m.Type == pb.MsgVote { lead = None } - raftLogger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]", + r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]", r.id, r.Term, m.Type, m.From, m.Term) r.becomeFollower(m.Term, lead) case m.Term < r.Term: // ignore - raftLogger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]", + r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]", r.id, r.Term, m.Type, m.From, m.Term) return nil } @@ -508,7 +520,7 @@ func stepLeader(r *raft, m pb.Message) { r.bcastHeartbeat() case pb.MsgProp: if len(m.Entries) == 0 { - raftLogger.Panicf("%x stepped empty MsgProp", r.id) + r.logger.Panicf("%x stepped empty MsgProp", r.id) } for i, e := range m.Entries { if e.Type == pb.EntryConfChange { @@ -522,10 +534,10 @@ func stepLeader(r *raft, m pb.Message) { r.bcastAppend() case pb.MsgAppResp: if m.Reject { - raftLogger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d", + r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d", r.id, m.RejectHint, m.From, m.Index) if pr.maybeDecrTo(m.Index, m.RejectHint) { - raftLogger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr) + r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr) if pr.State == ProgressStateReplicate { pr.becomeProbe() } @@ -538,7 +550,7 @@ func stepLeader(r *raft, m pb.Message) { case pr.State == ProgressStateProbe: pr.becomeReplicate() case pr.State == ProgressStateSnapshot && pr.maybeSnapshotAbort(): - raftLogger.Infof("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr) + r.logger.Infof("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr) pr.becomeProbe() case pr.State == ProgressStateReplicate: pr.ins.freeTo(m.Index) @@ -562,7 +574,7 @@ func stepLeader(r *raft, m pb.Message) { r.sendAppend(m.From) } case pb.MsgVote: - raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d", + r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term) r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true}) case pb.MsgSnapStatus: @@ -571,11 +583,11 @@ func stepLeader(r *raft, m pb.Message) { } if !m.Reject { pr.becomeProbe() - raftLogger.Infof("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr) + r.logger.Infof("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr) } else { pr.snapshotFailure() pr.becomeProbe() - raftLogger.Infof("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr) + r.logger.Infof("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr) } // If snapshot finish, wait for the msgAppResp from the remote node before sending // out the next msgApp. @@ -587,14 +599,14 @@ func stepLeader(r *raft, m pb.Message) { if pr.State == ProgressStateReplicate { pr.becomeProbe() } - raftLogger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr) + r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr) } } func stepCandidate(r *raft, m pb.Message) { switch m.Type { case pb.MsgProp: - raftLogger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) + r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) return case pb.MsgApp: r.becomeFollower(r.Term, m.From) @@ -606,12 +618,12 @@ func stepCandidate(r *raft, m pb.Message) { r.becomeFollower(m.Term, m.From) r.handleSnapshot(m) case pb.MsgVote: - raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x", + r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term) r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true}) case pb.MsgVoteResp: gr := r.poll(m.From, !m.Reject) - raftLogger.Infof("%x [q:%d] has received %d votes and %d vote rejections", r.id, r.q(), gr, len(r.votes)-gr) + r.logger.Infof("%x [q:%d] has received %d votes and %d vote rejections", r.id, r.q(), gr, len(r.votes)-gr) switch r.q() { case gr: r.becomeLeader() @@ -626,7 +638,7 @@ func stepFollower(r *raft, m pb.Message) { switch m.Type { case pb.MsgProp: if r.lead == None { - raftLogger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) + r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term) return } m.To = r.lead @@ -645,12 +657,12 @@ func stepFollower(r *raft, m pb.Message) { case pb.MsgVote: if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) { r.elapsed = 0 - raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %d", + r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term) r.Vote = m.From r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp}) } else { - raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d", + r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d", r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term) r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true}) } @@ -666,8 +678,8 @@ func (r *raft) handleAppendEntries(m pb.Message) { if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) } else { - raftLogger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x", - r.id, zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) + r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x", + r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()}) } } @@ -680,11 +692,11 @@ func (r *raft) handleHeartbeat(m pb.Message) { func (r *raft) handleSnapshot(m pb.Message) { sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term if r.restore(m.Snapshot) { - raftLogger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]", + r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]", r.id, r.Commit, sindex, sterm) r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()}) } else { - raftLogger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]", + r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]", r.id, r.Commit, sindex, sterm) r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) } @@ -697,13 +709,13 @@ func (r *raft) restore(s pb.Snapshot) bool { return false } if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) { - raftLogger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]", + r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]", r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) r.raftLog.commitTo(s.Metadata.Index) return false } - raftLogger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]", + r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]", r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term) r.raftLog.restore(s) @@ -716,7 +728,7 @@ func (r *raft) restore(s pb.Snapshot) bool { match = 0 } r.setProgress(n, match, next) - raftLogger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs[n]) + r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs[n]) } return true } @@ -756,7 +768,7 @@ func (r *raft) delProgress(id uint64) { func (r *raft) loadState(state pb.HardState) { if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() { - raftLogger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex()) + r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex()) } r.raftLog.committed = state.Commit r.Term = state.Term diff --git a/raft/raft_test.go b/raft/raft_test.go index 3115d2cfa..dffdea6b2 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -492,7 +492,7 @@ func TestDuelingCandidates(t *testing.T) { }{ {a, StateFollower, 2, wlog}, {b, StateFollower, 2, wlog}, - {c, StateFollower, 2, newLog(NewMemoryStorage())}, + {c, StateFollower, 2, newLog(NewMemoryStorage(), raftLogger)}, } for i, tt := range tests { @@ -638,7 +638,7 @@ func TestProposal(t *testing.T) { send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) - wantLog := newLog(NewMemoryStorage()) + wantLog := newLog(NewMemoryStorage(), raftLogger) if tt.success { wantLog = &raftLog{ storage: &MemoryStorage{