From 609e13a240876bfc63eca04ced9d9b4f3739bfca Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 23 Jul 2014 21:45:57 -0700 Subject: [PATCH] raft: add node.Unstable Be able to return all unstable log entries. Application must store unstable log entries before send out any messages after calling step. --- raft/log.go | 14 ++++++++++++- raft/log_test.go | 52 ++++++++++++++++++++++++++++++++++++++++++++---- raft/node.go | 5 +++++ 3 files changed, 66 insertions(+), 5 deletions(-) diff --git a/raft/log.go b/raft/log.go index cd770266d..a6b1fa452 100644 --- a/raft/log.go +++ b/raft/log.go @@ -26,6 +26,7 @@ func (e *Entry) isConfig() bool { type raftLog struct { ents []Entry + unstable int64 committed int64 applied int64 offset int64 @@ -38,6 +39,7 @@ type raftLog struct { func newLog() *raftLog { return &raftLog{ ents: make([]Entry, 1), + unstable: 1, committed: 0, applied: 0, compactThreshold: defaultCompactThreshold, @@ -69,6 +71,7 @@ func (l *raftLog) maybeAppend(index, logTerm, committed int64, ents ...Entry) bo func (l *raftLog) append(after int64, ents ...Entry) int64 { l.ents = append(l.slice(l.offset, after+1), ents...) + l.unstable = min(l.unstable, after+1) return l.lastIndex() } @@ -81,6 +84,12 @@ func (l *raftLog) findConflict(from int64, ents []Entry) int64 { return -1 } +func (l *raftLog) unstableEnts() []Entry { + ents := l.entries(l.unstable) + l.unstable = l.lastIndex() + 1 + return ents +} + func (l *raftLog) lastIndex() int64 { return int64(len(l.ents)) - 1 + l.offset } @@ -132,7 +141,8 @@ func (l *raftLog) nextEnts() (ents []Entry) { return ents } -// compact removes the log entries before i, exclusive. +// compact compacts all log entries until i. +// It removes the log entries before i, exclusive. // i must be not smaller than the index of the first entry // and not greater than the index of the last entry. // the number of entries after compaction will be returned. @@ -141,6 +151,7 @@ func (l *raftLog) compact(i int64) int64 { panic(fmt.Sprintf("compact %d out of bounds [%d:%d]", i, l.offset, l.lastIndex())) } l.ents = l.slice(i, l.lastIndex()+1) + l.unstable = max(i+1, l.unstable) l.offset = i return int64(len(l.ents)) } @@ -151,6 +162,7 @@ func (l *raftLog) shouldCompact() bool { func (l *raftLog) restore(index, term int64) { l.ents = []Entry{{Term: term}} + l.unstable = index + 1 l.committed = index l.applied = index l.offset = index diff --git a/raft/log_test.go b/raft/log_test.go index 0ae0dd499..a8bbd7736 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -12,23 +12,27 @@ import ( // 2.Append any new entries not already in the log func TestAppend(t *testing.T) { previousEnts := []Entry{{Term: 1}, {Term: 2}} + previousUnstable := int64(3) tests := []struct { - after int64 - ents []Entry - windex int64 - wents []Entry + after int64 + ents []Entry + windex int64 + wents []Entry + wunstable int64 }{ { 2, []Entry{}, 2, []Entry{{Term: 1}, {Term: 2}}, + 3, }, { 2, []Entry{{Term: 2}}, 3, []Entry{{Term: 1}, {Term: 2}, {Term: 2}}, + 3, }, // conflicts with index 1 { @@ -36,6 +40,7 @@ func TestAppend(t *testing.T) { []Entry{{Term: 2}}, 1, []Entry{{Term: 2}}, + 1, }, // conflicts with index 2 { @@ -43,12 +48,14 @@ func TestAppend(t *testing.T) { []Entry{{Term: 3}, {Term: 3}}, 3, []Entry{{Term: 1}, {Term: 3}, {Term: 3}}, + 2, }, } for i, tt := range tests { raftLog := newLog() raftLog.ents = append(raftLog.ents, previousEnts...) + raftLog.unstable = previousUnstable index := raftLog.append(tt.after, tt.ents...) if index != tt.windex { t.Errorf("#%d: lastIndex = %d, want %d", i, index, tt.windex) @@ -56,6 +63,9 @@ func TestAppend(t *testing.T) { if g := raftLog.entries(1); !reflect.DeepEqual(g, tt.wents) { t.Errorf("#%d: logEnts = %+v, want %+v", i, g, tt.wents) } + if g := raftLog.unstable; g != tt.wunstable { + t.Errorf("#%d: unstable = %d, want %d", i, g, tt.wunstable) + } } } @@ -88,6 +98,11 @@ func TestCompactionSideEffects(t *testing.T) { } } + unstableEnts := raftLog.unstableEnts() + if g := len(unstableEnts); g != 500 { + t.Errorf("len(unstableEntries) = %d, want = %d", g, 500) + } + prev := raftLog.lastIndex() raftLog.append(raftLog.lastIndex(), Entry{Term: raftLog.lastIndex() + 1}) if raftLog.lastIndex() != prev+1 { @@ -100,6 +115,32 @@ func TestCompactionSideEffects(t *testing.T) { } } +func TestUnstableEnts(t *testing.T) { + previousEnts := []Entry{{Term: 1}, {Term: 2}} + tests := []struct { + unstable int64 + wents []Entry + wunstable int64 + }{ + {3, nil, 3}, + {1, []Entry{{Term: 1}, {Term: 2}}, 3}, + } + + for i, tt := range tests { + raftLog := newLog() + raftLog.ents = append(raftLog.ents, previousEnts...) + raftLog.unstable = tt.unstable + ents := raftLog.unstableEnts() + if !reflect.DeepEqual(ents, tt.wents) { + t.Errorf("#%d: unstableEnts = %+v, want %+v", i, ents, tt.wents) + } + if g := raftLog.unstable; g != tt.wunstable { + t.Errorf("#%d: unstable = %d, want %d", i, g, tt.wunstable) + } + } + +} + //TestCompaction ensures that the number of log entreis is correct after compactions. func TestCompaction(t *testing.T) { tests := []struct { @@ -164,6 +205,9 @@ func TestLogRestore(t *testing.T) { if raftLog.committed != index { t.Errorf("comitted = %d, want %d", raftLog.committed, index) } + if raftLog.unstable != index+1 { + t.Errorf("unstable = %d, want %d", raftLog.unstable, index+1) + } if raftLog.term(index) != term { t.Errorf("term = %d, want %d", raftLog.term(index), term) } diff --git a/raft/node.go b/raft/node.go index 8a707971d..bf7b2ef05 100644 --- a/raft/node.go +++ b/raft/node.go @@ -204,3 +204,8 @@ func (n *Node) UpdateConf(t int64, c *Config) { } n.propose(t, data) } + +// UnstableEnts retuens all the entries that need to be persistent. +func (n *Node) UnstableEnts() []Entry { + return n.sm.raftLog.unstableEnts() +}