Add Storage.Term() method and hide the first entry from other methods.

The first entry in the log is a dummy which is used for matchTerm
but may not have an actual payload. This change permits Storage
implementations to treat this term value specially instead of
storing it as a dummy Entry.

Storage.FirstIndex() no longer includes the term-only entry.

This reverses a recent decision to create entry zero as initially
unstable; Storage implementations are now required to make
Term(0) == 0 and the first unstable entry is now index 1.
stableTo(0) is no longer allowed.
This commit is contained in:
Ben Darnell 2014-11-17 16:37:46 -05:00
parent 45e96be605
commit 64d9bcabf1
8 changed files with 153 additions and 88 deletions

View File

@ -840,9 +840,29 @@ func TestSnapshot(t *testing.T) {
n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1, s)
defer n.Stop()
// Save the initial state to storage so we have something to snapshot.
// Progress the node to the point where it has something to snapshot.
// TODO(bdarnell): this could be improved with changes in the raft internals.
// First, we must apply the initial conf changes so we can have an election.
rd := <-n.Ready()
s.Append(rd.Entries)
for _, e := range rd.CommittedEntries {
if e.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
err := cc.Unmarshal(e.Data)
if err != nil {
t.Fatal(err)
}
n.ApplyConfChange(cc)
}
}
n.Advance()
// Now we can have an election and persist the rest of the log.
// This causes HardState.Commit to advance. HardState.Commit must
// be > 0 to snapshot.
n.Campaign(context.Background())
rd = <-n.Ready()
s.Append(rd.Entries)
n.Advance()
st := &storeRecorder{}
@ -854,7 +874,7 @@ func TestSnapshot(t *testing.T) {
raftStorage: s,
}
srv.snapshot(0, []uint64{1})
srv.snapshot(1, []uint64{1})
gaction := st.Action()
if len(gaction) != 1 {
t.Fatalf("len(action) = %d, want 1", len(gaction))

View File

@ -53,14 +53,11 @@ func newLog(storage Storage) *raftLog {
storage: storage,
}
lastIndex, err := storage.LastIndex()
if err == ErrStorageEmpty {
// When starting from scratch populate the list with a dummy entry at term zero.
log.unstableEnts = make([]pb.Entry, 1)
} else if err == nil {
log.unstable = lastIndex + 1
} else {
if err != nil {
panic(err) // TODO(bdarnell)
}
log.unstable = lastIndex + 1
return log
}
@ -190,27 +187,28 @@ func (l *raftLog) lastTerm() uint64 {
}
func (l *raftLog) term(i uint64) uint64 {
if e := l.at(i); e != nil {
return e.Term
if i < l.unstable {
t, err := l.storage.Term(i)
if err == ErrSnapshotRequired {
return 0
} else if err != nil {
panic(err) // TODO(bdarnell)
}
return t
}
return 0
if i >= l.unstable+uint64(len(l.unstableEnts)) {
return 0
}
return l.unstableEnts[i-l.unstable].Term
}
func (l *raftLog) entries(i uint64) []pb.Entry {
// never send out the first entry
// first entry is only used for matching
// prevLogTerm
if i == 0 {
panic("cannot return the first entry in log")
}
return l.slice(i, l.lastIndex()+1)
}
// allEntries returns all entries in the log, including the initial
// entry that is only used for prevLogTerm validation. This method
// should only be used for testing.
// allEntries returns all entries in the log.
func (l *raftLog) allEntries() []pb.Entry {
return l.slice(l.firstIndex(), l.lastIndex()+1)
return l.entries(l.firstIndex())
}
// isUpToDate determines if the given (lastIndex,term) log is more up-to-date
@ -224,10 +222,7 @@ func (l *raftLog) isUpToDate(lasti, term uint64) bool {
}
func (l *raftLog) matchTerm(i, term uint64) bool {
if e := l.at(i); e != nil {
return e.Term == term
}
return false
return l.term(i) == term
}
func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
@ -303,7 +298,9 @@ func (l *raftLog) slice(lo uint64, hi uint64) []pb.Entry {
var ents []pb.Entry
if lo < l.unstable {
storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable))
if err != nil {
if err == ErrSnapshotRequired {
return nil
} else if err != nil {
panic(err) // TODO(bdarnell)
}
ents = append(ents, storedEnts...)

View File

@ -91,7 +91,7 @@ func TestIsUpToDate(t *testing.T) {
}
func TestAppend(t *testing.T) {
previousEnts := []pb.Entry{{}, {Term: 1}, {Term: 2}}
previousEnts := []pb.Entry{{Term: 1}, {Term: 2}}
tests := []struct {
after uint64
ents []pb.Entry
@ -283,7 +283,7 @@ func TestCompactionSideEffects(t *testing.T) {
unstableIndex := uint64(750)
lastTerm := lastIndex
storage := NewMemoryStorage()
for i = 0; i <= unstableIndex; i++ {
for i = 1; i <= unstableIndex; i++ {
storage.Append([]pb.Entry{{Term: uint64(i), Index: uint64(i)}})
}
raftLog := newLog(storage)
@ -337,22 +337,23 @@ func TestCompactionSideEffects(t *testing.T) {
}
func TestUnstableEnts(t *testing.T) {
previousEnts := []pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}}
previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}
tests := []struct {
unstable uint64
wents []pb.Entry
wunstable uint64
}{
{3, nil, 3},
{1, previousEnts[1:], 3},
{0, append([]pb.Entry{{}}, previousEnts...), 3},
{1, previousEnts, 3},
}
for i, tt := range tests {
storage := NewMemoryStorage()
storage.Append(previousEnts[:tt.unstable])
if tt.unstable > 0 {
storage.Append(previousEnts[:tt.unstable-1])
}
raftLog := newLog(storage)
raftLog.append(raftLog.lastIndex(), previousEnts[tt.unstable:]...)
raftLog.append(raftLog.lastIndex(), previousEnts[tt.unstable-1:]...)
ents := raftLog.unstableEntries()
if l := len(ents); l > 0 {
raftLog.stableTo(ents[l-1].Index)
@ -371,7 +372,6 @@ func TestStableTo(t *testing.T) {
stable uint64
wunstable uint64
}{
{0, 1},
{1, 2},
{2, 3},
}
@ -396,9 +396,9 @@ func TestCompaction(t *testing.T) {
}{
// out of upper bound
{1000, 1000, []uint64{1001}, []int{-1}, false},
{1000, 1000, []uint64{300, 500, 800, 900}, []int{701, 501, 201, 101}, true},
{1000, 1000, []uint64{300, 500, 800, 900}, []int{700, 500, 200, 100}, true},
// out of lower bound
{1000, 1000, []uint64{300, 299}, []int{701, -1}, false},
{1000, 1000, []uint64{300, 299}, []int{700, -1}, false},
{0, 1000, []uint64{1}, []int{-1}, false},
}
@ -413,7 +413,7 @@ func TestCompaction(t *testing.T) {
}()
storage := NewMemoryStorage()
for i := uint64(0); i <= tt.lastIndex; i++ {
for i := uint64(1); i <= tt.lastIndex; i++ {
storage.Append([]pb.Entry{{}})
}
raftLog := newLog(storage)
@ -442,11 +442,11 @@ func TestLogRestore(t *testing.T) {
raftLog.restore(pb.Snapshot{Index: index, Term: term})
// only has the guard entry
if len(raftLog.allEntries()) != 1 {
if len(raftLog.allEntries()) != 0 {
t.Errorf("len = %d, want 1", len(raftLog.allEntries()))
}
if raftLog.firstIndex() != index {
t.Errorf("firstIndex = %d, want %d", raftLog.firstIndex(), index)
if raftLog.firstIndex() != index+1 {
t.Errorf("firstIndex = %d, want %d", raftLog.firstIndex(), index+1)
}
if raftLog.applied != index {
t.Errorf("applied = %d, want %d", raftLog.applied, index)
@ -474,7 +474,7 @@ func TestIsOutOfBounds(t *testing.T) {
w bool
}{
{offset - 1, true},
{offset, false},
{offset, true},
{offset + num/2, false},
{offset + num, false},
{offset + num + 1, true},
@ -504,7 +504,7 @@ func TestAt(t *testing.T) {
w *pb.Entry
}{
{offset - 1, nil},
{offset, &pb.Entry{Term: 0}},
{offset, nil},
{offset + num/2, &pb.Entry{Term: num / 2}},
{offset + num - 1, &pb.Entry{Term: num - 1}},
{offset + num, nil},
@ -518,6 +518,36 @@ func TestAt(t *testing.T) {
}
}
func TestTerm(t *testing.T) {
var i uint64
offset := uint64(100)
num := uint64(100)
l := newLog(NewMemoryStorage())
l.restore(pb.Snapshot{Index: offset})
for i = 1; i < num; i++ {
l.append(offset+i-1, pb.Entry{Term: i})
}
tests := []struct {
index uint64
w uint64
}{
{offset - 1, 0},
{offset, 0},
{offset + num/2, num / 2},
{offset + num - 1, num - 1},
{offset + num, 0},
}
for i, tt := range tests {
term := l.term(tt.index)
if !reflect.DeepEqual(term, tt.w) {
t.Errorf("#%d: at = %d, want %d", i, term, tt.w)
}
}
}
func TestSlice(t *testing.T) {
var i uint64
offset := uint64(100)
@ -535,7 +565,7 @@ func TestSlice(t *testing.T) {
w []pb.Entry
}{
{offset - 1, offset + 1, nil},
{offset, offset + 1, []pb.Entry{{Term: 0}}},
{offset, offset + 1, nil},
{offset + num/2, offset + num/2 + 1, []pb.Entry{{Term: num / 2}}},
{offset + num - 1, offset + num, []pb.Entry{{Term: num - 1}}},
{offset + num, offset + num + 1, nil},

View File

@ -308,7 +308,6 @@ func TestNodeStart(t *testing.T) {
SoftState: &SoftState{Lead: 1, Nodes: []uint64{1}, RaftState: StateLeader},
HardState: raftpb.HardState{Term: 1, Commit: 2},
Entries: []raftpb.Entry{
{},
{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
{Term: 1, Index: 2},
},
@ -352,7 +351,6 @@ func TestNodeStart(t *testing.T) {
func TestNodeRestart(t *testing.T) {
entries := []raftpb.Entry{
{},
{Term: 1, Index: 1},
{Term: 1, Index: 2, Data: []byte("foo")},
}
@ -361,7 +359,7 @@ func TestNodeRestart(t *testing.T) {
want := Ready{
HardState: emptyState,
// commit upto index commit index in st
CommittedEntries: entries[1 : st.Commit+1],
CommittedEntries: entries[:st.Commit],
}
storage := NewMemoryStorage()
@ -429,7 +427,7 @@ func TestNodeCompact(t *testing.T) {
}
n.Stop()
if r.raftLog.firstIndex() != w.Index {
if r.raftLog.firstIndex() != w.Index+1 {
t.Errorf("log.offset = %d, want %d", r.raftLog.firstIndex(), w.Index)
}
}

View File

@ -133,7 +133,7 @@ func newRaft(id uint64, peers []uint64, election, heartbeat int, storage Storage
}
r.rand = rand.New(rand.NewSource(int64(id)))
for _, p := range peers {
r.prs[p] = &progress{}
r.prs[p] = &progress{next: 1}
}
r.becomeFollower(0, None)
return r
@ -187,12 +187,12 @@ func (r *raft) sendAppend(to uint64) {
pr := r.prs[to]
m := pb.Message{}
m.To = to
m.Index = pr.next - 1
if r.needSnapshot(m.Index) {
if r.needSnapshot(pr.next) {
m.Type = pb.MsgSnap
m.Snapshot = r.raftLog.snapshot
} else {
m.Type = pb.MsgApp
m.Index = pr.next - 1
m.LogTerm = r.raftLog.term(pr.next - 1)
m.Entries = r.raftLog.entries(pr.next)
m.Commit = r.raftLog.committed

View File

@ -514,7 +514,7 @@ func TestLeaderCommitPrecedingEntries(t *testing.T) {
}
for i, tt := range tests {
storage := NewMemoryStorage()
storage.Append(append([]pb.Entry{{}}, tt...))
storage.Append(tt)
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
r.loadState(pb.HardState{Term: 2})
r.becomeCandidate()
@ -591,17 +591,17 @@ func TestFollowerCommitEntry(t *testing.T) {
// append entries.
// Reference: section 5.3
func TestFollowerCheckMsgApp(t *testing.T) {
ents := []pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}}
ents := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}
tests := []struct {
term uint64
index uint64
wreject bool
}{
{0, 0, false},
{ents[0].Term, ents[0].Index, false},
{ents[1].Term, ents[1].Index, false},
{ents[2].Term, ents[2].Index, false},
{ents[1].Term, ents[1].Index + 1, true},
{ents[1].Term + 1, ents[1].Index, true},
{ents[0].Term, ents[0].Index + 1, true},
{ents[0].Term + 1, ents[0].Index, true},
{3, 3, true},
}
for i, tt := range tests {
@ -638,31 +638,31 @@ func TestFollowerAppendEntries(t *testing.T) {
{
2, 2,
[]pb.Entry{{Term: 3, Index: 3}},
[]pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}, {Term: 3, Index: 3}},
[]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}, {Term: 3, Index: 3}},
[]pb.Entry{{Term: 3, Index: 3}},
},
{
1, 1,
[]pb.Entry{{Term: 3, Index: 3}, {Term: 4, Index: 4}},
[]pb.Entry{{}, {Term: 1, Index: 1}, {Term: 3, Index: 3}, {Term: 4, Index: 4}},
[]pb.Entry{{Term: 1, Index: 1}, {Term: 3, Index: 3}, {Term: 4, Index: 4}},
[]pb.Entry{{Term: 3, Index: 3}, {Term: 4, Index: 4}},
},
{
0, 0,
[]pb.Entry{{Term: 1, Index: 1}},
[]pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}},
[]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}},
nil,
},
{
0, 0,
[]pb.Entry{{Term: 3, Index: 3}},
[]pb.Entry{{}, {Term: 3, Index: 3}},
[]pb.Entry{{Term: 3, Index: 3}},
[]pb.Entry{{Term: 3, Index: 3}},
},
}
for i, tt := range tests {
storage := NewMemoryStorage()
storage.Append([]pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}})
storage.Append([]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}})
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
r.becomeFollower(2, 2)
@ -813,17 +813,17 @@ func TestVoter(t *testing.T) {
wreject bool
}{
// same logterm
{[]pb.Entry{{}, {Term: 1, Index: 1}}, 1, 1, false},
{[]pb.Entry{{}, {Term: 1, Index: 1}}, 1, 2, false},
{[]pb.Entry{{}, {Term: 1, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true},
{[]pb.Entry{{Term: 1, Index: 1}}, 1, 1, false},
{[]pb.Entry{{Term: 1, Index: 1}}, 1, 2, false},
{[]pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true},
// candidate higher logterm
{[]pb.Entry{{}, {Term: 1, Index: 1}}, 2, 1, false},
{[]pb.Entry{{}, {Term: 1, Index: 1}}, 2, 2, false},
{[]pb.Entry{{}, {Term: 1, Index: 1}, {Term: 1, Index: 2}}, 2, 1, false},
{[]pb.Entry{{Term: 1, Index: 1}}, 2, 1, false},
{[]pb.Entry{{Term: 1, Index: 1}}, 2, 2, false},
{[]pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}}, 2, 1, false},
// voter higher logterm
{[]pb.Entry{{}, {Term: 2, Index: 1}}, 1, 1, true},
{[]pb.Entry{{}, {Term: 2, Index: 1}}, 1, 2, true},
{[]pb.Entry{{}, {Term: 2, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true},
{[]pb.Entry{{Term: 2, Index: 1}}, 1, 1, true},
{[]pb.Entry{{Term: 2, Index: 1}}, 1, 2, true},
{[]pb.Entry{{Term: 2, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true},
}
for i, tt := range tests {
storage := NewMemoryStorage()
@ -850,7 +850,7 @@ func TestVoter(t *testing.T) {
// current term are committed by counting replicas.
// Reference: section 5.4.2
func TestLeaderOnlyCommitsLogFromCurrentTerm(t *testing.T) {
ents := []pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}}
ents := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}
tests := []struct {
index uint64
wcommit uint64

View File

@ -547,8 +547,8 @@ func TestCompact(t *testing.T) {
}
sm.compact(tt.compacti, tt.nodes, tt.snapd)
sort.Sort(uint64Slice(sm.raftLog.snapshot.Nodes))
if sm.raftLog.firstIndex() != tt.compacti {
t.Errorf("%d: log.firstIndex = %d, want %d", i, sm.raftLog.firstIndex(), tt.compacti)
if sm.raftLog.firstIndex() != tt.compacti+1 {
t.Errorf("%d: log.firstIndex = %d, want %d", i, sm.raftLog.firstIndex(), tt.compacti+1)
}
if !reflect.DeepEqual(sm.raftLog.snapshot.Nodes, tt.nodes) {
t.Errorf("%d: snap.nodes = %v, want %v", i, sm.raftLog.snapshot.Nodes, tt.nodes)
@ -836,9 +836,9 @@ func TestAllServerStepdown(t *testing.T) {
wterm uint64
windex uint64
}{
{StateFollower, StateFollower, 3, 1},
{StateCandidate, StateFollower, 3, 1},
{StateLeader, StateFollower, 3, 2},
{StateFollower, StateFollower, 3, 0},
{StateCandidate, StateFollower, 3, 0},
{StateLeader, StateFollower, 3, 1},
}
tmsgTypes := [...]pb.MessageType{pb.MsgVote, pb.MsgApp}
@ -865,8 +865,11 @@ func TestAllServerStepdown(t *testing.T) {
if sm.Term != tt.wterm {
t.Errorf("#%d.%d term = %v , want %v", i, j, sm.Term, tt.wterm)
}
if uint64(sm.raftLog.lastIndex()) != tt.windex {
t.Errorf("#%d.%d index = %v , want %v", i, j, sm.raftLog.lastIndex(), tt.windex)
}
if uint64(len(sm.raftLog.allEntries())) != tt.windex {
t.Errorf("#%d.%d index = %v , want %v", i, j, len(sm.raftLog.allEntries()), tt.windex)
t.Errorf("#%d.%d len(ents) = %v , want %v", i, j, len(sm.raftLog.allEntries()), tt.windex)
}
wlead := uint64(2)
if msgType == pb.MsgVote {

View File

@ -23,9 +23,9 @@ import (
pb "github.com/coreos/etcd/raft/raftpb"
)
// ErrStorageEmpty is returned by Storage.GetLastIndex when there is
// no data.
var ErrStorageEmpty = errors.New("storage is empty")
// ErrSnapshotRequired is returned by Storage.Entries when a requested
// index is unavailable because it predates the last snapshot.
var ErrSnapshotRequired = errors.New("snapshot required; requested index is too old")
// Storage is an interface that may be implemented by the application
// to retrieve log entries from storage.
@ -36,11 +36,15 @@ var ErrStorageEmpty = errors.New("storage is empty")
type Storage interface {
// Entries returns a slice of log entries in the range [lo,hi).
Entries(lo, hi uint64) ([]pb.Entry, error)
// GetLastIndex returns the index of the last entry in the log.
// If the log is empty it returns ErrStorageEmpty.
// Term returns the term of entry i, which must be in the range
// [FirstIndex()-1, LastIndex()]. The term of the entry before
// FirstIndex is retained for matching purposes even though the
// rest of that entry may not be available.
Term(i uint64) (uint64, error)
// LastIndex returns the index of the last entry in the log.
LastIndex() (uint64, error)
// GetFirstIndex returns the index of the first log entry that is
// available via GetEntries (older entries have been incorporated
// FirstIndex returns the index of the first log entry that is
// available via Entries (older entries have been incorporated
// into the latest Snapshot).
FirstIndex() (uint64, error)
// Compact discards all log entries prior to i.
@ -65,23 +69,36 @@ type MemoryStorage struct {
// NewMemoryStorage creates an empty MemoryStorage.
func NewMemoryStorage() *MemoryStorage {
return &MemoryStorage{}
return &MemoryStorage{
// When starting from scratch populate the list with a dummy entry at term zero.
ents: make([]pb.Entry, 1),
}
}
// Entries implements the Storage interface.
func (ms *MemoryStorage) Entries(lo, hi uint64) ([]pb.Entry, error) {
ms.Lock()
defer ms.Unlock()
if lo <= ms.offset {
return nil, ErrSnapshotRequired
}
return ms.ents[lo-ms.offset : hi-ms.offset], nil
}
// Term implements the Storage interface.
func (ms *MemoryStorage) Term(i uint64) (uint64, error) {
ms.Lock()
defer ms.Unlock()
if i < ms.offset || i > ms.offset+uint64(len(ms.ents)) {
return 0, ErrSnapshotRequired
}
return ms.ents[i-ms.offset].Term, nil
}
// LastIndex implements the Storage interface.
func (ms *MemoryStorage) LastIndex() (uint64, error) {
ms.Lock()
defer ms.Unlock()
if len(ms.ents) == 0 {
return 0, ErrStorageEmpty
}
return ms.offset + uint64(len(ms.ents)) - 1, nil
}
@ -89,7 +106,7 @@ func (ms *MemoryStorage) LastIndex() (uint64, error) {
func (ms *MemoryStorage) FirstIndex() (uint64, error) {
ms.Lock()
defer ms.Unlock()
return ms.offset, nil
return ms.offset + 1, nil
}
// Compact implements the Storage interface.