mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
raft: set logger to raft so log context such as multinode groupID can be logged
This commit is contained in:
parent
042afcf2a3
commit
cc362ccdad
36
raft/log.go
36
raft/log.go
@ -36,16 +36,19 @@ type raftLog struct {
|
|||||||
// been instructed to apply to its state machine.
|
// been instructed to apply to its state machine.
|
||||||
// Invariant: applied <= committed
|
// Invariant: applied <= committed
|
||||||
applied uint64
|
applied uint64
|
||||||
|
|
||||||
|
logger Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// newLog returns log using the given storage. It recovers the log to the state
|
// newLog returns log using the given storage. It recovers the log to the state
|
||||||
// that it just commits and applies the latest snapshot.
|
// that it just commits and applies the latest snapshot.
|
||||||
func newLog(storage Storage) *raftLog {
|
func newLog(storage Storage, logger Logger) *raftLog {
|
||||||
if storage == nil {
|
if storage == nil {
|
||||||
log.Panic("storage must not be nil")
|
log.Panic("storage must not be nil")
|
||||||
}
|
}
|
||||||
log := &raftLog{
|
log := &raftLog{
|
||||||
storage: storage,
|
storage: storage,
|
||||||
|
logger: logger,
|
||||||
}
|
}
|
||||||
firstIndex, err := storage.FirstIndex()
|
firstIndex, err := storage.FirstIndex()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -56,6 +59,7 @@ func newLog(storage Storage) *raftLog {
|
|||||||
panic(err) // TODO(bdarnell)
|
panic(err) // TODO(bdarnell)
|
||||||
}
|
}
|
||||||
log.unstable.offset = lastIndex + 1
|
log.unstable.offset = lastIndex + 1
|
||||||
|
log.unstable.logger = logger
|
||||||
// Initialize our committed and applied pointers to the time of the last compaction.
|
// Initialize our committed and applied pointers to the time of the last compaction.
|
||||||
log.committed = firstIndex - 1
|
log.committed = firstIndex - 1
|
||||||
log.applied = firstIndex - 1
|
log.applied = firstIndex - 1
|
||||||
@ -76,7 +80,7 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry
|
|||||||
switch {
|
switch {
|
||||||
case ci == 0:
|
case ci == 0:
|
||||||
case ci <= l.committed:
|
case ci <= l.committed:
|
||||||
raftLogger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
|
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
|
||||||
default:
|
default:
|
||||||
offset := index + 1
|
offset := index + 1
|
||||||
l.append(ents[ci-offset:]...)
|
l.append(ents[ci-offset:]...)
|
||||||
@ -92,7 +96,7 @@ func (l *raftLog) append(ents ...pb.Entry) uint64 {
|
|||||||
return l.lastIndex()
|
return l.lastIndex()
|
||||||
}
|
}
|
||||||
if after := ents[0].Index - 1; after < l.committed {
|
if after := ents[0].Index - 1; after < l.committed {
|
||||||
raftLogger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
|
l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
|
||||||
}
|
}
|
||||||
l.unstable.truncateAndAppend(ents)
|
l.unstable.truncateAndAppend(ents)
|
||||||
return l.lastIndex()
|
return l.lastIndex()
|
||||||
@ -113,8 +117,8 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
|
|||||||
for _, ne := range ents {
|
for _, ne := range ents {
|
||||||
if !l.matchTerm(ne.Index, ne.Term) {
|
if !l.matchTerm(ne.Index, ne.Term) {
|
||||||
if ne.Index <= l.lastIndex() {
|
if ne.Index <= l.lastIndex() {
|
||||||
raftLogger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
|
l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
|
||||||
ne.Index, zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term)
|
ne.Index, l.zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term)
|
||||||
}
|
}
|
||||||
return ne.Index
|
return ne.Index
|
||||||
}
|
}
|
||||||
@ -137,7 +141,7 @@ func (l *raftLog) nextEnts() (ents []pb.Entry) {
|
|||||||
if l.committed+1 > off {
|
if l.committed+1 > off {
|
||||||
ents, err := l.slice(off, l.committed+1, noLimit)
|
ents, err := l.slice(off, l.committed+1, noLimit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
raftLogger.Panicf("unexpected error when getting unapplied entries (%v)", err)
|
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
|
||||||
}
|
}
|
||||||
return ents
|
return ents
|
||||||
}
|
}
|
||||||
@ -177,7 +181,7 @@ func (l *raftLog) commitTo(tocommit uint64) {
|
|||||||
// never decrease commit
|
// never decrease commit
|
||||||
if l.committed < tocommit {
|
if l.committed < tocommit {
|
||||||
if l.lastIndex() < tocommit {
|
if l.lastIndex() < tocommit {
|
||||||
raftLogger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]", tocommit, l.lastIndex())
|
l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]", tocommit, l.lastIndex())
|
||||||
}
|
}
|
||||||
l.committed = tocommit
|
l.committed = tocommit
|
||||||
}
|
}
|
||||||
@ -188,7 +192,7 @@ func (l *raftLog) appliedTo(i uint64) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if l.committed < i || i < l.applied {
|
if l.committed < i || i < l.applied {
|
||||||
raftLogger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
|
l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
|
||||||
}
|
}
|
||||||
l.applied = i
|
l.applied = i
|
||||||
}
|
}
|
||||||
@ -200,7 +204,7 @@ func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
|
|||||||
func (l *raftLog) lastTerm() uint64 {
|
func (l *raftLog) lastTerm() uint64 {
|
||||||
t, err := l.term(l.lastIndex())
|
t, err := l.term(l.lastIndex())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
raftLogger.Panicf("unexpected error when getting the last term (%v)", err)
|
l.logger.Panicf("unexpected error when getting the last term (%v)", err)
|
||||||
}
|
}
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
@ -266,7 +270,7 @@ func (l *raftLog) matchTerm(i, term uint64) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
|
func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
|
||||||
if maxIndex > l.committed && zeroTermOnErrCompacted(l.term(maxIndex)) == term {
|
if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {
|
||||||
l.commitTo(maxIndex)
|
l.commitTo(maxIndex)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
@ -274,7 +278,7 @@ func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *raftLog) restore(s pb.Snapshot) {
|
func (l *raftLog) restore(s pb.Snapshot) {
|
||||||
raftLogger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term)
|
l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term)
|
||||||
l.committed = s.Metadata.Index
|
l.committed = s.Metadata.Index
|
||||||
l.unstable.restore(s)
|
l.unstable.restore(s)
|
||||||
}
|
}
|
||||||
@ -294,7 +298,7 @@ func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) {
|
|||||||
if err == ErrCompacted {
|
if err == ErrCompacted {
|
||||||
return nil, err
|
return nil, err
|
||||||
} else if err == ErrUnavailable {
|
} else if err == ErrUnavailable {
|
||||||
raftLogger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset))
|
l.logger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset))
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
panic(err) // TODO(bdarnell)
|
panic(err) // TODO(bdarnell)
|
||||||
}
|
}
|
||||||
@ -321,7 +325,7 @@ func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) {
|
|||||||
// l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries)
|
// l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries)
|
||||||
func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
|
func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
|
||||||
if lo > hi {
|
if lo > hi {
|
||||||
raftLogger.Panicf("invalid slice %d > %d", lo, hi)
|
l.logger.Panicf("invalid slice %d > %d", lo, hi)
|
||||||
}
|
}
|
||||||
fi := l.firstIndex()
|
fi := l.firstIndex()
|
||||||
if lo < fi {
|
if lo < fi {
|
||||||
@ -330,18 +334,18 @@ func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
|
|||||||
|
|
||||||
length := l.lastIndex() - fi + 1
|
length := l.lastIndex() - fi + 1
|
||||||
if lo < fi || hi > fi+length {
|
if lo < fi || hi > fi+length {
|
||||||
raftLogger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex())
|
l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex())
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func zeroTermOnErrCompacted(t uint64, err error) uint64 {
|
func (l *raftLog) zeroTermOnErrCompacted(t uint64, err error) uint64 {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
if err == ErrCompacted {
|
if err == ErrCompacted {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
raftLogger.Panicf("unexpected error (%v)", err)
|
l.logger.Panicf("unexpected error (%v)", err)
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
@ -46,7 +46,7 @@ func TestFindConflict(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
raftLog := newLog(NewMemoryStorage())
|
raftLog := newLog(NewMemoryStorage(), raftLogger)
|
||||||
raftLog.append(previousEnts...)
|
raftLog.append(previousEnts...)
|
||||||
|
|
||||||
gconflict := raftLog.findConflict(tt.ents)
|
gconflict := raftLog.findConflict(tt.ents)
|
||||||
@ -58,7 +58,7 @@ func TestFindConflict(t *testing.T) {
|
|||||||
|
|
||||||
func TestIsUpToDate(t *testing.T) {
|
func TestIsUpToDate(t *testing.T) {
|
||||||
previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}
|
previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}
|
||||||
raftLog := newLog(NewMemoryStorage())
|
raftLog := newLog(NewMemoryStorage(), raftLogger)
|
||||||
raftLog.append(previousEnts...)
|
raftLog.append(previousEnts...)
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
lastIndex uint64
|
lastIndex uint64
|
||||||
@ -126,7 +126,7 @@ func TestAppend(t *testing.T) {
|
|||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
storage.Append(previousEnts)
|
storage.Append(previousEnts)
|
||||||
raftLog := newLog(storage)
|
raftLog := newLog(storage, raftLogger)
|
||||||
|
|
||||||
index := raftLog.append(tt.ents...)
|
index := raftLog.append(tt.ents...)
|
||||||
if index != tt.windex {
|
if index != tt.windex {
|
||||||
@ -237,7 +237,7 @@ func TestLogMaybeAppend(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
raftLog := newLog(NewMemoryStorage())
|
raftLog := newLog(NewMemoryStorage(), raftLogger)
|
||||||
raftLog.append(previousEnts...)
|
raftLog.append(previousEnts...)
|
||||||
raftLog.committed = commit
|
raftLog.committed = commit
|
||||||
func() {
|
func() {
|
||||||
@ -285,7 +285,7 @@ func TestCompactionSideEffects(t *testing.T) {
|
|||||||
for i = 1; i <= unstableIndex; i++ {
|
for i = 1; i <= unstableIndex; i++ {
|
||||||
storage.Append([]pb.Entry{{Term: uint64(i), Index: uint64(i)}})
|
storage.Append([]pb.Entry{{Term: uint64(i), Index: uint64(i)}})
|
||||||
}
|
}
|
||||||
raftLog := newLog(storage)
|
raftLog := newLog(storage, raftLogger)
|
||||||
for i = unstableIndex; i < lastIndex; i++ {
|
for i = unstableIndex; i < lastIndex; i++ {
|
||||||
raftLog.append(pb.Entry{Term: uint64(i + 1), Index: uint64(i + 1)})
|
raftLog.append(pb.Entry{Term: uint64(i + 1), Index: uint64(i + 1)})
|
||||||
}
|
}
|
||||||
@ -359,7 +359,7 @@ func TestNextEnts(t *testing.T) {
|
|||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
storage.ApplySnapshot(snap)
|
storage.ApplySnapshot(snap)
|
||||||
raftLog := newLog(storage)
|
raftLog := newLog(storage, raftLogger)
|
||||||
raftLog.append(ents...)
|
raftLog.append(ents...)
|
||||||
raftLog.maybeCommit(5, 1)
|
raftLog.maybeCommit(5, 1)
|
||||||
raftLog.appliedTo(tt.applied)
|
raftLog.appliedTo(tt.applied)
|
||||||
@ -389,7 +389,7 @@ func TestUnstableEnts(t *testing.T) {
|
|||||||
storage.Append(previousEnts[:tt.unstable-1])
|
storage.Append(previousEnts[:tt.unstable-1])
|
||||||
|
|
||||||
// append unstable entries to raftlog
|
// append unstable entries to raftlog
|
||||||
raftLog := newLog(storage)
|
raftLog := newLog(storage, raftLogger)
|
||||||
raftLog.append(previousEnts[tt.unstable-1:]...)
|
raftLog.append(previousEnts[tt.unstable-1:]...)
|
||||||
|
|
||||||
ents := raftLog.unstableEntries()
|
ents := raftLog.unstableEntries()
|
||||||
@ -427,7 +427,7 @@ func TestCommitTo(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
raftLog := newLog(NewMemoryStorage())
|
raftLog := newLog(NewMemoryStorage(), raftLogger)
|
||||||
raftLog.append(previousEnts...)
|
raftLog.append(previousEnts...)
|
||||||
raftLog.committed = commit
|
raftLog.committed = commit
|
||||||
raftLog.commitTo(tt.commit)
|
raftLog.commitTo(tt.commit)
|
||||||
@ -450,7 +450,7 @@ func TestStableTo(t *testing.T) {
|
|||||||
{3, 1, 1}, // bad index
|
{3, 1, 1}, // bad index
|
||||||
}
|
}
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
raftLog := newLog(NewMemoryStorage())
|
raftLog := newLog(NewMemoryStorage(), raftLogger)
|
||||||
raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...)
|
raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...)
|
||||||
raftLog.stableTo(tt.stablei, tt.stablet)
|
raftLog.stableTo(tt.stablei, tt.stablet)
|
||||||
if raftLog.unstable.offset != tt.wunstable {
|
if raftLog.unstable.offset != tt.wunstable {
|
||||||
@ -487,7 +487,7 @@ func TestStableToWithSnap(t *testing.T) {
|
|||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
s := NewMemoryStorage()
|
s := NewMemoryStorage()
|
||||||
s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}})
|
s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}})
|
||||||
raftLog := newLog(s)
|
raftLog := newLog(s, raftLogger)
|
||||||
raftLog.append(tt.newEnts...)
|
raftLog.append(tt.newEnts...)
|
||||||
raftLog.stableTo(tt.stablei, tt.stablet)
|
raftLog.stableTo(tt.stablei, tt.stablet)
|
||||||
if raftLog.unstable.offset != tt.wunstable {
|
if raftLog.unstable.offset != tt.wunstable {
|
||||||
@ -525,7 +525,7 @@ func TestCompaction(t *testing.T) {
|
|||||||
for i := uint64(1); i <= tt.lastIndex; i++ {
|
for i := uint64(1); i <= tt.lastIndex; i++ {
|
||||||
storage.Append([]pb.Entry{{Index: i}})
|
storage.Append([]pb.Entry{{Index: i}})
|
||||||
}
|
}
|
||||||
raftLog := newLog(storage)
|
raftLog := newLog(storage, raftLogger)
|
||||||
raftLog.maybeCommit(tt.lastIndex, 0)
|
raftLog.maybeCommit(tt.lastIndex, 0)
|
||||||
raftLog.appliedTo(raftLog.committed)
|
raftLog.appliedTo(raftLog.committed)
|
||||||
|
|
||||||
@ -551,7 +551,7 @@ func TestLogRestore(t *testing.T) {
|
|||||||
snap := pb.SnapshotMetadata{Index: index, Term: term}
|
snap := pb.SnapshotMetadata{Index: index, Term: term}
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
storage.ApplySnapshot(pb.Snapshot{Metadata: snap})
|
storage.ApplySnapshot(pb.Snapshot{Metadata: snap})
|
||||||
raftLog := newLog(storage)
|
raftLog := newLog(storage, raftLogger)
|
||||||
|
|
||||||
if len(raftLog.allEntries()) != 0 {
|
if len(raftLog.allEntries()) != 0 {
|
||||||
t.Errorf("len = %d, want 0", len(raftLog.allEntries()))
|
t.Errorf("len = %d, want 0", len(raftLog.allEntries()))
|
||||||
@ -563,7 +563,7 @@ func TestLogRestore(t *testing.T) {
|
|||||||
t.Errorf("committed = %d, want %d", raftLog.committed, index)
|
t.Errorf("committed = %d, want %d", raftLog.committed, index)
|
||||||
}
|
}
|
||||||
if raftLog.unstable.offset != index+1 {
|
if raftLog.unstable.offset != index+1 {
|
||||||
t.Errorf("unstable = %d, want %d", raftLog.unstable, index+1)
|
t.Errorf("unstable = %d, want %d", raftLog.unstable.offset, index+1)
|
||||||
}
|
}
|
||||||
if mustTerm(raftLog.term(index)) != term {
|
if mustTerm(raftLog.term(index)) != term {
|
||||||
t.Errorf("term = %d, want %d", mustTerm(raftLog.term(index)), term)
|
t.Errorf("term = %d, want %d", mustTerm(raftLog.term(index)), term)
|
||||||
@ -575,7 +575,7 @@ func TestIsOutOfBounds(t *testing.T) {
|
|||||||
num := uint64(100)
|
num := uint64(100)
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}})
|
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}})
|
||||||
l := newLog(storage)
|
l := newLog(storage, raftLogger)
|
||||||
for i := uint64(1); i <= num; i++ {
|
for i := uint64(1); i <= num; i++ {
|
||||||
l.append(pb.Entry{Index: i + offset})
|
l.append(pb.Entry{Index: i + offset})
|
||||||
}
|
}
|
||||||
@ -658,7 +658,7 @@ func TestTerm(t *testing.T) {
|
|||||||
|
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset, Term: 1}})
|
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset, Term: 1}})
|
||||||
l := newLog(storage)
|
l := newLog(storage, raftLogger)
|
||||||
for i = 1; i < num; i++ {
|
for i = 1; i < num; i++ {
|
||||||
l.append(pb.Entry{Index: offset + i, Term: i})
|
l.append(pb.Entry{Index: offset + i, Term: i})
|
||||||
}
|
}
|
||||||
@ -688,7 +688,7 @@ func TestTermWithUnstableSnapshot(t *testing.T) {
|
|||||||
|
|
||||||
storage := NewMemoryStorage()
|
storage := NewMemoryStorage()
|
||||||
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: storagesnapi, Term: 1}})
|
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: storagesnapi, Term: 1}})
|
||||||
l := newLog(storage)
|
l := newLog(storage, raftLogger)
|
||||||
l.restore(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: unstablesnapi, Term: 1}})
|
l.restore(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: unstablesnapi, Term: 1}})
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
@ -725,7 +725,7 @@ func TestSlice(t *testing.T) {
|
|||||||
for i = 1; i < num/2; i++ {
|
for i = 1; i < num/2; i++ {
|
||||||
storage.Append([]pb.Entry{{Index: offset + i, Term: offset + i}})
|
storage.Append([]pb.Entry{{Index: offset + i, Term: offset + i}})
|
||||||
}
|
}
|
||||||
l := newLog(storage)
|
l := newLog(storage, raftLogger)
|
||||||
for i = num / 2; i < num; i++ {
|
for i = num / 2; i < num; i++ {
|
||||||
l.append(pb.Entry{Index: offset + i, Term: offset + i})
|
l.append(pb.Entry{Index: offset + i, Term: offset + i})
|
||||||
}
|
}
|
||||||
|
@ -26,6 +26,8 @@ type unstable struct {
|
|||||||
// all entries that have not yet been written to storage.
|
// all entries that have not yet been written to storage.
|
||||||
entries []pb.Entry
|
entries []pb.Entry
|
||||||
offset uint64
|
offset uint64
|
||||||
|
|
||||||
|
logger Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
// maybeFirstIndex returns the index of the first possible entry in entries
|
// maybeFirstIndex returns the index of the first possible entry in entries
|
||||||
@ -106,7 +108,7 @@ func (u *unstable) truncateAndAppend(ents []pb.Entry) {
|
|||||||
// directly append
|
// directly append
|
||||||
u.entries = append(u.entries, ents...)
|
u.entries = append(u.entries, ents...)
|
||||||
case after < u.offset:
|
case after < u.offset:
|
||||||
raftLogger.Infof("replace the unstable entries from index %d", after+1)
|
u.logger.Infof("replace the unstable entries from index %d", after+1)
|
||||||
// The log is being truncated to before our current offset
|
// The log is being truncated to before our current offset
|
||||||
// portion, so set the offset and replace the entries
|
// portion, so set the offset and replace the entries
|
||||||
u.offset = after + 1
|
u.offset = after + 1
|
||||||
@ -114,7 +116,7 @@ func (u *unstable) truncateAndAppend(ents []pb.Entry) {
|
|||||||
default:
|
default:
|
||||||
// truncate to after and copy to u.entries
|
// truncate to after and copy to u.entries
|
||||||
// then append
|
// then append
|
||||||
raftLogger.Infof("truncate the unstable entries to index %d", after)
|
u.logger.Infof("truncate the unstable entries to index %d", after)
|
||||||
u.entries = append([]pb.Entry{}, u.slice(u.offset, after+1)...)
|
u.entries = append([]pb.Entry{}, u.slice(u.offset, after+1)...)
|
||||||
u.entries = append(u.entries, ents...)
|
u.entries = append(u.entries, ents...)
|
||||||
}
|
}
|
||||||
@ -128,10 +130,10 @@ func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry {
|
|||||||
// u.offset <= lo <= hi <= u.offset+len(u.offset)
|
// u.offset <= lo <= hi <= u.offset+len(u.offset)
|
||||||
func (u *unstable) mustCheckOutOfBounds(lo, hi uint64) {
|
func (u *unstable) mustCheckOutOfBounds(lo, hi uint64) {
|
||||||
if lo > hi {
|
if lo > hi {
|
||||||
raftLogger.Panicf("invalid unstable.slice %d > %d", lo, hi)
|
u.logger.Panicf("invalid unstable.slice %d > %d", lo, hi)
|
||||||
}
|
}
|
||||||
upper := u.offset + uint64(len(u.entries))
|
upper := u.offset + uint64(len(u.entries))
|
||||||
if lo < u.offset || hi > upper {
|
if lo < u.offset || hi > upper {
|
||||||
raftLogger.Panicf("unstable.slice[%d,%d) out of bound [%d,%d]", lo, hi, u.offset, upper)
|
u.logger.Panicf("unstable.slice[%d,%d) out of bound [%d,%d]", lo, hi, u.offset, upper)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -55,6 +55,7 @@ func TestUnstableMaybeFirstIndex(t *testing.T) {
|
|||||||
entries: tt.entries,
|
entries: tt.entries,
|
||||||
offset: tt.offset,
|
offset: tt.offset,
|
||||||
snapshot: tt.snap,
|
snapshot: tt.snap,
|
||||||
|
logger: raftLogger,
|
||||||
}
|
}
|
||||||
index, ok := u.maybeFirstIndex()
|
index, ok := u.maybeFirstIndex()
|
||||||
if ok != tt.wok {
|
if ok != tt.wok {
|
||||||
@ -101,6 +102,7 @@ func TestMaybeLastIndex(t *testing.T) {
|
|||||||
entries: tt.entries,
|
entries: tt.entries,
|
||||||
offset: tt.offset,
|
offset: tt.offset,
|
||||||
snapshot: tt.snap,
|
snapshot: tt.snap,
|
||||||
|
logger: raftLogger,
|
||||||
}
|
}
|
||||||
index, ok := u.maybeLastIndex()
|
index, ok := u.maybeLastIndex()
|
||||||
if ok != tt.wok {
|
if ok != tt.wok {
|
||||||
@ -176,6 +178,7 @@ func TestUnstableMaybeTerm(t *testing.T) {
|
|||||||
entries: tt.entries,
|
entries: tt.entries,
|
||||||
offset: tt.offset,
|
offset: tt.offset,
|
||||||
snapshot: tt.snap,
|
snapshot: tt.snap,
|
||||||
|
logger: raftLogger,
|
||||||
}
|
}
|
||||||
term, ok := u.maybeTerm(tt.index)
|
term, ok := u.maybeTerm(tt.index)
|
||||||
if ok != tt.wok {
|
if ok != tt.wok {
|
||||||
@ -192,6 +195,7 @@ func TestUnstableRestore(t *testing.T) {
|
|||||||
entries: []pb.Entry{{Index: 5, Term: 1}},
|
entries: []pb.Entry{{Index: 5, Term: 1}},
|
||||||
offset: 5,
|
offset: 5,
|
||||||
snapshot: &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}},
|
snapshot: &pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 4, Term: 1}},
|
||||||
|
logger: raftLogger,
|
||||||
}
|
}
|
||||||
s := pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 6, Term: 2}}
|
s := pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: 6, Term: 2}}
|
||||||
u.restore(s)
|
u.restore(s)
|
||||||
@ -280,6 +284,7 @@ func TestUnstableStableTo(t *testing.T) {
|
|||||||
entries: tt.entries,
|
entries: tt.entries,
|
||||||
offset: tt.offset,
|
offset: tt.offset,
|
||||||
snapshot: tt.snap,
|
snapshot: tt.snap,
|
||||||
|
logger: raftLogger,
|
||||||
}
|
}
|
||||||
u.stableTo(tt.index, tt.term)
|
u.stableTo(tt.index, tt.term)
|
||||||
if u.offset != tt.woffset {
|
if u.offset != tt.woffset {
|
||||||
@ -336,6 +341,7 @@ func TestUnstableTruncateAndAppend(t *testing.T) {
|
|||||||
entries: tt.entries,
|
entries: tt.entries,
|
||||||
offset: tt.offset,
|
offset: tt.offset,
|
||||||
snapshot: tt.snap,
|
snapshot: tt.snap,
|
||||||
|
logger: raftLogger,
|
||||||
}
|
}
|
||||||
u.truncateAndAppend(tt.toappend)
|
u.truncateAndAppend(tt.toappend)
|
||||||
if u.offset != tt.woffset {
|
if u.offset != tt.woffset {
|
||||||
|
86
raft/raft.go
86
raft/raft.go
@ -96,6 +96,11 @@ type Config struct {
|
|||||||
// buffer over TCP/UDP. Setting MaxInflightMsgs to avoid overflowing that sending buffer.
|
// buffer over TCP/UDP. Setting MaxInflightMsgs to avoid overflowing that sending buffer.
|
||||||
// TODO (xiangli): feedback to application to limit the proposal rate?
|
// TODO (xiangli): feedback to application to limit the proposal rate?
|
||||||
MaxInflightMsgs int
|
MaxInflightMsgs int
|
||||||
|
|
||||||
|
// logger is the logger used for raft log. For multinode which
|
||||||
|
// can host multiple raft group, each raft group can have its
|
||||||
|
// own logger
|
||||||
|
Logger Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Config) validate() error {
|
func (c *Config) validate() error {
|
||||||
@ -119,6 +124,10 @@ func (c *Config) validate() error {
|
|||||||
return errors.New("max inflight messages must be greater than 0")
|
return errors.New("max inflight messages must be greater than 0")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.Logger == nil {
|
||||||
|
c.Logger = raftLogger
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -152,13 +161,15 @@ type raft struct {
|
|||||||
rand *rand.Rand
|
rand *rand.Rand
|
||||||
tick func()
|
tick func()
|
||||||
step stepFunc
|
step stepFunc
|
||||||
|
|
||||||
|
logger Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRaft(c *Config) *raft {
|
func newRaft(c *Config) *raft {
|
||||||
if err := c.validate(); err != nil {
|
if err := c.validate(); err != nil {
|
||||||
panic(err.Error())
|
panic(err.Error())
|
||||||
}
|
}
|
||||||
raftlog := newLog(c.Storage)
|
raftlog := newLog(c.Storage, c.Logger)
|
||||||
hs, cs, err := c.Storage.InitialState()
|
hs, cs, err := c.Storage.InitialState()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err) // TODO(bdarnell)
|
panic(err) // TODO(bdarnell)
|
||||||
@ -185,6 +196,7 @@ func newRaft(c *Config) *raft {
|
|||||||
prs: make(map[uint64]*Progress),
|
prs: make(map[uint64]*Progress),
|
||||||
electionTimeout: c.ElectionTick,
|
electionTimeout: c.ElectionTick,
|
||||||
heartbeatTimeout: c.HeartbeatTick,
|
heartbeatTimeout: c.HeartbeatTick,
|
||||||
|
logger: c.Logger,
|
||||||
}
|
}
|
||||||
r.rand = rand.New(rand.NewSource(int64(c.ID)))
|
r.rand = rand.New(rand.NewSource(int64(c.ID)))
|
||||||
for _, p := range peers {
|
for _, p := range peers {
|
||||||
@ -203,7 +215,7 @@ func newRaft(c *Config) *raft {
|
|||||||
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
|
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
|
||||||
}
|
}
|
||||||
|
|
||||||
raftLogger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
|
r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
|
||||||
r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
|
r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
@ -258,10 +270,10 @@ func (r *raft) sendAppend(to uint64) {
|
|||||||
}
|
}
|
||||||
m.Snapshot = snapshot
|
m.Snapshot = snapshot
|
||||||
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
|
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
|
||||||
raftLogger.Infof("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
|
r.logger.Infof("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
|
||||||
r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
|
r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
|
||||||
pr.becomeSnapshot(sindex)
|
pr.becomeSnapshot(sindex)
|
||||||
raftLogger.Infof("%x paused sending replication messages to %x [%s]", r.id, to, pr)
|
r.logger.Infof("%x paused sending replication messages to %x [%s]", r.id, to, pr)
|
||||||
} else {
|
} else {
|
||||||
m.Type = pb.MsgApp
|
m.Type = pb.MsgApp
|
||||||
m.Index = pr.Next - 1
|
m.Index = pr.Next - 1
|
||||||
@ -278,7 +290,7 @@ func (r *raft) sendAppend(to uint64) {
|
|||||||
case ProgressStateProbe:
|
case ProgressStateProbe:
|
||||||
pr.pause()
|
pr.pause()
|
||||||
default:
|
default:
|
||||||
raftLogger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
|
r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -391,7 +403,7 @@ func (r *raft) becomeFollower(term uint64, lead uint64) {
|
|||||||
r.tick = r.tickElection
|
r.tick = r.tickElection
|
||||||
r.lead = lead
|
r.lead = lead
|
||||||
r.state = StateFollower
|
r.state = StateFollower
|
||||||
raftLogger.Infof("%x became follower at term %d", r.id, r.Term)
|
r.logger.Infof("%x became follower at term %d", r.id, r.Term)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) becomeCandidate() {
|
func (r *raft) becomeCandidate() {
|
||||||
@ -404,7 +416,7 @@ func (r *raft) becomeCandidate() {
|
|||||||
r.tick = r.tickElection
|
r.tick = r.tickElection
|
||||||
r.Vote = r.id
|
r.Vote = r.id
|
||||||
r.state = StateCandidate
|
r.state = StateCandidate
|
||||||
raftLogger.Infof("%x became candidate at term %d", r.id, r.Term)
|
r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) becomeLeader() {
|
func (r *raft) becomeLeader() {
|
||||||
@ -419,7 +431,7 @@ func (r *raft) becomeLeader() {
|
|||||||
r.state = StateLeader
|
r.state = StateLeader
|
||||||
ents, err := r.raftLog.entries(r.raftLog.committed+1, noLimit)
|
ents, err := r.raftLog.entries(r.raftLog.committed+1, noLimit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
raftLogger.Panicf("unexpected error getting uncommitted entries (%v)", err)
|
r.logger.Panicf("unexpected error getting uncommitted entries (%v)", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, e := range ents {
|
for _, e := range ents {
|
||||||
@ -432,7 +444,7 @@ func (r *raft) becomeLeader() {
|
|||||||
r.pendingConf = true
|
r.pendingConf = true
|
||||||
}
|
}
|
||||||
r.appendEntry(pb.Entry{Data: nil})
|
r.appendEntry(pb.Entry{Data: nil})
|
||||||
raftLogger.Infof("%x became leader at term %d", r.id, r.Term)
|
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *raft) campaign() {
|
func (r *raft) campaign() {
|
||||||
@ -445,7 +457,7 @@ func (r *raft) campaign() {
|
|||||||
if i == r.id {
|
if i == r.id {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
raftLogger.Infof("%x [logterm: %d, index: %d] sent vote request to %x at term %d",
|
r.logger.Infof("%x [logterm: %d, index: %d] sent vote request to %x at term %d",
|
||||||
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), i, r.Term)
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), i, r.Term)
|
||||||
r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
|
r.send(pb.Message{To: i, Type: pb.MsgVote, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm()})
|
||||||
}
|
}
|
||||||
@ -453,9 +465,9 @@ func (r *raft) campaign() {
|
|||||||
|
|
||||||
func (r *raft) poll(id uint64, v bool) (granted int) {
|
func (r *raft) poll(id uint64, v bool) (granted int) {
|
||||||
if v {
|
if v {
|
||||||
raftLogger.Infof("%x received vote from %x at term %d", r.id, id, r.Term)
|
r.logger.Infof("%x received vote from %x at term %d", r.id, id, r.Term)
|
||||||
} else {
|
} else {
|
||||||
raftLogger.Infof("%x received vote rejection from %x at term %d", r.id, id, r.Term)
|
r.logger.Infof("%x received vote rejection from %x at term %d", r.id, id, r.Term)
|
||||||
}
|
}
|
||||||
if _, ok := r.votes[id]; !ok {
|
if _, ok := r.votes[id]; !ok {
|
||||||
r.votes[id] = v
|
r.votes[id] = v
|
||||||
@ -470,7 +482,7 @@ func (r *raft) poll(id uint64, v bool) (granted int) {
|
|||||||
|
|
||||||
func (r *raft) Step(m pb.Message) error {
|
func (r *raft) Step(m pb.Message) error {
|
||||||
if m.Type == pb.MsgHup {
|
if m.Type == pb.MsgHup {
|
||||||
raftLogger.Infof("%x is starting a new election at term %d", r.id, r.Term)
|
r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
|
||||||
r.campaign()
|
r.campaign()
|
||||||
r.Commit = r.raftLog.committed
|
r.Commit = r.raftLog.committed
|
||||||
return nil
|
return nil
|
||||||
@ -484,12 +496,12 @@ func (r *raft) Step(m pb.Message) error {
|
|||||||
if m.Type == pb.MsgVote {
|
if m.Type == pb.MsgVote {
|
||||||
lead = None
|
lead = None
|
||||||
}
|
}
|
||||||
raftLogger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
|
r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
|
||||||
r.id, r.Term, m.Type, m.From, m.Term)
|
r.id, r.Term, m.Type, m.From, m.Term)
|
||||||
r.becomeFollower(m.Term, lead)
|
r.becomeFollower(m.Term, lead)
|
||||||
case m.Term < r.Term:
|
case m.Term < r.Term:
|
||||||
// ignore
|
// ignore
|
||||||
raftLogger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
|
r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
|
||||||
r.id, r.Term, m.Type, m.From, m.Term)
|
r.id, r.Term, m.Type, m.From, m.Term)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -508,7 +520,7 @@ func stepLeader(r *raft, m pb.Message) {
|
|||||||
r.bcastHeartbeat()
|
r.bcastHeartbeat()
|
||||||
case pb.MsgProp:
|
case pb.MsgProp:
|
||||||
if len(m.Entries) == 0 {
|
if len(m.Entries) == 0 {
|
||||||
raftLogger.Panicf("%x stepped empty MsgProp", r.id)
|
r.logger.Panicf("%x stepped empty MsgProp", r.id)
|
||||||
}
|
}
|
||||||
for i, e := range m.Entries {
|
for i, e := range m.Entries {
|
||||||
if e.Type == pb.EntryConfChange {
|
if e.Type == pb.EntryConfChange {
|
||||||
@ -522,10 +534,10 @@ func stepLeader(r *raft, m pb.Message) {
|
|||||||
r.bcastAppend()
|
r.bcastAppend()
|
||||||
case pb.MsgAppResp:
|
case pb.MsgAppResp:
|
||||||
if m.Reject {
|
if m.Reject {
|
||||||
raftLogger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
|
r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
|
||||||
r.id, m.RejectHint, m.From, m.Index)
|
r.id, m.RejectHint, m.From, m.Index)
|
||||||
if pr.maybeDecrTo(m.Index, m.RejectHint) {
|
if pr.maybeDecrTo(m.Index, m.RejectHint) {
|
||||||
raftLogger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
|
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
|
||||||
if pr.State == ProgressStateReplicate {
|
if pr.State == ProgressStateReplicate {
|
||||||
pr.becomeProbe()
|
pr.becomeProbe()
|
||||||
}
|
}
|
||||||
@ -538,7 +550,7 @@ func stepLeader(r *raft, m pb.Message) {
|
|||||||
case pr.State == ProgressStateProbe:
|
case pr.State == ProgressStateProbe:
|
||||||
pr.becomeReplicate()
|
pr.becomeReplicate()
|
||||||
case pr.State == ProgressStateSnapshot && pr.maybeSnapshotAbort():
|
case pr.State == ProgressStateSnapshot && pr.maybeSnapshotAbort():
|
||||||
raftLogger.Infof("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
r.logger.Infof("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
||||||
pr.becomeProbe()
|
pr.becomeProbe()
|
||||||
case pr.State == ProgressStateReplicate:
|
case pr.State == ProgressStateReplicate:
|
||||||
pr.ins.freeTo(m.Index)
|
pr.ins.freeTo(m.Index)
|
||||||
@ -562,7 +574,7 @@ func stepLeader(r *raft, m pb.Message) {
|
|||||||
r.sendAppend(m.From)
|
r.sendAppend(m.From)
|
||||||
}
|
}
|
||||||
case pb.MsgVote:
|
case pb.MsgVote:
|
||||||
raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
|
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
|
||||||
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
|
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
|
||||||
case pb.MsgSnapStatus:
|
case pb.MsgSnapStatus:
|
||||||
@ -571,11 +583,11 @@ func stepLeader(r *raft, m pb.Message) {
|
|||||||
}
|
}
|
||||||
if !m.Reject {
|
if !m.Reject {
|
||||||
pr.becomeProbe()
|
pr.becomeProbe()
|
||||||
raftLogger.Infof("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
r.logger.Infof("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
||||||
} else {
|
} else {
|
||||||
pr.snapshotFailure()
|
pr.snapshotFailure()
|
||||||
pr.becomeProbe()
|
pr.becomeProbe()
|
||||||
raftLogger.Infof("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
r.logger.Infof("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
|
||||||
}
|
}
|
||||||
// If snapshot finish, wait for the msgAppResp from the remote node before sending
|
// If snapshot finish, wait for the msgAppResp from the remote node before sending
|
||||||
// out the next msgApp.
|
// out the next msgApp.
|
||||||
@ -587,14 +599,14 @@ func stepLeader(r *raft, m pb.Message) {
|
|||||||
if pr.State == ProgressStateReplicate {
|
if pr.State == ProgressStateReplicate {
|
||||||
pr.becomeProbe()
|
pr.becomeProbe()
|
||||||
}
|
}
|
||||||
raftLogger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
|
r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func stepCandidate(r *raft, m pb.Message) {
|
func stepCandidate(r *raft, m pb.Message) {
|
||||||
switch m.Type {
|
switch m.Type {
|
||||||
case pb.MsgProp:
|
case pb.MsgProp:
|
||||||
raftLogger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
|
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
|
||||||
return
|
return
|
||||||
case pb.MsgApp:
|
case pb.MsgApp:
|
||||||
r.becomeFollower(r.Term, m.From)
|
r.becomeFollower(r.Term, m.From)
|
||||||
@ -606,12 +618,12 @@ func stepCandidate(r *raft, m pb.Message) {
|
|||||||
r.becomeFollower(m.Term, m.From)
|
r.becomeFollower(m.Term, m.From)
|
||||||
r.handleSnapshot(m)
|
r.handleSnapshot(m)
|
||||||
case pb.MsgVote:
|
case pb.MsgVote:
|
||||||
raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
|
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %x",
|
||||||
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
|
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
|
||||||
case pb.MsgVoteResp:
|
case pb.MsgVoteResp:
|
||||||
gr := r.poll(m.From, !m.Reject)
|
gr := r.poll(m.From, !m.Reject)
|
||||||
raftLogger.Infof("%x [q:%d] has received %d votes and %d vote rejections", r.id, r.q(), gr, len(r.votes)-gr)
|
r.logger.Infof("%x [q:%d] has received %d votes and %d vote rejections", r.id, r.q(), gr, len(r.votes)-gr)
|
||||||
switch r.q() {
|
switch r.q() {
|
||||||
case gr:
|
case gr:
|
||||||
r.becomeLeader()
|
r.becomeLeader()
|
||||||
@ -626,7 +638,7 @@ func stepFollower(r *raft, m pb.Message) {
|
|||||||
switch m.Type {
|
switch m.Type {
|
||||||
case pb.MsgProp:
|
case pb.MsgProp:
|
||||||
if r.lead == None {
|
if r.lead == None {
|
||||||
raftLogger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
|
r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
m.To = r.lead
|
m.To = r.lead
|
||||||
@ -645,12 +657,12 @@ func stepFollower(r *raft, m pb.Message) {
|
|||||||
case pb.MsgVote:
|
case pb.MsgVote:
|
||||||
if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
|
if (r.Vote == None || r.Vote == m.From) && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
|
||||||
r.elapsed = 0
|
r.elapsed = 0
|
||||||
raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %d",
|
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] voted for %x [logterm: %d, index: %d] at term %d",
|
||||||
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
||||||
r.Vote = m.From
|
r.Vote = m.From
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
|
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp})
|
||||||
} else {
|
} else {
|
||||||
raftLogger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
|
r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected vote from %x [logterm: %d, index: %d] at term %d",
|
||||||
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
|
r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
|
||||||
}
|
}
|
||||||
@ -666,8 +678,8 @@ func (r *raft) handleAppendEntries(m pb.Message) {
|
|||||||
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
|
if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
|
||||||
} else {
|
} else {
|
||||||
raftLogger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
|
r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
|
||||||
r.id, zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
|
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -680,11 +692,11 @@ func (r *raft) handleHeartbeat(m pb.Message) {
|
|||||||
func (r *raft) handleSnapshot(m pb.Message) {
|
func (r *raft) handleSnapshot(m pb.Message) {
|
||||||
sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
|
sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
|
||||||
if r.restore(m.Snapshot) {
|
if r.restore(m.Snapshot) {
|
||||||
raftLogger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
|
r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
|
||||||
r.id, r.Commit, sindex, sterm)
|
r.id, r.Commit, sindex, sterm)
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
|
||||||
} else {
|
} else {
|
||||||
raftLogger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
|
r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
|
||||||
r.id, r.Commit, sindex, sterm)
|
r.id, r.Commit, sindex, sterm)
|
||||||
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
|
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
|
||||||
}
|
}
|
||||||
@ -697,13 +709,13 @@ func (r *raft) restore(s pb.Snapshot) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
|
if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
|
||||||
raftLogger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
|
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
|
||||||
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
||||||
r.raftLog.commitTo(s.Metadata.Index)
|
r.raftLog.commitTo(s.Metadata.Index)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
raftLogger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
|
r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
|
||||||
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
r.id, r.Commit, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
|
||||||
|
|
||||||
r.raftLog.restore(s)
|
r.raftLog.restore(s)
|
||||||
@ -716,7 +728,7 @@ func (r *raft) restore(s pb.Snapshot) bool {
|
|||||||
match = 0
|
match = 0
|
||||||
}
|
}
|
||||||
r.setProgress(n, match, next)
|
r.setProgress(n, match, next)
|
||||||
raftLogger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs[n])
|
r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs[n])
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
@ -756,7 +768,7 @@ func (r *raft) delProgress(id uint64) {
|
|||||||
|
|
||||||
func (r *raft) loadState(state pb.HardState) {
|
func (r *raft) loadState(state pb.HardState) {
|
||||||
if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
|
if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
|
||||||
raftLogger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
|
r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
|
||||||
}
|
}
|
||||||
r.raftLog.committed = state.Commit
|
r.raftLog.committed = state.Commit
|
||||||
r.Term = state.Term
|
r.Term = state.Term
|
||||||
|
@ -492,7 +492,7 @@ func TestDuelingCandidates(t *testing.T) {
|
|||||||
}{
|
}{
|
||||||
{a, StateFollower, 2, wlog},
|
{a, StateFollower, 2, wlog},
|
||||||
{b, StateFollower, 2, wlog},
|
{b, StateFollower, 2, wlog},
|
||||||
{c, StateFollower, 2, newLog(NewMemoryStorage())},
|
{c, StateFollower, 2, newLog(NewMemoryStorage(), raftLogger)},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i, tt := range tests {
|
for i, tt := range tests {
|
||||||
@ -638,7 +638,7 @@ func TestProposal(t *testing.T) {
|
|||||||
send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
|
send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
|
||||||
send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
|
send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
|
||||||
|
|
||||||
wantLog := newLog(NewMemoryStorage())
|
wantLog := newLog(NewMemoryStorage(), raftLogger)
|
||||||
if tt.success {
|
if tt.success {
|
||||||
wantLog = &raftLog{
|
wantLog = &raftLog{
|
||||||
storage: &MemoryStorage{
|
storage: &MemoryStorage{
|
||||||
|
Loading…
x
Reference in New Issue
Block a user