diff --git a/wal/doc.go b/wal/doc.go index 27bd5df93..a337e4f25 100644 --- a/wal/doc.go +++ b/wal/doc.go @@ -48,6 +48,10 @@ At a later time a WAL can be opened at a particular raft index: w, err := wal.OpenAtIndex("/var/lib/etcd", 0) ... +The raft index must have been written to the WAL. When opening without a +snapshot the raft index should always be 0. When opening with a snapshot +the raft index should be the index of the last entry covered by the snapshot. + Additional items cannot be Saved to this WAL until all of the items from 0 to the end of the WAL are read first: diff --git a/wal/util.go b/wal/util.go index d2de12369..9bf18bc0f 100644 --- a/wal/util.go +++ b/wal/util.go @@ -83,6 +83,10 @@ func parseWalName(str string) (seq, index int64, err error) { return } +func walName(seq, index int64) string { + return fmt.Sprintf("%016x-%016x.wal", seq, index) +} + func max(a, b int64) int64 { if a > b { return a diff --git a/wal/wal.go b/wal/wal.go index 57c994103..d369a23c9 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -60,7 +60,8 @@ type WAL struct { decoder *decoder // decoder to decode records f *os.File // underlay file opened for appending, sync - seq int64 // current sequence of the wal file + seq int64 // sequence of the wal file currently used for writes + enti int64 // index of the last entry saved to the wal encoder *encoder // encoder to encode records } @@ -75,7 +76,7 @@ func Create(dirpath string) (*WAL, error) { return nil, err } - p := path.Join(dirpath, fmt.Sprintf("%016x-%016x.wal", 0, 0)) + p := path.Join(dirpath, walName(0, 0)) f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) if err != nil { return nil, err @@ -93,6 +94,7 @@ func Create(dirpath string) (*WAL, error) { } // OpenAtIndex opens the WAL at the given index. +// The index MUST have been previously committed to the WAL. // The returned WAL is ready to read and the first record will be the given // index. The WAL cannot be appended to before reading out all of its // previous records. @@ -126,6 +128,11 @@ func OpenAtIndex(dirpath string, index int64) (*WAL, error) { rc := MultiReadCloser(rcs...) // open the lastest wal file for appending + seq, _, err := parseWalName(names[len(names)-1]) + if err != nil { + rc.Close() + return nil, err + } last := path.Join(dirpath, names[len(names)-1]) f, err := os.OpenFile(last, os.O_WRONLY|os.O_APPEND, 0) if err != nil { @@ -138,7 +145,8 @@ func OpenAtIndex(dirpath string, index int64) (*WAL, error) { ri: index, decoder: newDecoder(rc), - f: f, + f: f, + seq: seq, } return w, nil } @@ -156,6 +164,7 @@ func (w *WAL) ReadAll() (id int64, state raftpb.State, ents []raftpb.Entry, err if e.Index >= w.ri { ents = append(ents[:e.Index-w.ri], e) } + w.enti = e.Index case stateType: state = mustUnmarshalState(rec.Data) case infoType: @@ -194,21 +203,19 @@ func (w *WAL) ReadAll() (id int64, state raftpb.State, ents []raftpb.Entry, err return id, state, ents, nil } -// index should be the index of last log entry. // Cut closes current file written and creates a new one ready to append. -func (w *WAL) Cut(index int64) error { - log.Printf("wal.cut index=%d", index) - +func (w *WAL) Cut() error { // create a new wal file with name sequence + 1 - fpath := path.Join(w.dir, fmt.Sprintf("%016x-%016x.wal", w.seq+1, index+1)) + fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1)) f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) if err != nil { return err } - w.Sync() w.f.Close() + log.Printf("wal.cut index=%d prevfile=%s curfile=%s", w.enti, w.f.Name(), f.Name()) + // update writer and save the previous crc w.f = f w.seq++ @@ -250,7 +257,11 @@ func (w *WAL) SaveEntry(e *raftpb.Entry) error { panic(err) } rec := &walpb.Record{Type: entryType, Data: b} - return w.encoder.encode(rec) + if err := w.encoder.encode(rec); err != nil { + return err + } + w.enti = e.Index + return nil } func (w *WAL) SaveState(s *raftpb.State) error { diff --git a/wal/wal_test.go b/wal/wal_test.go index 57f7057f1..49580cf14 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -18,7 +18,6 @@ package wal import ( "bytes" - "fmt" "io/ioutil" "os" "path" @@ -31,8 +30,6 @@ import ( var ( infoData = []byte("\b\xef\xfd\x02") infoRecord = append([]byte("\x0e\x00\x00\x00\x00\x00\x00\x00\b\x01\x10\x99\xb5\xe4\xd0\x03\x1a\x04"), infoData...) - - firstWalName = "0000000000000000-0000000000000000.wal" ) func TestNew(t *testing.T) { @@ -46,8 +43,8 @@ func TestNew(t *testing.T) { if err != nil { t.Fatalf("err = %v, want nil", err) } - if g := path.Base(w.f.Name()); g != firstWalName { - t.Errorf("name = %+v, want %+v", g, firstWalName) + if g := path.Base(w.f.Name()); g != walName(0, 0) { + t.Errorf("name = %+v, want %+v", g, walName(0, 0)) } w.Close() } @@ -59,7 +56,7 @@ func TestNewForInitedDir(t *testing.T) { } defer os.RemoveAll(p) - os.Create(path.Join(p, firstWalName)) + os.Create(path.Join(p, walName(0, 0))) if _, err = Create(p); err == nil || err != os.ErrExist { t.Errorf("err = %v, want %v", err, os.ErrExist) } @@ -72,7 +69,7 @@ func TestOpenAtIndex(t *testing.T) { } defer os.RemoveAll(dir) - f, err := os.Create(path.Join(dir, firstWalName)) + f, err := os.Create(path.Join(dir, walName(0, 0))) if err != nil { t.Fatal(err) } @@ -82,12 +79,15 @@ func TestOpenAtIndex(t *testing.T) { if err != nil { t.Fatalf("err = %v, want nil", err) } - if g := path.Base(w.f.Name()); g != firstWalName { - t.Errorf("name = %+v, want %+v", g, firstWalName) + if g := path.Base(w.f.Name()); g != walName(0, 0) { + t.Errorf("name = %+v, want %+v", g, walName(0, 0)) + } + if w.seq != 0 { + t.Errorf("seq = %d, want %d", w.seq, 0) } w.Close() - wname := fmt.Sprintf("%016x-%016x.wal", 2, 10) + wname := walName(2, 10) f, err = os.Create(path.Join(dir, wname)) if err != nil { t.Fatal(err) @@ -101,6 +101,9 @@ func TestOpenAtIndex(t *testing.T) { if g := path.Base(w.f.Name()); g != wname { t.Errorf("name = %+v, want %+v", g, wname) } + if w.seq != 2 { + t.Errorf("seq = %d, want %d", w.seq, 2) + } w.Close() emptydir, err := ioutil.TempDir(os.TempDir(), "waltestempty") @@ -130,10 +133,10 @@ func TestCut(t *testing.T) { if err := w.SaveEntry(&raftpb.Entry{}); err != nil { t.Fatal(err) } - if err := w.Cut(0); err != nil { + if err := w.Cut(); err != nil { t.Fatal(err) } - wname := fmt.Sprintf("%016x-%016x.wal", 1, 1) + wname := walName(1, 1) if g := path.Base(w.f.Name()); g != wname { t.Errorf("name = %s, want %s", g, wname) } @@ -142,10 +145,10 @@ func TestCut(t *testing.T) { if err := w.SaveEntry(e); err != nil { t.Fatal(err) } - if err := w.Cut(1); err != nil { + if err := w.Cut(); err != nil { t.Fatal(err) } - wname = fmt.Sprintf("%016x-%016x.wal", 2, 2) + wname = walName(2, 2) if g := path.Base(w.f.Name()); g != wname { t.Errorf("name = %s, want %s", g, wname) } @@ -287,7 +290,7 @@ func TestRecoverAfterCut(t *testing.T) { if err = w.SaveEntry(&raftpb.Entry{}); err != nil { t.Fatal(err) } - if err = w.Cut(0); err != nil { + if err = w.Cut(); err != nil { t.Fatal(err) } for i := 1; i < 10; i++ { @@ -295,7 +298,7 @@ func TestRecoverAfterCut(t *testing.T) { if err = w.SaveEntry(&e); err != nil { t.Fatal(err) } - if err = w.Cut(e.Index); err != nil { + if err = w.Cut(); err != nil { t.Fatal(err) } if err = w.SaveInfo(info); err != nil { @@ -304,7 +307,7 @@ func TestRecoverAfterCut(t *testing.T) { } w.Close() - if err := os.Remove(path.Join(p, "0000000000000004-0000000000000004.wal")); err != nil { + if err := os.Remove(path.Join(p, walName(4, 4))); err != nil { t.Fatal(err) }