diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 8b505a056..772ba2079 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -838,6 +838,12 @@ func TestSnapshot(t *testing.T) { s := raft.NewMemoryStorage() n := raft.StartNode(0xBAD0, mustMakePeerSlice(t, 0xBAD0), 10, 1, s) defer n.Stop() + + // Save the initial state to storage so we have something to snapshot. + rd := <-n.Ready() + s.Append(rd.Entries) + n.Advance() + st := &storeRecorder{} p := &storageRecorder{} srv := &EtcdServer{ diff --git a/raft/log.go b/raft/log.go index c2b950430..c1dcdbbc2 100644 --- a/raft/log.go +++ b/raft/log.go @@ -49,14 +49,19 @@ func newLog(storage Storage) *raftLog { if storage == nil { panic("storage must not be nil") } + log := &raftLog{ + storage: storage, + } lastIndex, err := storage.GetLastIndex() - if err != nil { + if err == ErrStorageEmpty { + // When starting from scratch populate the list with a dummy entry at term zero. + log.unstableEnts = make([]pb.Entry, 1) + } else if err == nil { + log.unstable = lastIndex + 1 + } else { panic(err) // TODO(bdarnell) } - return &raftLog{ - storage: storage, - unstable: lastIndex + 1, - } + return log } func (l *raftLog) load(ents []pb.Entry) { @@ -67,6 +72,7 @@ func (l *raftLog) load(ents []pb.Entry) { } ms.ents = ents l.unstable = ms.offset + uint64(len(ents)) + l.unstableEnts = nil } func (l *raftLog) String() string { diff --git a/raft/log_test.go b/raft/log_test.go index 5987cfb88..4965c8054 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -91,7 +91,7 @@ func TestIsUpToDate(t *testing.T) { } func TestAppend(t *testing.T) { - previousEnts := []pb.Entry{{Term: 1}, {Term: 2}} + previousEnts := []pb.Entry{{}, {Term: 1}, {Term: 2}} tests := []struct { after uint64 ents []pb.Entry @@ -283,7 +283,7 @@ func TestCompactionSideEffects(t *testing.T) { unstableIndex := uint64(750) lastTerm := lastIndex storage := NewMemoryStorage() - for i = 1; i <= unstableIndex; i++ { + for i = 0; i <= unstableIndex; i++ { storage.Append([]pb.Entry{{Term: uint64(i), Index: uint64(i)}}) } raftLog := newLog(storage) @@ -337,21 +337,21 @@ func TestCompactionSideEffects(t *testing.T) { } func TestUnstableEnts(t *testing.T) { - previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}} + previousEnts := []pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}} tests := []struct { unstable uint64 wents []pb.Entry wunstable uint64 }{ {3, nil, 3}, - {1, previousEnts, 3}, + {1, previousEnts[1:], 3}, } for i, tt := range tests { storage := NewMemoryStorage() - storage.Append(previousEnts[:tt.unstable-1]) + storage.Append(previousEnts[:tt.unstable]) raftLog := newLog(storage) - raftLog.append(raftLog.lastIndex(), previousEnts[tt.unstable-1:]...) + raftLog.append(raftLog.lastIndex(), previousEnts[tt.unstable:]...) ents := raftLog.unstableEntries() raftLog.stableTo(raftLog.lastIndex()) if !reflect.DeepEqual(ents, tt.wents) { @@ -363,7 +363,7 @@ func TestUnstableEnts(t *testing.T) { } } -//TestCompaction ensures that the number of log entreis is correct after compactions. +//TestCompaction ensures that the number of log entries is correct after compactions. func TestCompaction(t *testing.T) { tests := []struct { applied uint64 @@ -391,7 +391,7 @@ func TestCompaction(t *testing.T) { }() storage := NewMemoryStorage() - for i := uint64(0); i < tt.lastIndex; i++ { + for i := uint64(0); i <= tt.lastIndex; i++ { storage.Append([]pb.Entry{{}}) } raftLog := newLog(storage) diff --git a/raft/node_test.go b/raft/node_test.go index 1d10f2d1c..ea6827716 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -175,6 +175,7 @@ func TestNode(t *testing.T) { SoftState: &SoftState{Lead: 1, Nodes: []uint64{1}, RaftState: StateLeader}, HardState: raftpb.HardState{Term: 1, Commit: 2}, Entries: []raftpb.Entry{ + {}, {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata}, {Term: 1, Index: 2}, }, @@ -195,7 +196,7 @@ func TestNode(t *testing.T) { n.Campaign(ctx) g := <-n.Ready() if !reflect.DeepEqual(g, wants[0]) { - t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0]) + t.Fatalf("#%d: g = %+v,\n w %+v", 1, g, wants[0]) } else { storage.Append(g.Entries) n.Advance() @@ -230,7 +231,9 @@ func TestNodeRestart(t *testing.T) { CommittedEntries: entries[1 : st.Commit+1], } - n := RestartNode(1, 10, 1, nil, st, entries, NewMemoryStorage()) + storage := NewMemoryStorage() + storage.Append(entries) + n := RestartNode(1, 10, 1, nil, st, nil, storage) if g := <-n.Ready(); !reflect.DeepEqual(g, want) { t.Errorf("g = %+v,\n w %+v", g, want) } else { diff --git a/raft/raft_paper_test.go b/raft/raft_paper_test.go index efbdac192..3a54d1584 100644 --- a/raft/raft_paper_test.go +++ b/raft/raft_paper_test.go @@ -513,8 +513,9 @@ func TestLeaderCommitPrecedingEntries(t *testing.T) { {{Term: 1, Index: 1}}, } for i, tt := range tests { - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - r.loadEnts(append([]pb.Entry{{}}, tt...)) + storage := NewMemoryStorage() + storage.Append(append([]pb.Entry{{}}, tt...)) + r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage) r.loadState(pb.HardState{Term: 2}) r.becomeCandidate() r.becomeLeader() @@ -659,8 +660,9 @@ func TestFollowerAppendEntries(t *testing.T) { }, } for i, tt := range tests { - r := newRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage()) - r.loadEnts([]pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}}) + storage := NewMemoryStorage() + storage.Append([]pb.Entry{{}, {Term: 1, Index: 1}, {Term: 2, Index: 2}}) + r := newRaft(1, []uint64{1, 2, 3}, 10, 1, storage) r.becomeFollower(2, 2) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index, Entries: tt.ents}) diff --git a/raft/storage.go b/raft/storage.go index d559d95ac..7733b2ba0 100644 --- a/raft/storage.go +++ b/raft/storage.go @@ -17,11 +17,16 @@ package raft import ( + "errors" "sync" pb "github.com/coreos/etcd/raft/raftpb" ) +// ErrStorageEmpty is returned by Storage.GetLastIndex when there is +// no data. +var ErrStorageEmpty = errors.New("storage is empty") + // Storage is an interface that may be implemented by the application // to retrieve log entries from storage. // @@ -32,6 +37,7 @@ type Storage interface { // GetEntries returns a slice of log entries in the range [lo,hi). GetEntries(lo, hi uint64) ([]pb.Entry, error) // GetLastIndex returns the index of the last entry in the log. + // If the log is empty it returns ErrStorageEmpty. GetLastIndex() (uint64, error) // GetFirstIndex returns the index of the first log entry that is // available via GetEntries (older entries have been incorporated @@ -58,10 +64,7 @@ type MemoryStorage struct { // NewMemoryStorage creates an empty MemoryStorage. func NewMemoryStorage() *MemoryStorage { - return &MemoryStorage{ - // Populate the list with a dummy entry at term zero. - ents: make([]pb.Entry, 1), - } + return &MemoryStorage{} } // GetEntries implements the Storage interface. @@ -75,6 +78,9 @@ func (ms *MemoryStorage) GetEntries(lo, hi uint64) ([]pb.Entry, error) { func (ms *MemoryStorage) GetLastIndex() (uint64, error) { ms.Lock() defer ms.Unlock() + if len(ms.ents) == 0 { + return 0, ErrStorageEmpty + } return ms.offset + uint64(len(ms.ents)) - 1, nil }