diff --git a/etcdserver/server.go b/etcdserver/server.go index bcb4151c7..8f4858400 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -843,10 +843,6 @@ func (s *EtcdServer) snapshot(snapi uint64, confState *raftpb.ConfState) { } log.Panicf("etcdserver: unexpected compaction error %v", err) } - log.Printf("etcdserver: compacted log at index %d", snapi) - if err := s.r.storage.Cut(); err != nil { - log.Panicf("etcdserver: rotate wal file should never fail: %v", err) - } log.Printf("etcdserver: saved snapshot at index %d", snap.Metadata.Index) } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 7a906c8ca..e7e7016dc 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -720,15 +720,12 @@ func TestSnapshot(t *testing.T) { t.Errorf("action = %s, want Save", gaction[0]) } gaction = p.Action() - if len(gaction) != 2 { - t.Fatalf("len(action) = %d, want 2", len(gaction)) + if len(gaction) != 1 { + t.Fatalf("len(action) = %d, want 1", len(gaction)) } if !reflect.DeepEqual(gaction[0], testutil.Action{Name: "SaveSnap"}) { t.Errorf("action = %s, want SaveSnap", gaction[0]) } - if !reflect.DeepEqual(gaction[1], testutil.Action{Name: "Cut"}) { - t.Errorf("action = %s, want Cut", gaction[1]) - } } // Applied > SnapCount should trigger a SaveSnap event @@ -755,12 +752,12 @@ func TestTriggerSnap(t *testing.T) { gaction := p.Action() // each operation is recorded as a Save - // (SnapCount+1) * Puts + Cut + SaveSnap = (SnapCount+1) * Save + SaveSnap + CUT - wcnt := 3 + snapc + // (SnapCount+1) * Puts + SaveSnap = (SnapCount+1) * Save + SaveSnap + wcnt := 2 + snapc if len(gaction) != wcnt { t.Fatalf("len(action) = %d, want %d", len(gaction), wcnt) } - if !reflect.DeepEqual(gaction[wcnt-2], testutil.Action{Name: "SaveSnap"}) { + if !reflect.DeepEqual(gaction[wcnt-1], testutil.Action{Name: "SaveSnap"}) { t.Errorf("action = %s, want SaveSnap", gaction[wcnt-1]) } } @@ -1267,10 +1264,6 @@ func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) error { p.Record(testutil.Action{Name: "Save"}) return nil } -func (p *storageRecorder) Cut() error { - p.Record(testutil.Action{Name: "Cut"}) - return nil -} func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error { if !raft.IsEmptySnap(st) { p.Record(testutil.Action{Name: "SaveSnap"}) diff --git a/etcdserver/storage.go b/etcdserver/storage.go index 1e7536f89..69c293092 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -35,11 +35,6 @@ type Storage interface { Save(st raftpb.HardState, ents []raftpb.Entry) error // SaveSnap function saves snapshot to the underlying stable storage. SaveSnap(snap raftpb.Snapshot) error - - // TODO: WAL should be able to control cut itself. After implement self-controlled cut, - // remove it in this interface. - // Cut cuts out a new wal file for saving new state and entries. - Cut() error // Close closes the Storage and performs finalization. Close() error } diff --git a/wal/doc.go b/wal/doc.go index 15911c182..769b522f0 100644 --- a/wal/doc.go +++ b/wal/doc.go @@ -17,7 +17,7 @@ Package wal provides an implementation of a write ahead log that is used by etcd. A WAL is created at a particular directory and is made up of a number of -discrete WAL files. Inside of each file the raft state and entries are appended +segmented WAL files. Inside of each file the raft state and entries are appended to it with the Save method: metadata := []byte{} @@ -41,12 +41,11 @@ The first WAL file to be created will be 0000000000000000-0000000000000000.wal indicating an initial sequence of 0 and an initial raft index of 0. The first entry written to WAL MUST have raft index 0. -Periodically a user will want to "cut" the WAL and place new entries into a new -file. This will increment an internal sequence number and cause a new file to -be created. If the last raft index saved was 0x20 and this is the first time -Cut has been called on this WAL then the sequence will increment from 0x0 to -0x1. The new file will be: 0000000000000001-0000000000000021.wal. If a second -Cut issues 0x10 entries with incremental index later then the file will be called: +WAL will cuts its current wal files if its size exceeds 8MB. This will increment an internal +sequence number and cause a new file to be created. If the last raft index saved +was 0x20 and this is the first time cut has been called on this WAL then the sequence will +increment from 0x0 to 0x1. The new file will be: 0000000000000001-0000000000000021.wal. +If a second cut issues 0x10 entries with incremental index later then the file will be called: 0000000000000002-0000000000000031.wal. At a later time a WAL can be opened at a particular snapshot. If there is no diff --git a/wal/wal.go b/wal/wal.go index b5e3ac84b..73c95e906 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -41,6 +41,10 @@ const ( // the owner can make/remove files inside the directory privateDirMode = 0700 + + // the expected size of each wal segment file. + // the actual size might be bigger than it. + segmentSizeBytes = 64 * 1000 * 1000 // 64MB ) var ( @@ -274,14 +278,15 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. return metadata, state, ents, err } -// Cut closes current file written and creates a new one ready to append. -func (w *WAL) Cut() error { +// cut closes current file written and creates a new one ready to append. +func (w *WAL) cut() error { // create a new wal file with name sequence + 1 fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1)) f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600) if err != nil { return err } + log.Printf("wal: segmented wal file %v is created", fpath) l, err := fileutil.NewLock(f.Name()) if err != nil { return err @@ -402,7 +407,16 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error { return err } } - return w.sync() + + fstat, err := w.f.Stat() + if err != nil { + return err + } + if fstat.Size() < segmentSizeBytes { + return w.sync() + } + // TODO: add a test for this code path when refactoring the tests + return w.cut() } func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { diff --git a/wal/wal_test.go b/wal/wal_test.go index b59265c92..76d7c5414 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -156,7 +156,7 @@ func TestCut(t *testing.T) { if err := w.Save(state, []raftpb.Entry{{}}); err != nil { t.Fatal(err) } - if err := w.Cut(); err != nil { + if err := w.cut(); err != nil { t.Fatal(err) } wname := walName(1, 1) @@ -168,7 +168,7 @@ func TestCut(t *testing.T) { if err := w.Save(raftpb.HardState{}, es); err != nil { t.Fatal(err) } - if err := w.Cut(); err != nil { + if err := w.cut(); err != nil { t.Fatal(err) } snap := walpb.Snapshot{Index: 2, Term: 1} @@ -335,7 +335,7 @@ func TestRecoverAfterCut(t *testing.T) { if err = md.Save(raftpb.HardState{}, es); err != nil { t.Fatal(err) } - if err = md.Cut(); err != nil { + if err = md.cut(); err != nil { t.Fatal(err) } } @@ -427,7 +427,7 @@ func TestOpenNotInUse(t *testing.T) { if err = w.Save(raftpb.HardState{}, es); err != nil { t.Fatal(err) } - if err = w.Cut(); err != nil { + if err = w.cut(); err != nil { t.Fatal(err) } }