diff --git a/server/storage/wal/decoder.go b/server/storage/wal/decoder.go index e1864cdca..11fb3985f 100644 --- a/server/storage/wal/decoder.go +++ b/server/storage/wal/decoder.go @@ -40,6 +40,11 @@ type decoder struct { // lastValidOff file offset following the last valid decoded record lastValidOff int64 crc hash.Hash32 + + // continueOnCrcError causes the decoder to continue working even in case of a CRC mismatch. + // This is a desired mode for tools performing inspection of corrupted WAL logs. + // See comments on 'decode' method for the semantics. + continueOnCrcError bool } func newDecoder(r ...fileutil.FileReader) *decoder { @@ -53,6 +58,11 @@ func newDecoder(r ...fileutil.FileReader) *decoder { } } +// decode reads the next record out of the file. +// On success, it fills 'rec' and returns nil. +// When it fails, it returns err and usually resets 'rec' to the defaults. +// When continueOnCrcError is set, the method may return ErrUnexpectedEOF or ErrCRCMismatch, but preserves the read +// (potentially corrupted) record content. 
func (d *decoder) decode(rec *walpb.Record) error { rec.Reset() d.mu.Lock() @@ -108,6 +118,13 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error { if rec.Type != crcType { d.crc.Write(rec.Data) if err := rec.Validate(d.crc.Sum32()); err != nil { + if !d.continueOnCrcError { + rec.Reset() + } else { + // If we continue, we want to update lastValidOff, such that following errors are consistent + defer func() { d.lastValidOff += frameSizeBytes + recBytes + padBytes }() + } + if d.isTornEntry(data) { return fmt.Errorf("%w: in file '%s' at position: %d", io.ErrUnexpectedEOF, fileBufReader.FileInfo().Name(), d.lastValidOff) } diff --git a/server/storage/wal/repair.go b/server/storage/wal/repair.go index e81ac8ddd..0cb1be892 100644 --- a/server/storage/wal/repair.go +++ b/server/storage/wal/repair.go @@ -15,6 +15,7 @@ package wal import ( + "errors" "io" "os" "path/filepath" @@ -45,8 +46,8 @@ func Repair(lg *zap.Logger, dirpath string) bool { for { lastOffset := decoder.lastOffset() err := decoder.decode(rec) - switch err { - case nil: + switch { + case err == nil: // update crc of the decoder when necessary switch rec.Type { case crcType: @@ -60,11 +61,11 @@ func Repair(lg *zap.Logger, dirpath string) bool { } continue - case io.EOF: + case errors.Is(err, io.EOF): lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.EOF)) return true - case io.ErrUnexpectedEOF: + case errors.Is(err, io.ErrUnexpectedEOF): brokenName := f.Name() + ".broken" bf, bferr := os.Create(brokenName) if bferr != nil { diff --git a/server/storage/wal/repair_test.go b/server/storage/wal/repair_test.go index 864b56da6..0c5da51aa 100644 --- a/server/storage/wal/repair_test.go +++ b/server/storage/wal/repair_test.go @@ -16,6 +16,8 @@ package wal import ( "fmt" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "io" "os" "testing" @@ -43,86 +45,59 @@ func TestRepairTruncate(t *testing.T) { } func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt 
corruptFunc, expectedEnts int) { + lg := zaptest.NewLogger(t) p := t.TempDir() // create WAL - w, err := Create(zaptest.NewLogger(t), p, nil) + w, err := Create(lg, p, nil) defer func() { - if err = w.Close(); err != nil { - t.Fatal(err) - } + // The Close might fail. + _ = w.Close() }() - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) for _, es := range ents { - if err = w.Save(raftpb.HardState{}, es); err != nil { - t.Fatal(err) - } + assert.NoError(t, w.Save(raftpb.HardState{}, es)) } offset, err := w.tail().Seek(0, io.SeekCurrent) - if err != nil { - t.Fatal(err) - } - w.Close() + require.NoError(t, err) + require.NoError(t, w.Close()) - err = corrupt(p, offset) - if err != nil { - t.Fatal(err) - } + require.NoError(t, corrupt(p, offset)) // verify we broke the wal w, err = Open(zaptest.NewLogger(t), p, walpb.Snapshot{}) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + _, _, _, err = w.ReadAll() - if err != io.ErrUnexpectedEOF { - t.Fatalf("err = %v, want error %v", err, io.ErrUnexpectedEOF) - } - w.Close() + require.ErrorIs(t, err, io.ErrUnexpectedEOF) + require.NoError(t, w.Close()) // repair the wal - if ok := Repair(zaptest.NewLogger(t), p); !ok { - t.Fatalf("'Repair' returned '%v', want 'true'", ok) - } + require.True(t, Repair(lg, p), "'Repair' returned 'false', want 'true'") // read it back - w, err = Open(zaptest.NewLogger(t), p, walpb.Snapshot{}) - if err != nil { - t.Fatal(err) - } + w, err = Open(lg, p, walpb.Snapshot{}) + require.NoError(t, err) + _, _, walEnts, err := w.ReadAll() - if err != nil { - t.Fatal(err) - } - if len(walEnts) != expectedEnts { - t.Fatalf("len(ents) = %d, want %d", len(walEnts), expectedEnts) - } + require.NoError(t, err) + assert.Len(t, walEnts, expectedEnts) // write some more entries to repaired log for i := 1; i <= 10; i++ { es := []raftpb.Entry{{Index: uint64(expectedEnts + i)}} - if err = w.Save(raftpb.HardState{}, es); err != nil { - t.Fatal(err) - } + require.NoError(t, 
w.Save(raftpb.HardState{}, es)) } - w.Close() + require.NoError(t, w.Close()) // read back entries following repair, ensure it's all there - w, err = Open(zaptest.NewLogger(t), p, walpb.Snapshot{}) - if err != nil { - t.Fatal(err) - } + w, err = Open(lg, p, walpb.Snapshot{}) + require.NoError(t, err) _, _, walEnts, err = w.ReadAll() - if err != nil { - t.Fatal(err) - } - if len(walEnts) != expectedEnts+10 { - t.Fatalf("len(ents) = %d, want %d", len(walEnts), expectedEnts+10) - } + require.NoError(t, err) + assert.Len(t, walEnts, expectedEnts+10) } func makeEnts(ents int) (ret [][]raftpb.Entry) { diff --git a/server/storage/wal/walpb/record.go b/server/storage/wal/walpb/record.go index e2070fbba..693deab11 100644 --- a/server/storage/wal/walpb/record.go +++ b/server/storage/wal/walpb/record.go @@ -14,7 +14,10 @@ package walpb -import "errors" +import ( + "errors" + "fmt" +) var ( ErrCRCMismatch = errors.New("walpb: crc mismatch") @@ -24,8 +27,7 @@ func (rec *Record) Validate(crc uint32) error { if rec.Crc == crc { return nil } - rec.Reset() - return ErrCRCMismatch + return fmt.Errorf("%w: expected: %x computed: %x", ErrCRCMismatch, rec.Crc, crc) } // ValidateSnapshotForWrite ensures the Snapshot the newly written snapshot is valid.