The initial term=0 log entry is now initially unstable.

This entry is now persisted through the normal flow instead of appearing
in the stored log at creation time.  This is how things worked before
the Storage interface was introduced. (see coreos/etcd#1689)
This commit is contained in:
Ben Darnell 2014-11-12 18:21:30 -05:00
parent 76a3de9a33
commit 147fd614ce
6 changed files with 46 additions and 23 deletions

View File

@ -838,6 +838,12 @@ func TestSnapshot(t *testing.T) {
s := raft.NewMemoryStorage() s := raft.NewMemoryStorage()
n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1, s) n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1, s)
defer n.Stop() defer n.Stop()
// Save the initial state to storage so we have something to snapshot.
rd := <-n.Ready()
s.Append(rd.Entries)
n.Advance()
st := &storeRecorder{} st := &storeRecorder{}
p := &storageRecorder{} p := &storageRecorder{}
srv := &EtcdServer{ srv := &EtcdServer{

View File

@ -49,14 +49,19 @@ func newLog(storage Storage) *raftLog {
if storage == nil { if storage == nil {
panic("storage must not be nil") panic("storage must not be nil")
} }
log := &raftLog{
storage: storage,
}
lastIndex, err := storage.GetLastIndex() lastIndex, err := storage.GetLastIndex()
if err != nil { if err == ErrStorageEmpty {
// When starting from scratch populate the list with a dummy entry at term zero.
log.unstableEnts = make([]pb.Entry, 1)
} else if err == nil {
log.unstable = lastIndex + 1
} else {
panic(err) // TODO(bdarnell) panic(err) // TODO(bdarnell)
} }
return &raftLog{ return log
storage: storage,
unstable: lastIndex + 1,
}
} }
func (l *raftLog) load(ents []pb.Entry) { func (l *raftLog) load(ents []pb.Entry) {
@ -67,6 +72,7 @@ func (l *raftLog) load(ents []pb.Entry) {
} }
ms.ents = ents ms.ents = ents
l.unstable = ms.offset + uint64(len(ents)) l.unstable = ms.offset + uint64(len(ents))
l.unstableEnts = nil
} }
func (l *raftLog) String() string { func (l *raftLog) String() string {

View File

@ -91,7 +91,7 @@ func TestIsUpToDate(t *testing.T) {
} }
func TestAppend(t *testing.T) { func TestAppend(t *testing.T) {
previousEnts := []pb.Entry{{Term: 1}, {Term: 2}} previousEnts := []pb.Entry{{}, {Term: 1}, {Term: 2}}
tests := []struct { tests := []struct {
after uint64 after uint64
ents []pb.Entry ents []pb.Entry
@ -283,7 +283,7 @@ func TestCompactionSideEffects(t *testing.T) {
unstableIndex := uint64(750) unstableIndex := uint64(750)
lastTerm := lastIndex lastTerm := lastIndex
storage := NewMemoryStorage() storage := NewMemoryStorage()
for i = 1; i <= unstableIndex; i++ { for i = 0; i <= unstableIndex; i++ {
storage.Append([]pb.Entry{{Term: uint64(i), Index: uint64(i)}}) storage.Append([]pb.Entry{{Term: uint64(i), Index: uint64(i)}})
} }
raftLog := newLog(storage) raftLog := newLog(storage)
@ -337,21 +337,21 @@ func TestCompactionSideEffects(t *testing.T) {
} }
func TestUnstableEnts(t *testing.T) { func TestUnstableEnts(t *testing.T) {
previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}} previousEnts := []pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}}
tests := []struct { tests := []struct {
unstable uint64 unstable uint64
wents []pb.Entry wents []pb.Entry
wunstable uint64 wunstable uint64
}{ }{
{3, nil, 3}, {3, nil, 3},
{1, previousEnts, 3}, {1, previousEnts[1:], 3},
} }
for i, tt := range tests { for i, tt := range tests {
storage := NewMemoryStorage() storage := NewMemoryStorage()
storage.Append(previousEnts[:tt.unstable-1]) storage.Append(previousEnts[:tt.unstable])
raftLog := newLog(storage) raftLog := newLog(storage)
raftLog.append(raftLog.lastIndex(), previousEnts[tt.unstable-1:]...) raftLog.append(raftLog.lastIndex(), previousEnts[tt.unstable:]...)
ents := raftLog.unstableEntries() ents := raftLog.unstableEntries()
raftLog.stableTo(raftLog.lastIndex()) raftLog.stableTo(raftLog.lastIndex())
if !reflect.DeepEqual(ents, tt.wents) { if !reflect.DeepEqual(ents, tt.wents) {
@ -363,7 +363,7 @@ func TestUnstableEnts(t *testing.T) {
} }
} }
//TestCompaction ensures that the number of log entreis is correct after compactions. //TestCompaction ensures that the number of log entries is correct after compactions.
func TestCompaction(t *testing.T) { func TestCompaction(t *testing.T) {
tests := []struct { tests := []struct {
applied uint64 applied uint64
@ -391,7 +391,7 @@ func TestCompaction(t *testing.T) {
}() }()
storage := NewMemoryStorage() storage := NewMemoryStorage()
for i := uint64(0); i < tt.lastIndex; i++ { for i := uint64(0); i <= tt.lastIndex; i++ {
storage.Append([]pb.Entry{{}}) storage.Append([]pb.Entry{{}})
} }
raftLog := newLog(storage) raftLog := newLog(storage)

View File

@ -175,6 +175,7 @@ func TestNode(t *testing.T) {
SoftState: &SoftState{Lead: 1, Nodes: []uint64{1}, RaftState: StateLeader}, SoftState: &SoftState{Lead: 1, Nodes: []uint64{1}, RaftState: StateLeader},
HardState: raftpb.HardState{Term: 1, Commit: 2}, HardState: raftpb.HardState{Term: 1, Commit: 2},
Entries: []raftpb.Entry{ Entries: []raftpb.Entry{
{},
{Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata}, {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
{Term: 1, Index: 2}, {Term: 1, Index: 2},
}, },
@ -195,7 +196,7 @@ func TestNode(t *testing.T) {
n.Campaign(ctx) n.Campaign(ctx)
g := <-n.Ready() g := <-n.Ready()
if !reflect.DeepEqual(g, wants[0]) { if !reflect.DeepEqual(g, wants[0]) {
t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0]) t.Fatalf("#%d: g = %+v,\n w %+v", 1, g, wants[0])
} else { } else {
storage.Append(g.Entries) storage.Append(g.Entries)
n.Advance() n.Advance()
@ -230,7 +231,9 @@ func TestNodeRestart(t *testing.T) {
CommittedEntries: entries[1 : st.Commit+1], CommittedEntries: entries[1 : st.Commit+1],
} }
n := RestartNode(1, 10, 1, nil, st, entries, NewMemoryStorage()) storage := NewMemoryStorage()
storage.Append(entries)
n := RestartNode(1, 10, 1, nil, st, nil, storage)
if g := <-n.Ready(); !reflect.DeepEqual(g, want) { if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
t.Errorf("g = %+v,\n w %+v", g, want) t.Errorf("g = %+v,\n w %+v", g, want)
} else { } else {

View File

@ -513,8 +513,9 @@ func TestLeaderCommitPrecedingEntries(t *testing.T) {
{{Term: 1, Index: 1}}, {{Term: 1, Index: 1}},
} }
for i, tt := range tests { for i, tt := range tests {
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) storage := NewMemoryStorage()
r.loadEnts(append([]pb.Entry{{}}, tt...)) storage.Append(append([]pb.Entry{{}}, tt...))
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
r.loadState(pb.HardState{Term: 2}) r.loadState(pb.HardState{Term: 2})
r.becomeCandidate() r.becomeCandidate()
r.becomeLeader() r.becomeLeader()
@ -659,8 +660,9 @@ func TestFollowerAppendEntries(t *testing.T) {
}, },
} }
for i, tt := range tests { for i, tt := range tests {
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) storage := NewMemoryStorage()
r.loadEnts([]pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}}) storage.Append([]pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}})
r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage)
r.becomeFollower(2, 2) r.becomeFollower(2, 2)
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index, Entries: tt.ents}) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index, Entries: tt.ents})

View File

@ -17,11 +17,16 @@
package raft package raft
import ( import (
"errors"
"sync" "sync"
pb "github.com/coreos/etcd/raft/raftpb" pb "github.com/coreos/etcd/raft/raftpb"
) )
// ErrStorageEmpty is returned by Storage.GetLastIndex when there is
// no data.
var ErrStorageEmpty = errors.New("storage is empty")
// Storage is an interface that may be implemented by the application // Storage is an interface that may be implemented by the application
// to retrieve log entries from storage. // to retrieve log entries from storage.
// //
@ -32,6 +37,7 @@ type Storage interface {
// GetEntries returns a slice of log entries in the range [lo,hi). // GetEntries returns a slice of log entries in the range [lo,hi).
GetEntries(lo, hi uint64) ([]pb.Entry, error) GetEntries(lo, hi uint64) ([]pb.Entry, error)
// GetLastIndex returns the index of the last entry in the log. // GetLastIndex returns the index of the last entry in the log.
// If the log is empty it returns ErrStorageEmpty.
GetLastIndex() (uint64, error) GetLastIndex() (uint64, error)
// GetFirstIndex returns the index of the first log entry that is // GetFirstIndex returns the index of the first log entry that is
// available via GetEntries (older entries have been incorporated // available via GetEntries (older entries have been incorporated
@ -58,10 +64,7 @@ type MemoryStorage struct {
// NewMemoryStorage creates an empty MemoryStorage. // NewMemoryStorage creates an empty MemoryStorage.
func NewMemoryStorage() *MemoryStorage { func NewMemoryStorage() *MemoryStorage {
return &MemoryStorage{ return &MemoryStorage{}
// Populate the list with a dummy entry at term zero.
ents: make([]pb.Entry, 1),
}
} }
// GetEntries implements the Storage interface. // GetEntries implements the Storage interface.
@ -75,6 +78,9 @@ func (ms *MemoryStorage) GetEntries(lo, hi uint64) ([]pb.Entry, error) {
func (ms *MemoryStorage) GetLastIndex() (uint64, error) { func (ms *MemoryStorage) GetLastIndex() (uint64, error) {
ms.Lock() ms.Lock()
defer ms.Unlock() defer ms.Unlock()
if len(ms.ents) == 0 {
return 0, ErrStorageEmpty
}
return ms.offset + uint64(len(ms.ents)) - 1, nil return ms.offset + uint64(len(ms.ents)) - 1, nil
} }