From 64d9bcabf1bed5b0edbbe67bf06b3de3a74a3ae4 Mon Sep 17 00:00:00 2001 From: Ben Darnell Date: Mon, 17 Nov 2014 16:37:46 -0500 Subject: [PATCH] Add Storage.Term() method and hide the first entry from other methods. The first entry in the log is a dummy which is used for matchTerm but may not have an actual payload. This change permits Storage implementations to treat this term value specially instead of storing it as a dummy Entry. Storage.FirstIndex() no longer includes the term-only entry. This reverses a recent decision to create entry zero as initially unstable; Storage implementations are now required to make Term(0) == 0 and the first unstable entry is now index 1. stableTo(0) is no longer allowed. --- etcdserver/server_test.go | 24 +++++++++++++-- raft/log.go | 45 +++++++++++++-------------- raft/log_test.go | 64 ++++++++++++++++++++++++++++----------- raft/node_test.go | 6 ++-- raft/raft.go | 6 ++-- raft/raft_paper_test.go | 40 ++++++++++++------------ raft/raft_test.go | 15 +++++---- raft/storage.go | 41 +++++++++++++++++-------- 8 files changed, 153 insertions(+), 88 deletions(-) diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 6eef6e245..e5d24e65d 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -840,9 +840,29 @@ func TestSnapshot(t *testing.T) { n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1, s) defer n.Stop() - // Save the initial state to storage so we have something to snapshot. + // Progress the node to the point where it has something to snapshot. + // TODO(bdarnell): this could be improved with changes in the raft internals. + // First, we must apply the initial conf changes so we can have an election. rd := <-n.Ready() s.Append(rd.Entries) + for _, e := range rd.CommittedEntries { + if e.Type == raftpb.EntryConfChange { + var cc raftpb.ConfChange + err := cc.Unmarshal(e.Data) + if err != nil { + t.Fatal(err) + } + n.ApplyConfChange(cc) + } + } + n.Advance() + + // Now we can have an election and persist the rest of the log. + // This causes HardState.Commit to advance. HardState.Commit must + // be > 0 to snapshot. + n.Campaign(context.Background()) + rd = <-n.Ready() + s.Append(rd.Entries) n.Advance() st := &storeRecorder{} @@ -854,7 +874,7 @@ func TestSnapshot(t *testing.T) { raftStorage: s, } - srv.snapshot(0, []uint64{1}) + srv.snapshot(1, []uint64{1}) gaction := st.Action() if len(gaction) != 1 { t.Fatalf("len(action) = %d, want 1", len(gaction)) diff --git a/raft/log.go b/raft/log.go index 428b0396d..7e5831f0e 100644 --- a/raft/log.go +++ b/raft/log.go @@ -53,14 +53,11 @@ func newLog(storage Storage) *raftLog { storage: storage, } lastIndex, err := storage.LastIndex() - if err == ErrStorageEmpty { - // When starting from scratch populate the list with a dummy entry at term zero. - log.unstableEnts = make([]pb.Entry, 1) - } else if err == nil { - log.unstable = lastIndex + 1 - } else { + if err != nil { panic(err) // TODO(bdarnell) } + log.unstable = lastIndex + 1 + return log } @@ -190,27 +187,28 @@ func (l *raftLog) lastTerm() uint64 { } func (l *raftLog) term(i uint64) uint64 { - if e := l.at(i); e != nil { - return e.Term + if i < l.unstable { + t, err := l.storage.Term(i) + if err == ErrSnapshotRequired { + return 0 + } else if err != nil { + panic(err) // TODO(bdarnell) + } + return t } - return 0 + if i >= l.unstable+uint64(len(l.unstableEnts)) { + return 0 + } + return l.unstableEnts[i-l.unstable].Term } func (l *raftLog) entries(i uint64) []pb.Entry { - // never send out the first entry - // first entry is only used for matching - // prevLogTerm - if i == 0 { - panic("cannot return the first entry in log") - } return l.slice(i, l.lastIndex()+1) } -// allEntries returns all entries in the log, including the initial -// entry that is only used for prevLogTerm validation. This method -// should only be used for testing. +// allEntries returns all entries in the log. func (l *raftLog) allEntries() []pb.Entry { - return l.slice(l.firstIndex(), l.lastIndex()+1) + return l.entries(l.firstIndex()) } // isUpToDate determines if the given (lastIndex,term) log is more up-to-date @@ -224,10 +222,7 @@ func (l *raftLog) isUpToDate(lasti, term uint64) bool { } func (l *raftLog) matchTerm(i, term uint64) bool { - if e := l.at(i); e != nil { - return e.Term == term - } - return false + return l.term(i) == term } func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { @@ -303,7 +298,9 @@ func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry { var ents []pb.Entry if lo < l.unstable { storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable)) - if err != nil { + if err == ErrSnapshotRequired { + return nil + } else if err != nil { panic(err) // TODO(bdarnell) } ents = append(ents, storedEnts...) diff --git a/raft/log_test.go b/raft/log_test.go index 27089d930..1d601c027 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -91,7 +91,7 @@ func TestIsUpToDate(t *testing.T) { } func TestAppend(t *testing.T) { - previousEnts := []pb.Entry{{}, {Term: 1}, {Term: 2}} + previousEnts := []pb.Entry{{Term: 1}, {Term: 2}} tests := []struct { after uint64 ents []pb.Entry @@ -283,7 +283,7 @@ func TestCompactionSideEffects(t *testing.T) { unstableIndex := uint64(750) lastTerm := lastIndex storage := NewMemoryStorage() - for i = 0; i <= unstableIndex; i++ { + for i = 1; i <= unstableIndex; i++ { storage.Append([]pb.Entry{{Term: uint64(i), Index: uint64(i)}}) } raftLog := newLog(storage) @@ -337,22 +337,23 @@ func TestCompactionSideEffects(t *testing.T) { } func TestUnstableEnts(t *testing.T) { - previousEnts := []pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}} + previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}} tests := []struct { unstable uint64 wents []pb.Entry wunstable uint64 }{ {3, nil, 3}, - {1, previousEnts[1:], 3}, - {0, append([]pb.Entry{{}}, previousEnts...), 3}, + {1, previousEnts, 3}, } for i, tt := range tests { storage := NewMemoryStorage() - storage.Append(previousEnts[:tt.unstable]) + if tt.unstable > 0 { + storage.Append(previousEnts[:tt.unstable-1]) + } raftLog := newLog(storage) - raftLog.append(raftLog.lastIndex(), previousEnts[tt.unstable:]...) + raftLog.append(raftLog.lastIndex(), previousEnts[tt.unstable-1:]...) ents := raftLog.unstableEntries() if l := len(ents); l > 0 { raftLog.stableTo(ents[l-1].Index) @@ -371,7 +372,6 @@ func TestStableTo(t *testing.T) { stable uint64 wunstable uint64 }{ - {0, 1}, {1, 2}, {2, 3}, } @@ -396,9 +396,9 @@ func TestCompaction(t *testing.T) { }{ // out of upper bound {1000, 1000, []uint64{1001}, []int{-1}, false}, - {1000, 1000, []uint64{300, 500, 800, 900}, []int{701, 501, 201, 101}, true}, + {1000, 1000, []uint64{300, 500, 800, 900}, []int{700, 500, 200, 100}, true}, // out of lower bound - {1000, 1000, []uint64{300, 299}, []int{701, -1}, false}, + {1000, 1000, []uint64{300, 299}, []int{700, -1}, false}, {0, 1000, []uint64{1}, []int{-1}, false}, } @@ -413,7 +413,7 @@ func TestCompaction(t *testing.T) { }() storage := NewMemoryStorage() - for i := uint64(0); i <= tt.lastIndex; i++ { + for i := uint64(1); i <= tt.lastIndex; i++ { storage.Append([]pb.Entry{{}}) } raftLog := newLog(storage) @@ -442,11 +442,11 @@ func TestLogRestore(t *testing.T) { raftLog.restore(pb.Snapshot{Index: index, Term: term}) // only has the guard entry - if len(raftLog.allEntries()) != 1 { + if len(raftLog.allEntries()) != 0 { t.Errorf("len = %d, want 1", len(raftLog.allEntries())) } - if raftLog.firstIndex() != index { - t.Errorf("firstIndex = %d, want %d", raftLog.firstIndex(), index) + if raftLog.firstIndex() != index+1 { + t.Errorf("firstIndex = %d, want %d", raftLog.firstIndex(), index+1) } if raftLog.applied != index { t.Errorf("applied = %d, want %d", raftLog.applied, index) @@ -474,7 +474,7 @@ func TestIsOutOfBounds(t *testing.T) { w bool }{ {offset - 1, true}, - {offset, false}, + {offset, true}, {offset + num/2, false}, {offset + num, false}, {offset + num + 1, true}, @@ -504,7 +504,7 @@ func TestAt(t *testing.T) { w *pb.Entry }{ {offset - 1, nil}, - {offset, &pb.Entry{Term: 0}}, + {offset, nil}, {offset + num/2, &pb.Entry{Term: num / 2}}, {offset + num - 1, &pb.Entry{Term: num - 1}}, {offset + num, nil}, @@ -518,6 +518,36 @@ func TestAt(t *testing.T) { } } +func TestTerm(t *testing.T) { + var i uint64 + offset := uint64(100) + num := uint64(100) + + l := newLog(NewMemoryStorage()) + l.restore(pb.Snapshot{Index: offset}) + for i = 1; i < num; i++ { + l.append(offset+i-1, pb.Entry{Term: i}) + } + + tests := []struct { + index uint64 + w uint64 + }{ + {offset - 1, 0}, + {offset, 0}, + {offset + num/2, num / 2}, + {offset + num - 1, num - 1}, + {offset + num, 0}, + } + + for i, tt := range tests { + term := l.term(tt.index) + if !reflect.DeepEqual(term, tt.w) { + t.Errorf("#%d: at = %d, want %d", i, term, tt.w) + } + } +} + func TestSlice(t *testing.T) { var i uint64 offset := uint64(100) @@ -535,7 +565,7 @@ func TestSlice(t *testing.T) { w []pb.Entry }{ {offset - 1, offset + 1, nil}, - {offset, offset + 1, []pb.Entry{{Term: 0}}}, + {offset, offset + 1, nil}, {offset + num/2, offset + num/2 + 1, []pb.Entry{{Term: num / 2}}}, {offset + num - 1, offset + num, []pb.Entry{{Term: num - 1}}}, {offset + num, offset + num + 1, nil}, diff --git a/raft/node_test.go b/raft/node_test.go index 1ebbb9285..0d1199524 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -308,7 +308,6 @@ func TestNodeStart(t *testing.T) { SoftState: &SoftState{Lead: 1, Nodes: []uint64{1}, RaftState: StateLeader}, HardState: raftpb.HardState{Term: 1, Commit: 2}, Entries: []raftpb.Entry{ - {}, {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata}, {Term: 1, Index: 2}, }, @@ -352,7 +351,6 @@ func TestNodeStart(t *testing.T) { func TestNodeRestart(t *testing.T) { entries := []raftpb.Entry{ - {}, {Term: 1, Index: 1}, {Term: 1, Index: 2, Data: []byte("foo")}, } @@ -361,7 +359,7 @@ func TestNodeRestart(t *testing.T) { want := Ready{ HardState: emptyState, // commit upto index commit index in st - CommittedEntries: entries[1 : st.Commit+1], + CommittedEntries: entries[:st.Commit], } storage := NewMemoryStorage() @@ -429,7 +427,7 @@ func TestNodeCompact(t *testing.T) { } n.Stop() - if r.raftLog.firstIndex() != w.Index { + if r.raftLog.firstIndex() != w.Index+1 { t.Errorf("log.offset = %d, want %d", r.raftLog.firstIndex(), w.Index) } } diff --git a/raft/raft.go b/raft/raft.go index 9ec1c6145..20bde1dac 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -133,7 +133,7 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage } r.rand = rand.New(rand.NewSource(int64(id))) for _, p := range peers { - r.prs[p] = &progress{} + r.prs[p] = &progress{next: 1} } r.becomeFollower(0, None) return r @@ -187,12 +187,12 @@ func (r *raft) sendAppend(to uint64) { pr := r.prs[to] m := pb.Message{} m.To = to - m.Index = pr.next - 1 - if r.needSnapshot(m.Index) { + if r.needSnapshot(pr.next) { m.Type = pb.MsgSnap m.Snapshot = r.raftLog.snapshot } else { m.Type = pb.MsgApp + m.Index = pr.next - 1 m.LogTerm = r.raftLog.term(pr.next - 1) m.Entries = r.raftLog.entries(pr.next) m.Commit = r.raftLog.committed diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index 8433010c7..690e73836 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -514,7 +514,7 @@ func TestLeaderCommitPrecedingEntries(t *testing.T) { } for i, tt := range tests { storage := NewMemoryStorage() - storage.Append(append([]pb.Entry{{}}, tt...)) + storage.Append(tt) r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage) r.loadState(pb.HardState{Term: 2}) r.becomeCandidate() @@ -591,17 +591,17 @@ func TestFollowerCommitEntry(t *testing.T) { // append entries. // Reference: section 5.3 func TestFollowerCheckMsgApp(t *testing.T) { - ents := []pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}} + ents := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}} tests := []struct { term uint64 index uint64 wreject bool }{ + {0, 0, false}, {ents[0].Term, ents[0].Index, false}, {ents[1].Term, ents[1].Index, false}, - {ents[2].Term, ents[2].Index, false}, - {ents[1].Term, ents[1].Index + 1, true}, - {ents[1].Term + 1, ents[1].Index, true}, + {ents[0].Term, ents[0].Index + 1, true}, + {ents[0].Term + 1, ents[0].Index, true}, {3, 3, true}, } for i, tt := range tests { @@ -638,31 +638,31 @@ func TestFollowerAppendEntries(t *testing.T) { { 2, 2, []pb.Entry{{Term: 3, Index: 3}}, - []pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}, {Term: 3, Index: 3}}, + []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}, {Term: 3, Index: 3}}, []pb.Entry{{Term: 3, Index: 3}}, }, { 1, 1, []pb.Entry{{Term: 3, Index: 3}, {Term: 4, Index: 4}}, - []pb.Entry{{}, {Term: 1, Index: 1}, {Term: 3, Index: 3}, {Term: 4, Index: 4}}, + []pb.Entry{{Term: 1, Index: 1}, {Term: 3, Index: 3}, {Term: 4, Index: 4}}, []pb.Entry{{Term: 3, Index: 3}, {Term: 4, Index: 4}}, }, { 0, 0, []pb.Entry{{Term: 1, Index: 1}}, - []pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}}, + []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}, nil, }, { 0, 0, []pb.Entry{{Term: 3, Index: 3}}, - []pb.Entry{{}, {Term: 3, Index: 3}}, + []pb.Entry{{Term: 3, Index: 3}}, []pb.Entry{{Term: 3, Index: 3}}, }, } for i, tt := range tests { storage := NewMemoryStorage() - storage.Append([]pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}}) + storage.Append([]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}) r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage) r.becomeFollower(2, 2) @@ -813,17 +813,17 @@ func TestVoter(t *testing.T) { wreject bool }{ // same logterm - {[]pb.Entry{{}, {Term: 1, Index: 1}}, 1, 1, false}, - {[]pb.Entry{{}, {Term: 1, Index: 1}}, 1, 2, false}, - {[]pb.Entry{{}, {Term: 1, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true}, + {[]pb.Entry{{Term: 1, Index: 1}}, 1, 1, false}, + {[]pb.Entry{{Term: 1, Index: 1}}, 1, 2, false}, + {[]pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true}, // candidate higher logterm - {[]pb.Entry{{}, {Term: 1, Index: 1}}, 2, 1, false}, - {[]pb.Entry{{}, {Term: 1, Index: 1}}, 2, 2, false}, - {[]pb.Entry{{}, {Term: 1, Index: 1}, {Term: 1, Index: 2}}, 2, 1, false}, + {[]pb.Entry{{Term: 1, Index: 1}}, 2, 1, false}, + {[]pb.Entry{{Term: 1, Index: 1}}, 2, 2, false}, + {[]pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}}, 2, 1, false}, // voter higher logterm - {[]pb.Entry{{}, {Term: 2, Index: 1}}, 1, 1, true}, - {[]pb.Entry{{}, {Term: 2, Index: 1}}, 1, 2, true}, - {[]pb.Entry{{}, {Term: 2, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true}, + {[]pb.Entry{{Term: 2, Index: 1}}, 1, 1, true}, + {[]pb.Entry{{Term: 2, Index: 1}}, 1, 2, true}, + {[]pb.Entry{{Term: 2, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true}, } for i, tt := range tests { storage := NewMemoryStorage() @@ -850,7 +850,7 @@ func TestVoter(t *testing.T) { // current term are committed by counting replicas. // Reference: section 5.4.2 func TestLeaderOnlyCommitsLogFromCurrentTerm(t *testing.T) { - ents := []pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}} + ents := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}} tests := []struct { index uint64 wcommit uint64 diff --git a/raft/raft_test.go b/raft/raft_test.go index 21c8dfbef..6798b4932 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -547,8 +547,8 @@ func TestCompact(t *testing.T) { } sm.compact(tt.compacti, tt.nodes, tt.snapd) sort.Sort(uint64Slice(sm.raftLog.snapshot.Nodes)) - if sm.raftLog.firstIndex() != tt.compacti { - t.Errorf("%d: log.firstIndex = %d, want %d", i, sm.raftLog.firstIndex(), tt.compacti) + if sm.raftLog.firstIndex() != tt.compacti+1 { + t.Errorf("%d: log.firstIndex = %d, want %d", i, sm.raftLog.firstIndex(), tt.compacti+1) } if !reflect.DeepEqual(sm.raftLog.snapshot.Nodes, tt.nodes) { t.Errorf("%d: snap.nodes = %v, want %v", i, sm.raftLog.snapshot.Nodes, tt.nodes) @@ -836,9 +836,9 @@ func TestAllServerStepdown(t *testing.T) { wterm uint64 windex uint64 }{ - {StateFollower, StateFollower, 3, 1}, - {StateCandidate, StateFollower, 3, 1}, - {StateLeader, StateFollower, 3, 2}, + {StateFollower, StateFollower, 3, 0}, + {StateCandidate, StateFollower, 3, 0}, + {StateLeader, StateFollower, 3, 1}, } tmsgTypes := [...]pb.MessageType{pb.MsgVote, pb.MsgApp} @@ -865,8 +865,11 @@ func TestAllServerStepdown(t *testing.T) { if sm.Term != tt.wterm { t.Errorf("#%d.%d term = %v , want %v", i, j, sm.Term, tt.wterm) } + if uint64(sm.raftLog.lastIndex()) != tt.windex { + t.Errorf("#%d.%d index = %v , want %v", i, j, sm.raftLog.lastIndex(), tt.windex) + } if uint64(len(sm.raftLog.allEntries())) != tt.windex { - t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.raftLog.allEntries()), tt.windex) + t.Errorf("#%d.%d len(ents) = %v , want %v", i, j, len(sm.raftLog.allEntries()), tt.windex) } wlead := uint64(2) if msgType == pb.MsgVote { diff --git a/raft/storage.go b/raft/storage.go index 049b9ea17..072195c5b 100644 --- a/raft/storage.go +++ b/raft/storage.go @@ -23,9 +23,9 @@ import ( pb "github.com/coreos/etcd/raft/raftpb" ) -// ErrStorageEmpty is returned by Storage.GetLastIndex when there is -// no data. -var ErrStorageEmpty = errors.New("storage is empty") +// ErrSnapshotRequired is returned by Storage.Entries when a requested +// index is unavailable because it predates the last snapshot. +var ErrSnapshotRequired = errors.New("snapshot required; requested index is too old") // Storage is an interface that may be implemented by the application // to retrieve log entries from storage. @@ -36,11 +36,15 @@ var ErrStorageEmpty = errors.New("storage is empty") type Storage interface { // Entries returns a slice of log entries in the range [lo,hi). Entries(lo, hi uint64) ([]pb.Entry, error) - // GetLastIndex returns the index of the last entry in the log. - // If the log is empty it returns ErrStorageEmpty. + // Term returns the term of entry i, which must be in the range + // [FirstIndex()-1, LastIndex()]. The term of the entry before + // FirstIndex is retained for matching purposes even though the + // rest of that entry may not be available. + Term(i uint64) (uint64, error) + // LastIndex returns the index of the last entry in the log. LastIndex() (uint64, error) - // GetFirstIndex returns the index of the first log entry that is - // available via GetEntries (older entries have been incorporated + // FirstIndex returns the index of the first log entry that is + // available via Entries (older entries have been incorporated // into the latest Snapshot). FirstIndex() (uint64, error) // Compact discards all log entries prior to i. @@ -65,23 +69,36 @@ type MemoryStorage struct { // NewMemoryStorage creates an empty MemoryStorage. func NewMemoryStorage() *MemoryStorage { - return &MemoryStorage{} + return &MemoryStorage{ + // When starting from scratch populate the list with a dummy entry at term zero. + ents: make([]pb.Entry, 1), + } } // Entries implements the Storage interface. func (ms *MemoryStorage) Entries(lo, hi uint64) ([]pb.Entry, error) { ms.Lock() defer ms.Unlock() + if lo <= ms.offset { + return nil, ErrSnapshotRequired + } return ms.ents[lo-ms.offset : hi-ms.offset], nil } +// Term implements the Storage interface. +func (ms *MemoryStorage) Term(i uint64) (uint64, error) { + ms.Lock() + defer ms.Unlock() + if i < ms.offset || i > ms.offset+uint64(len(ms.ents)) { + return 0, ErrSnapshotRequired + } + return ms.ents[i-ms.offset].Term, nil +} + // LastIndex implements the Storage interface. func (ms *MemoryStorage) LastIndex() (uint64, error) { ms.Lock() defer ms.Unlock() - if len(ms.ents) == 0 { - return 0, ErrStorageEmpty - } return ms.offset + uint64(len(ms.ents)) - 1, nil } @@ -89,7 +106,7 @@ func (ms *MemoryStorage) LastIndex() (uint64, error) { func (ms *MemoryStorage) FirstIndex() (uint64, error) { ms.Lock() defer ms.Unlock() - return ms.offset, nil + return ms.offset + 1, nil } // Compact implements the Storage interface.