diff --git a/main.go b/main.go index bcb174ae4..3fbd5657d 100644 --- a/main.go +++ b/main.go @@ -110,7 +110,7 @@ func startRaft(id int64, peerIDs []int64, waldir string) (raft.Node, *wal.WAL) { // restart a node from previous wal // TODO(xiangli): check snapshot; not open from one - w, err := wal.OpenAtIndex(waldir, 1) + w, err := wal.OpenAtIndex(waldir, 0) if err != nil { log.Fatal(err) } diff --git a/raft/log.go b/raft/log.go index 07a7fd2d2..cbfa90ee4 100644 --- a/raft/log.go +++ b/raft/log.go @@ -27,7 +27,7 @@ type raftLog struct { func newLog() *raftLog { return &raftLog{ ents: make([]pb.Entry, 1), - unstable: 1, + unstable: 0, committed: 0, applied: 0, compactThreshold: defaultCompactThreshold, @@ -38,6 +38,11 @@ func (l *raftLog) isEmpty() bool { return l.offset == 0 && len(l.ents) == 1 } +func (l *raftLog) load(ents []pb.Entry) { + l.ents = ents + l.unstable = l.offset + int64(len(ents)) +} + func (l *raftLog) String() string { return fmt.Sprintf("offset=%d committed=%d applied=%d len(ents)=%d", l.offset, l.committed, l.applied, len(l.ents)) } @@ -77,7 +82,7 @@ func (l *raftLog) findConflict(from int64, ents []pb.Entry) int64 { } func (l *raftLog) unstableEnts() []pb.Entry { - ents := l.entries(l.unstable) + ents := l.slice(l.unstable, l.lastIndex()+1) if ents == nil { return nil } diff --git a/raft/node_test.go b/raft/node_test.go index 7d33ad225..eb461611a 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -141,7 +141,7 @@ func TestNode(t *testing.T) { wants := []Ready{ { State: raftpb.State{Term: 1, Commit: 1, LastIndex: 1}, - Entries: []raftpb.Entry{{Term: 1, Index: 1}}, + Entries: []raftpb.Entry{{}, {Term: 1, Index: 1}}, CommittedEntries: []raftpb.Entry{{Term: 1, Index: 1}}, }, { @@ -171,6 +171,7 @@ func TestNode(t *testing.T) { func TestNodeRestart(t *testing.T) { entries := []raftpb.Entry{ + {}, {Term: 1, Index: 1}, {Term: 1, Index: 2, Data: []byte("foo")}, } @@ -179,7 +180,7 @@ func TestNodeRestart(t *testing.T) { want := Ready{ State: emptyState, // commit upto index commit index in st - CommittedEntries: entries[:st.Commit], + CommittedEntries: entries[1 : st.Commit+1], } n := Restart(1, []int64{1}, 0, 0, st, entries) diff --git a/raft/raft.go b/raft/raft.go index 56d6a057e..6189eb1d9 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -503,8 +503,7 @@ func (r *raft) loadEnts(ents []pb.Entry) { if !r.raftLog.isEmpty() { panic("cannot load entries when log is not empty") } - r.raftLog.append(0, ents...) - r.raftLog.unstable = r.raftLog.lastIndex() + 1 + r.raftLog.load(ents) } func (r *raft) loadState(state pb.State) { diff --git a/wal/doc.go b/wal/doc.go index 32803dbda..27bd5df93 100644 --- a/wal/doc.go +++ b/wal/doc.go @@ -32,15 +32,16 @@ WAL files are placed inside of the directory in the following format: $seq-$index.wal The first WAL file to be created will be 0000000000000000-0000000000000000.wal -indicating an initial sequence of 0 and an initial raft index of 0. +indicating an initial sequence of 0 and an initial raft index of 0. The first +entry written to WAL MUST have raft index 0. Periodically a user will want to "cut" the WAL and place new entries into a new file. This will increment an internal sequence number and cause a new file to be created. If the last raft index saved was 0x20 and this is the first time Cut has been called on this WAL then the sequence will increment from 0x0 to -0x1. The new file will be: 0000000000000001-0000000000000020.wal. If a second -Cut is issues 0x10 entries later then the file will be called: -0000000000000002-0000000000000030.wal. +0x1. The new file will be: 0000000000000001-0000000000000021.wal. If a second +Cut issues 0x10 entries with incremental index later then the file will be called: +0000000000000002-0000000000000031.wal. At a later time a WAL can be opened at a particular raft index: diff --git a/wal/wal.go b/wal/wal.go index 663104496..57c994103 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -75,7 +75,7 @@ func Create(dirpath string) (*WAL, error) { return nil, err } - p := path.Join(dirpath, fmt.Sprintf("%016x-%016x.wal", 0, 1)) + p := path.Join(dirpath, fmt.Sprintf("%016x-%016x.wal", 0, 0)) f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) if err != nil { return nil, err diff --git a/wal/wal_test.go b/wal/wal_test.go index 2913086dc..57f7057f1 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -32,7 +32,7 @@ var ( infoData = []byte("\b\xef\xfd\x02") infoRecord = append([]byte("\x0e\x00\x00\x00\x00\x00\x00\x00\b\x01\x10\x99\xb5\xe4\xd0\x03\x1a\x04"), infoData...) - firstWalName = "0000000000000000-0000000000000001.wal" + firstWalName = "0000000000000000-0000000000000000.wal" ) func TestNew(t *testing.T) { @@ -78,7 +78,7 @@ func TestOpenAtIndex(t *testing.T) { } f.Close() - w, err := OpenAtIndex(dir, 1) + w, err := OpenAtIndex(dir, 0) if err != nil { t.Fatalf("err = %v, want nil", err) } @@ -126,6 +126,10 @@ func TestCut(t *testing.T) { } defer w.Close() + // TODO(unihorn): remove this when cut can operate on an empty file + if err := w.SaveEntry(&raftpb.Entry{}); err != nil { + t.Fatal(err) + } if err := w.Cut(0); err != nil { t.Fatal(err) } @@ -162,7 +166,7 @@ func TestRecover(t *testing.T) { if err = w.SaveInfo(i); err != nil { t.Fatal(err) } - ents := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}} + ents := []raftpb.Entry{{Index: 0, Term: 0}, {Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}} for _, e := range ents { if err = w.SaveEntry(&e); err != nil { t.Fatal(err) @@ -176,7 +180,7 @@ func TestRecover(t *testing.T) { } w.Close() - if w, err = OpenAtIndex(p, 1); err != nil { + if w, err = OpenAtIndex(p, 0); err != nil { t.Fatal(err) } id, state, entries, err := w.ReadAll() @@ -279,6 +283,10 @@ func TestRecoverAfterCut(t *testing.T) { if err = w.SaveInfo(info); err != nil { t.Fatal(err) } + // TODO(unihorn): remove this when cut can operate on an empty file + if err = w.SaveEntry(&raftpb.Entry{}); err != nil { + t.Fatal(err) + } if err = w.Cut(0); err != nil { t.Fatal(err) } @@ -322,7 +330,7 @@ func TestRecoverAfterCut(t *testing.T) { } for j, e := range entries { if e.Index != int64(j+i) { - t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i+1) + t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i) } } }