Merge pull request #3254 from es-chow/log-group

set groupID in multinode as log context so it can be logged
This commit is contained in:
Xiang Li 2015-08-12 08:05:50 -07:00
commit 18ecc297bc
6 changed files with 100 additions and 76 deletions

View File

@ -36,16 +36,19 @@ type raftLog struct {
// been instructed to apply to its state machine.
// Invariant: applied <= committed
applied uint64
logger Logger
}
// newLog returns log using the given storage. It recovers the log to the state
// that it just commits and applies the latest snapshot.
func newLog(storage Storage) *raftLog {
func newLog(storage Storage, logger Logger) *raftLog {
if storage == nil {
log.Panic("storage must not be nil")
}
log := &raftLog{
storage: storage,
logger: logger,
}
firstIndex, err := storage.FirstIndex()
if err != nil {
@ -56,6 +59,7 @@ func newLog(storage Storage) *raftLog {
panic(err) // TODO(bdarnell)
}
log.unstable.offset = lastIndex + 1
log.unstable.logger = logger
// Initialize our committed and applied pointers to the time of the last compaction.
log.committed = firstIndex - 1
log.applied = firstIndex - 1
@ -76,7 +80,7 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry
switch {
case ci == 0:
case ci <= l.committed:
raftLogger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
offset := index + 1
l.append(ents[ci-offset:]...)
@ -92,7 +96,7 @@ func (l *raftLog) append(ents ...pb.Entry) uint64 {
return l.lastIndex()
}
if after := ents[0].Index - 1; after < l.committed {
raftLogger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
}
l.unstable.truncateAndAppend(ents)
return l.lastIndex()
@ -113,8 +117,8 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
for _, ne := range ents {
if !l.matchTerm(ne.Index, ne.Term) {
if ne.Index <= l.lastIndex() {
raftLogger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
ne.Index, zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term)
l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
ne.Index, l.zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term)
}
return ne.Index
}
@ -137,7 +141,7 @@ func (l *raftLog) nextEnts() (ents []pb.Entry) {
if l.committed+1 > off {
ents, err := l.slice(off, l.committed+1, noLimit)
if err != nil {
raftLogger.Panicf("unexpected error when getting unapplied entries (%v)", err)
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
}
return ents
}
@ -177,7 +181,7 @@ func (l *raftLog) commitTo(tocommit uint64) {
// never decrease commit
if l.committed < tocommit {
if l.lastIndex() < tocommit {
raftLogger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]", tocommit, l.lastIndex())
l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]", tocommit, l.lastIndex())
}
l.committed = tocommit
}
@ -188,7 +192,7 @@ func (l *raftLog) appliedTo(i uint64) {
return
}
if l.committed < i || i < l.applied {
raftLogger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
}
l.applied = i
}
@ -200,7 +204,7 @@ func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
func (l *raftLog) lastTerm() uint64 {
t, err := l.term(l.lastIndex())
if err != nil {
raftLogger.Panicf("unexpected error when getting the last term (%v)", err)
l.logger.Panicf("unexpected error when getting the last term (%v)", err)
}
return t
}
@ -266,7 +270,7 @@ func (l *raftLog) matchTerm(i, term uint64) bool {
}
func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
if maxIndex > l.committed && zeroTermOnErrCompacted(l.term(maxIndex)) == term {
if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {
l.commitTo(maxIndex)
return true
}
@ -274,7 +278,7 @@ func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
}
func (l *raftLog) restore(s pb.Snapshot) {
raftLogger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term)
l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term)
l.committed = s.Metadata.Index
l.unstable.restore(s)
}
@ -294,7 +298,7 @@ func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) {
if err == ErrCompacted {
return nil, err
} else if err == ErrUnavailable {
raftLogger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset))
l.logger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset))
} else if err != nil {
panic(err) // TODO(bdarnell)
}
@ -321,7 +325,7 @@ func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) {
// l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries)
func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
if lo > hi {
raftLogger.Panicf("invalid slice %d > %d", lo, hi)
l.logger.Panicf("invalid slice %d > %d", lo, hi)
}
fi := l.firstIndex()
if lo < fi {
@ -330,18 +334,18 @@ func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
length := l.lastIndex() - fi + 1
if lo < fi || hi > fi+length {
raftLogger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex())
l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex())
}
return nil
}
func zeroTermOnErrCompacted(t uint64, err error) uint64 {
func (l *raftLog) zeroTermOnErrCompacted(t uint64, err error) uint64 {
if err == nil {
return t
}
if err == ErrCompacted {
return 0
}
raftLogger.Panicf("unexpected error (%v)", err)
l.logger.Panicf("unexpected error (%v)", err)
return 0
}

View File

@ -46,7 +46,7 @@ func TestFindConflict(t *testing.T) {
}
for i, tt := range tests {
raftLog := newLog(NewMemoryStorage())
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog.append(previousEnts...)
gconflict := raftLog.findConflict(tt.ents)
@ -58,7 +58,7 @@ func TestFindConflict(t *testing.T) {
func TestIsUpToDate(t *testing.T) {
previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}
raftLog := newLog(NewMemoryStorage())
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog.append(previousEnts...)
tests := []struct {
lastIndex uint64
@ -126,7 +126,7 @@ func TestAppend(t *testing.T) {
for i, tt := range tests {
storage := NewMemoryStorage()
storage.Append(previousEnts)
raftLog := newLog(storage)
raftLog := newLog(storage, raftLogger)
index := raftLog.append(tt.ents...)
if index != tt.windex {
@ -237,7 +237,7 @@ func TestLogMaybeAppend(t *testing.T) {
}
for i, tt := range tests {
raftLog := newLog(NewMemoryStorage())
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog.append(previousEnts...)
raftLog.committed = commit
func() {
@ -285,7 +285,7 @@ func TestCompactionSideEffects(t *testing.T) {
for i = 1; i <= unstableIndex; i++ {
storage.Append([]pb.Entry{{Term: uint64(i), Index: uint64(i)}})
}
raftLog := newLog(storage)
raftLog := newLog(storage, raftLogger)
for i = unstableIndex; i < lastIndex; i++ {
raftLog.append(pb.Entry{Term: uint64(i + 1), Index: uint64(i + 1)})
}
@ -359,7 +359,7 @@ func TestNextEnts(t *testing.T) {
for i, tt := range tests {
storage := NewMemoryStorage()
storage.ApplySnapshot(snap)
raftLog := newLog(storage)
raftLog := newLog(storage, raftLogger)
raftLog.append(ents...)
raftLog.maybeCommit(5, 1)
raftLog.appliedTo(tt.applied)
@ -389,7 +389,7 @@ func TestUnstableEnts(t *testing.T) {
storage.Append(previousEnts[:tt.unstable-1])
// append unstable entries to raftlog
raftLog := newLog(storage)
raftLog := newLog(storage, raftLogger)
raftLog.append(previousEnts[tt.unstable-1:]...)
ents := raftLog.unstableEntries()
@ -427,7 +427,7 @@ func TestCommitTo(t *testing.T) {
}
}
}()
raftLog := newLog(NewMemoryStorage())
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog.append(previousEnts...)
raftLog.committed = commit
raftLog.commitTo(tt.commit)
@ -450,7 +450,7 @@ func TestStableTo(t *testing.T) {
{3, 1, 1}, // bad index
}
for i, tt := range tests {
raftLog := newLog(NewMemoryStorage())
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...)
raftLog.stableTo(tt.stablei, tt.stablet)
if raftLog.unstable.offset != tt.wunstable {
@ -487,7 +487,7 @@ func TestStableToWithSnap(t *testing.T) {
for i, tt := range tests {
s := NewMemoryStorage()
s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}})
raftLog := newLog(s)
raftLog := newLog(s, raftLogger)
raftLog.append(tt.newEnts...)
raftLog.stableTo(tt.stablei, tt.stablet)
if raftLog.unstable.offset != tt.wunstable {
@ -525,7 +525,7 @@ func TestCompaction(t *testing.T) {
for i := uint64(1); i <= tt.lastIndex; i++ {
storage.Append([]pb.Entry{{Index: i}})
}
raftLog := newLog(storage)
raftLog := newLog(storage, raftLogger)
raftLog.maybeCommit(tt.lastIndex, 0)
raftLog.appliedTo(raftLog.committed)
@ -551,7 +551,7 @@ func TestLogRestore(t *testing.T) {
snap := pb.SnapshotMetadata{Index: index, Term: term}
storage := NewMemoryStorage()
storage.ApplySnapshot(pb.Snapshot{Metadata: snap})
raftLog := newLog(storage)
raftLog := newLog(storage, raftLogger)
if len(raftLog.allEntries()) != 0 {
t.Errorf("len = %d, want 0", len(raftLog.allEntries()))
@ -563,7 +563,7 @@ func TestLogRestore(t *testing.T) {
t.Errorf("committed = %d, want %d", raftLog.committed, index)
}
if raftLog.unstable.offset != index+1 {
t.Errorf("unstable = %d, want %d", raftLog.unstable, index+1)
t.Errorf("unstable = %d, want %d", raftLog.unstable.offset, index+1)
}
if mustTerm(raftLog.term(index)) != term {
t.Errorf("term = %d, want %d", mustTerm(raftLog.term(index)), term)
@ -575,7 +575,7 @@ func TestIsOutOfBounds(t *testing.T) {
num := uint64(100)
storage := NewMemoryStorage()
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}})
l := newLog(storage)
l := newLog(storage, raftLogger)
for i := uint64(1); i <= num; i++ {
l.append(pb.Entry{Index: i + offset})
}
@ -658,7 +658,7 @@ func TestTerm(t *testing.T) {
storage := NewMemoryStorage()
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset, Term: 1}})
l := newLog(storage)
l := newLog(storage, raftLogger)
for i = 1; i < num; i++ {
l.append(pb.Entry{Index: offset + i, Term: i})
}
@ -688,7 +688,7 @@ func TestTermWithUnstableSnapshot(t *testing.T) {
storage := NewMemoryStorage()
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: storagesnapi, Term: 1}})
l := newLog(storage)
l := newLog(storage, raftLogger)
l.restore(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: unstablesnapi, Term: 1}})
tests := []struct {
@ -725,7 +725,7 @@ func TestSlice(t *testing.T) {
for i = 1; i < num/2; i++ {
storage.Append([]pb.Entry{{Index: offset + i, Term: offset + i}})
}
l := newLog(storage)
l := newLog(storage, raftLogger)
for i = num / 2; i < num; i++ {
l.append(pb.Entry{Index: offset + i, Term: offset + i})
}

View File

@ -26,6 +26,8 @@ type unstable struct {
// all entries that have not yet been written to storage.
entries []pb.Entry
offset uint64
logger Logger
}
// maybeFirstIndex returns the index of the first possible entry in entries
@ -106,7 +108,7 @@ func (u *unstable) truncateAndAppend(ents []pb.Entry) {
// directly append
u.entries = append(u.entries, ents...)
case after < u.offset:
raftLogger.Infof("replace the unstable entries from index %d", after+1)
u.logger.Infof("replace the unstable entries from index %d", after+1)
// The log is being truncated to before our current offset
// portion, so set the offset and replace the entries
u.offset = after + 1
@ -114,7 +116,7 @@ func (u *unstable) truncateAndAppend(ents []pb.Entry) {
default:
// truncate to after and copy to u.entries
// then append
raftLogger.Infof("truncate the unstable entries to index %d", after)
u.logger.Infof("truncate the unstable entries to index %d", after)
u.entries = append([]pb.Entry{}, u.slice(u.offset, after+1)...)
u.entries = append(u.entries, ents...)
}
@ -128,10 +130,10 @@ func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry {
// u.offset <= lo <= hi <= u.offset+len(u.offset)
func (u *unstable) mustCheckOutOfBounds(lo, hi uint64) {
if lo > hi {
raftLogger.Panicf("invalid unstable.slice %d > %d", lo, hi)
u.logger.Panicf("invalid unstable.slice %d > %d", lo, hi)
}
upper := u.offset + uint64(len(u.entries))
if lo < u.offset || hi > upper {
raftLogger.Panicf("unstable.slice[%d,%d) out of bound [%d,%d]", lo, hi, u.offset, upper)
u.logger.Panicf("unstable.slice[%d,%d) out of bound [%d,%d]", lo, hi, u.offset, upper)
}
}

View File

@ -55,6 +55,7 @@ func TestUnstableMaybeFirstIndex(t *testing.T) {
entries: tt.entries,
offset: tt.offset,
snapshot: tt.snap,
logger: raftLogger,
}
index, ok := u.maybeFirstIndex()
if ok != tt.wok {
@ -101,6 +102,7 @@ func TestMaybeLastIndex(t *testing.T) {
entries: tt.entries,
offset: tt.offset,
snapshot: tt.snap,
logger: raftLogger,
}
index, ok := u.maybeLastIndex()
if ok != tt.wok {
@ -176,6 +178,7 @@ func TestUnstableMaybeTerm(t *testing.T) {
entries: tt.entries,
offset: tt.offset,
snapshot: tt.snap,
logger: raftLogger,
}
term, ok := u.maybeTerm(tt.index)
if ok != tt.wok {
@ -192,6 +195,7 @@ func TestUnstableRestore(t *testing.T) {
entries: []pb.Entry{{Index: 5, Term: 1}},
offset: 5,
snapshot: &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}},
logger: raftLogger,
}
s := pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 6, Term: 2}}
u.restore(s)
@ -280,6 +284,7 @@ func TestUnstableStableTo(t *testing.T) {
entries: tt.entries,
offset: tt.offset,
snapshot: tt.snap,
logger: raftLogger,
}
u.stableTo(tt.index, tt.term)
if u.offset != tt.woffset {
@ -336,6 +341,7 @@ func TestUnstableTruncateAndAppend(t *testing.T) {
entries: tt.entries,
offset: tt.offset,
snapshot: tt.snap,
logger: raftLogger,
}
u.truncateAndAppend(tt.toappend)
if u.offset != tt.woffset {

View File

@ -96,6 +96,11 @@ type Config struct {
// buffer over TCP/UDP. Setting MaxInflightMsgs to avoid overflowing that sending buffer.
// TODO (xiangli): feedback to application to limit the proposal rate?
MaxInflightMsgs int
// logger is the logger used for raft log. For multinode which
// can host multiple raft group, each raft group can have its
// own logger
Logger Logger
}
func (c *Config) validate() error {
@ -119,6 +124,10 @@ func (c *Config) validate() error {
return errors.New("max inflight messages must be greater than 0")
}
if c.Logger == nil {
c.Logger = raftLogger
}
return nil
}
@ -152,13 +161,15 @@ type raft struct {
rand *rand.Rand
tick func()
step stepFunc
logger Logger
}
func newRaft(c *Config) *raft {
if err := c.validate(); err != nil {
panic(err.Error())
}
raftlog := newLog(c.Storage)
raftlog := newLog(c.Storage, c.Logger)
hs, cs, err := c.Storage.InitialState()
if err != nil {
panic(err) // TODO(bdarnell)
@ -185,6 +196,7 @@ func newRaft(c *Config) *raft {
prs: make(map[uint64]*Progress),
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
}
r.rand = rand.New(rand.NewSource(int64(c.ID)))
for _, p := range peers {
@ -203,7 +215,7 @@ func newRaft(c *Config) *raft {
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
}
raftLogger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
return r
}
@ -258,10 +270,10 @@ func (r *raft) sendAppend(to uint64) {
}
m.Snapshot = snapshot
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
raftLogger.Infof("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.logger.Infof("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
pr.becomeSnapshot(sindex)
raftLogger.Infof("%x paused sending replication messages to %x [%s]", r.id, to, pr)
r.logger.Infof("%x paused sending replication messages to %x [%s]", r.id, to, pr)
} else {
m.Type = pb.MsgApp
m.Index = pr.Next - 1
@ -278,7 +290,7 @@ func (r *raft) sendAppend(to uint64) {
case ProgressStateProbe:
pr.pause()
default:
raftLogger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
}
}
}
@ -391,7 +403,7 @@ func (r *raft) becomeFollower(term uint64, lead uint64) {
r.tick = r.tickElection
r.lead = lead
r.state = StateFollower
raftLogger.Infof("%x became follower at term %d", r.id, r.Term)
r.logger.Infof("%x became follower at term %d", r.id, r.Term)
}
func (r *raft) becomeCandidate() {
@ -404,7 +416,7 @@ func (r *raft) becomeCandidate() {
r.tick = r.tickElection
r.Vote = r.id
r.state = StateCandidate
raftLogger.Infof("%x became candidate at term %d", r.id, r.Term)
r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
}
func (r *raft) becomeLeader() {
@ -419,7 +431,7 @@ func (r *raft) becomeLeader() {
r.state = StateLeader
ents, err := r.raftLog.entries(r.raftLog.committed+1, noLimit)
if err != nil {
raftLogger.Panicf("unexpected error getting uncommitted entries (%v)", err)
r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err)
}
for _, e := range ents {
@ -432,7 +444,7 @@ func (r *raft) becomeLeader() {
r.pendingConf = true
}
r.appendEntry(pb.Entry{Data: nil})
raftLogger.Infof("%x became leader at term %d", r.id, r.Term)
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}
func (r *raft) campaign() {
@ -445,7 +457,7 @@ func (r *raft) campaign() {
if i == r.id {
continue
}
raftLogger.Infof("%x [logterm: %d, index: %d] sent vote request to %x at term %d",
r.logger.Infof("%x [logterm: %d, index: %d] sent vote request to %x at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), i, r.Term)
r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
}
@ -453,9 +465,9 @@ func (r *raft) campaign() {
func (r *raft) poll(id uint64, v bool) (granted int) {
if v {
raftLogger.Infof("%x received vote from %x at term %d", r.id, id, r.Term)
r.logger.Infof("%x received vote from %x at term %d", r.id, id, r.Term)
} else {
raftLogger.Infof("%x received vote rejection from %x at term %d", r.id, id, r.Term)
r.logger.Infof("%x received vote rejection from %x at term %d", r.id, id, r.Term)
}
if _, ok := r.votes[id]; !ok {
r.votes[id] = v
@ -470,7 +482,7 @@ func (r *raft) poll(id uint64, v bool) (granted int) {
func (r *raft) Step(m pb.Message) error {
if m.Type == pb.MsgHup {
raftLogger.Infof("%x is starting a new election at term %d", r.id, r.Term)
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
r.campaign()
r.Commit = r.raftLog.committed
return nil
@ -484,12 +496,12 @@ func (r *raft) Step(m pb.Message) error {
if m.Type == pb.MsgVote {
lead = None
}
raftLogger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
r.becomeFollower(m.Term, lead)
case m.Term < r.Term:
// ignore
raftLogger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
r.id, r.Term, m.Type, m.From, m.Term)
return nil
}
@ -508,7 +520,7 @@ func stepLeader(r *raft, m pb.Message) {
r.bcastHeartbeat()
case pb.MsgProp:
if len(m.Entries) == 0 {
raftLogger.Panicf("%x stepped empty MsgProp", r.id)
r.logger.Panicf("%x stepped empty MsgProp", r.id)
}
for i, e := range m.Entries {
if e.Type == pb.EntryConfChange {
@ -522,10 +534,10 @@ func stepLeader(r *raft, m pb.Message) {
r.bcastAppend()
case pb.MsgAppResp:
if m.Reject {
raftLogger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
r.id, m.RejectHint, m.From, m.Index)
if pr.maybeDecrTo(m.Index, m.RejectHint) {
raftLogger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
}
@ -538,7 +550,7 @@ func stepLeader(r *raft, m pb.Message) {
case pr.State == ProgressStateProbe:
pr.becomeReplicate()
case pr.State == ProgressStateSnapshot && pr.maybeSnapshotAbort():
raftLogger.Infof("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
r.logger.Infof("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
pr.becomeProbe()
case pr.State == ProgressStateReplicate:
pr.ins.freeTo(m.Index)
@ -562,7 +574,7 @@ func stepLeader(r *raft, m pb.Message) {
r.sendAppend(m.From)
}
case pb.MsgVote:
raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
case pb.MsgSnapStatus:
@ -571,11 +583,11 @@ func stepLeader(r *raft, m pb.Message) {
}
if !m.Reject {
pr.becomeProbe()
raftLogger.Infof("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
r.logger.Infof("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
} else {
pr.snapshotFailure()
pr.becomeProbe()
raftLogger.Infof("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
r.logger.Infof("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
}
// If snapshot finish, wait for the msgAppResp from the remote node before sending
// out the next msgApp.
@ -587,14 +599,14 @@ func stepLeader(r *raft, m pb.Message) {
if pr.State == ProgressStateReplicate {
pr.becomeProbe()
}
raftLogger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
}
}
func stepCandidate(r *raft, m pb.Message) {
switch m.Type {
case pb.MsgProp:
raftLogger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return
case pb.MsgApp:
r.becomeFollower(r.Term, m.From)
@ -606,12 +618,12 @@ func stepCandidate(r *raft, m pb.Message) {
r.becomeFollower(m.Term, m.From)
r.handleSnapshot(m)
case pb.MsgVote:
raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
case pb.MsgVoteResp:
gr := r.poll(m.From, !m.Reject)
raftLogger.Infof("%x [q:%d] has received %d votes and %d vote rejections", r.id, r.q(), gr, len(r.votes)-gr)
r.logger.Infof("%x [q:%d] has received %d votes and %d vote rejections", r.id, r.q(), gr, len(r.votes)-gr)
switch r.q() {
case gr:
r.becomeLeader()
@ -626,7 +638,7 @@ func stepFollower(r *raft, m pb.Message) {
switch m.Type {
case pb.MsgProp:
if r.lead == None {
raftLogger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
return
}
m.To = r.lead
@ -645,12 +657,12 @@ func stepFollower(r *raft, m pb.Message) {
case pb.MsgVote:
if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
r.elapsed = 0
raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %d",
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
r.Vote = m.From
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
} else {
raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
}
@ -666,8 +678,8 @@ func (r *raft) handleAppendEntries(m pb.Message) {
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
raftLogger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
r.id, zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
}
}
@ -680,11 +692,11 @@ func (r *raft) handleHeartbeat(m pb.Message) {
func (r *raft) handleSnapshot(m pb.Message) {
sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
if r.restore(m.Snapshot) {
raftLogger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
r.id, r.Commit, sindex, sterm)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
} else {
raftLogger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
r.id, r.Commit, sindex, sterm)
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
}
@ -697,13 +709,13 @@ func (r *raft) restore(s pb.Snapshot) bool {
return false
}
if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
raftLogger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
r.raftLog.commitTo(s.Metadata.Index)
return false
}
raftLogger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
r.raftLog.restore(s)
@ -716,7 +728,7 @@ func (r *raft) restore(s pb.Snapshot) bool {
match = 0
}
r.setProgress(n, match, next)
raftLogger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs[n])
r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs[n])
}
return true
}
@ -756,7 +768,7 @@ func (r *raft) delProgress(id uint64) {
func (r *raft) loadState(state pb.HardState) {
if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
raftLogger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
}
r.raftLog.committed = state.Commit
r.Term = state.Term

View File

@ -492,7 +492,7 @@ func TestDuelingCandidates(t *testing.T) {
}{
{a, StateFollower, 2, wlog},
{b, StateFollower, 2, wlog},
{c, StateFollower, 2, newLog(NewMemoryStorage())},
{c, StateFollower, 2, newLog(NewMemoryStorage(), raftLogger)},
}
for i, tt := range tests {
@ -638,7 +638,7 @@ func TestProposal(t *testing.T) {
send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
wantLog := newLog(NewMemoryStorage())
wantLog := newLog(NewMemoryStorage(), raftLogger)
if tt.success {
wantLog = &raftLog{
storage: &MemoryStorage{