From 8b4eed29e5b0f808ea792e169ff5632bd95b962b Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 28 Mar 2015 10:33:52 -0700 Subject: [PATCH 1/5] wal: fix the unexpectedEOF error in the last wal. It is safe to repair the unexpectedEOF error in the last wal. raft will not send out a message before the entry is successfully committed into the wal. Thus we can safely truncate the last entry in the wal to repair. --- etcdserver/storage.go | 30 ++++++++++---- wal/repair.go | 73 ++++++++++++++++++++++++++++++++++ wal/repair_test.go | 91 +++++++++++++++++++++++++++++++++++++++++++ wal/wal.go | 6 +-- 4 files changed, 190 insertions(+), 10 deletions(-) create mode 100644 wal/repair.go create mode 100644 wal/repair_test.go diff --git a/etcdserver/storage.go b/etcdserver/storage.go index 36c2923cb..24603a6c3 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -15,6 +15,7 @@ package etcdserver import ( + "io" "log" "os" "path" @@ -72,13 +73,28 @@ func (st *storage) SaveSnap(snap raftpb.Snapshot) error { } func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) { - var err error - if w, err = wal.Open(waldir, snap); err != nil { - log.Fatalf("etcdserver: open wal error: %v", err) - } - var wmetadata []byte - if wmetadata, st, ents, err = w.ReadAll(); err != nil { - log.Fatalf("etcdserver: read wal error: %v", err) + var ( + err error + wmetadata []byte + ) + + for i := 0; i < 2; i++ { + if w, err = wal.Open(waldir, snap); err != nil { + log.Fatalf("etcdserver: open wal error: %v", err) + } + if wmetadata, st, ents, err = w.ReadAll(); err != nil { + w.Close() + if i != 0 || err != io.ErrUnexpectedEOF { + log.Fatalf("etcdserver: read wal error: %v", err) + } + if !wal.Repair(waldir) { + log.Fatalf("etcdserver: WAL error (%v) cannot be repaired", err) + } else { + log.Printf("etcdserver: repaired WAL error (%v)", err) + } + continue + } + break } var metadata pb.Metadata pbutil.MustUnmarshal(&metadata, wmetadata) 
diff --git a/wal/repair.go b/wal/repair.go new file mode 100644 index 000000000..846bb0127 --- /dev/null +++ b/wal/repair.go @@ -0,0 +1,73 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "io" + "os" + "path" + + "github.com/coreos/etcd/pkg/fileutil" + "github.com/coreos/etcd/wal/walpb" +) + +// Repair tries to repair the unexpectedEOF error in the +// last wal file by truncating. +func Repair(dirpath string) bool { + f, err := openLast(dirpath) + if err != nil { + return false + } + + n := 0 + rec := &walpb.Record{} + + decoder := newDecoder(f) + defer decoder.close() + for { + err := decoder.decode(rec) + switch err { + case nil: + n += 8 + rec.Size() + continue + case io.EOF: + return true + case io.ErrUnexpectedEOF: + err = f.Truncate(int64(n)) + if err != nil { + return false + } + err = f.Sync() + if err != nil { + return false + } + return true + } + } +} + +// openLast opens the last wal file for read and write. 
+func openLast(dirpath string) (*os.File, error) { + names, err := fileutil.ReadDir(dirpath) + if err != nil { + return nil, err + } + names = checkWalNames(names) + if len(names) == 0 { + return nil, ErrFileNotFound + } + last := path.Join(dirpath, names[len(names)-1]) + return os.OpenFile(last, os.O_RDWR, 0) +} diff --git a/wal/repair_test.go b/wal/repair_test.go new file mode 100644 index 000000000..38c03b0c9 --- /dev/null +++ b/wal/repair_test.go @@ -0,0 +1,91 @@ +// Copyright 2015 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "io" + "io/ioutil" + "os" + "testing" + + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/wal/walpb" +) + +func TestRepair(t *testing.T) { + p, err := ioutil.TempDir(os.TempDir(), "waltest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(p) + // create WAL + w, err := Create(p, nil) + defer w.Close() + if err != nil { + t.Fatal(err) + } + + n := 10 + for i := 1; i <= n; i++ { + es := []raftpb.Entry{{Index: uint64(i)}} + if err = w.Save(raftpb.HardState{}, es); err != nil { + t.Fatal(err) + } + } + w.Close() + + // break the wal. 
+ f, err := openLast(p) + if err != nil { + t.Fatal(err) + } + offset, err := f.Seek(-4, os.SEEK_END) + if err != nil { + t.Fatal(err) + } + err = f.Truncate(offset) + if err != nil { + t.Fatal(err) + } + + // verify we have broken the wal + w, err = Open(p, walpb.Snapshot{}) + if err != nil { + t.Fatal(err) + } + _, _, _, err = w.ReadAll() + if err != io.ErrUnexpectedEOF { + t.Fatalf("err = %v, want %v", err, io.ErrUnexpectedEOF) + } + w.Close() + + // repair the wal + ok := Repair(p) + if !ok { + t.Fatalf("fix = %t, want %t", ok, true) + } + + w, err = Open(p, walpb.Snapshot{}) + if err != nil { + t.Fatal(err) + } + _, _, ents, err := w.ReadAll() + if err != nil { + t.Fatalf("err = %v, want %v", err, nil) + } + if len(ents) != n-1 { + t.Fatalf("len(ents) = %d, want %d", len(ents), n-1) + } +} diff --git a/wal/wal.go b/wal/wal.go index ecc809446..d242647ee 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -414,14 +414,14 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error { } // TODO(xiangli): no more reference operator - if err := w.saveState(&st); err != nil { - return err - } for i := range ents { if err := w.saveEntry(&ents[i]); err != nil { return err } } + if err := w.saveState(&st); err != nil { + return err + } fstat, err := w.f.Stat() if err != nil { From 1231f82f2243d951be8bcfade00c87779f71e471 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 29 Mar 2015 08:36:26 -0700 Subject: [PATCH 2/5] etcdserver: save snapshot into wal first --- etcdserver/storage.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/etcdserver/storage.go b/etcdserver/storage.go index 24603a6c3..6f278d387 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -53,15 +53,15 @@ func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage { // SaveSnap saves the snapshot to disk and release the locked // wal files since they will not be used. 
func (st *storage) SaveSnap(snap raftpb.Snapshot) error { - err := st.Snapshotter.SaveSnap(snap) - if err != nil { - return err - } walsnap := walpb.Snapshot{ Index: snap.Metadata.Index, Term: snap.Metadata.Term, } - err = st.WAL.SaveSnapshot(walsnap) + err := st.WAL.SaveSnapshot(walsnap) + if err != nil { + return err + } + err = st.Snapshotter.SaveSnap(snap) if err != nil { return err } From 684ebd95ae46b7d11dbe4b52dd175a556d906978 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 29 Mar 2015 15:42:59 -0700 Subject: [PATCH 3/5] wal: backup broken wal before repairing --- wal/repair.go | 28 ++++++++++++++++++++++++---- wal/util.go | 11 ++++++++++- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/wal/repair.go b/wal/repair.go index 846bb0127..0f2c70216 100644 --- a/wal/repair.go +++ b/wal/repair.go @@ -16,6 +16,7 @@ package wal import ( "io" + "log" "os" "path" @@ -30,6 +31,7 @@ func Repair(dirpath string) bool { if err != nil { return false } + defer f.Close() n := 0 rec := &walpb.Record{} @@ -45,12 +47,30 @@ func Repair(dirpath string) bool { case io.EOF: return true case io.ErrUnexpectedEOF: - err = f.Truncate(int64(n)) - if err != nil { + log.Printf("wal: repairing %v", f.Name()) + bf, bferr := os.Create(f.Name() + ".broken") + if bferr != nil { + log.Printf("wal: could not repair %v, failed to create backup file", f.Name()) return false } - err = f.Sync() - if err != nil { + defer bf.Close() + + if _, err = f.Seek(0, os.SEEK_SET); err != nil { + log.Printf("wal: could not repair %v, failed to read file", f.Name()) + return false + } + + if _, err = io.Copy(bf, f); err != nil { + log.Printf("wal: could not repair %v, failed to copy file", f.Name()) + return false + } + + if err = f.Truncate(int64(n)); err != nil { + log.Printf("wal: could not repair %v, failed to truncate file", f.Name()) + return false + } + if err = f.Sync(); err != nil { + log.Printf("wal: could not repair %v, failed to sync file", f.Name()) return false } return true diff 
--git a/wal/util.go b/wal/util.go index c80fbdef2..b8b292fe7 100644 --- a/wal/util.go +++ b/wal/util.go @@ -15,12 +15,18 @@ package wal import ( + "errors" "fmt" "log" + "strings" "github.com/coreos/etcd/pkg/fileutil" ) +var ( + badWalName = errors.New("bad wal name") +) + func Exist(dirpath string) bool { names, err := fileutil.ReadDir(dirpath) if err != nil { @@ -76,8 +82,11 @@ func checkWalNames(names []string) []string { } func parseWalName(str string) (seq, index uint64, err error) { + if !strings.HasSuffix(str, ".wal") { + return 0, 0, badWalName + } _, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index) - return + return seq, index, err } func walName(seq, index uint64) string { From 0b9a318e68e3843e25d05250057e927bda07cf44 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 29 Mar 2015 21:05:26 -0700 Subject: [PATCH 4/5] etcdserver: make the wal repairing logic clear --- etcdserver/storage.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/etcdserver/storage.go b/etcdserver/storage.go index 6f278d387..998d4a399 100644 --- a/etcdserver/storage.go +++ b/etcdserver/storage.go @@ -78,19 +78,22 @@ func readWAL(waldir string, snap walpb.Snapshot) (w *wal.WAL, id, cid types.ID, wmetadata []byte ) - for i := 0; i < 2; i++ { + repaired := false + for { if w, err = wal.Open(waldir, snap); err != nil { log.Fatalf("etcdserver: open wal error: %v", err) } if wmetadata, st, ents, err = w.ReadAll(); err != nil { w.Close() - if i != 0 || err != io.ErrUnexpectedEOF { - log.Fatalf("etcdserver: read wal error: %v", err) + // we can only repair ErrUnexpectedEOF and we never repair twice. 
+ if repaired || err != io.ErrUnexpectedEOF { + log.Fatalf("etcdserver: read wal error (%v) and cannot be repaired", err) } if !wal.Repair(waldir) { log.Fatalf("etcdserver: WAL error (%v) cannot be repaired", err) } else { log.Printf("etcdserver: repaired WAL error (%v)", err) + repaired = true } continue } From 3e9a033cd2086baa6caf1b1d73527b5a8fc823a5 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 30 Mar 2015 10:49:05 -0700 Subject: [PATCH 5/5] wal: repair decoder needs to update its crc --- wal/repair.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/wal/repair.go b/wal/repair.go index 0f2c70216..979df647b 100644 --- a/wal/repair.go +++ b/wal/repair.go @@ -43,6 +43,17 @@ func Repair(dirpath string) bool { switch err { case nil: n += 8 + rec.Size() + // update crc of the decoder when necessary + switch rec.Type { + case crcType: + crc := decoder.crc.Sum32() + // current crc of decoder must match the crc of the record. + // no need to match a 0 crc, since the decoder is a new one in this case. + if crc != 0 && rec.Validate(crc) != nil { + return false + } + decoder.updateCRC(rec.Crc) + } continue case io.EOF: return true @@ -74,6 +85,9 @@ func Repair(dirpath string) bool { return false } return true + default: + log.Printf("wal: could not repair error (%v)", err) + return false } } }