wal decoding: Add optional mode to ignore CRC errors.

Signed-off-by: Piotr Tabor <ptab@google.com>
This commit is contained in:
Piotr Tabor 2022-12-23 12:48:24 +01:00
parent 498619bdda
commit bee2a08968
4 changed files with 54 additions and 59 deletions

View File

@ -40,6 +40,11 @@ type decoder struct {
// lastValidOff file offset following the last valid decoded record
lastValidOff int64
crc hash.Hash32
// continueOnCrcError - causes the decoder to continue working even in case of crc mismatch.
// This is a desired mode for tools performing inspection of the corrupted WAL logs.
// See the comment on the 'decode' method for the exact semantics.
continueOnCrcError bool
}
func newDecoder(r ...fileutil.FileReader) *decoder {
@ -53,6 +58,11 @@ func newDecoder(r ...fileutil.FileReader) *decoder {
}
}
// decode reads the next record out of the file.
// In the success path, fills 'rec' and returns nil.
// When it fails, it returns err and usually resets 'rec' to the defaults.
// When continueOnCrcError is set, the method may still return ErrUnexpectedEOF or ErrCRCMismatch, but it preserves
// the (potentially corrupted) record content that was read.
func (d *decoder) decode(rec *walpb.Record) error {
rec.Reset()
d.mu.Lock()
@ -108,6 +118,13 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error {
if rec.Type != crcType {
d.crc.Write(rec.Data)
if err := rec.Validate(d.crc.Sum32()); err != nil {
if !d.continueOnCrcError {
rec.Reset()
} else {
// If we continue past the mismatch, advance lastValidOff beyond this record so that the positions reported by subsequent errors stay consistent.
defer func() { d.lastValidOff += frameSizeBytes + recBytes + padBytes }()
}
if d.isTornEntry(data) {
return fmt.Errorf("%w: in file '%s' at position: %d", io.ErrUnexpectedEOF, fileBufReader.FileInfo().Name(), d.lastValidOff)
}

View File

@ -15,6 +15,7 @@
package wal
import (
"errors"
"io"
"os"
"path/filepath"
@ -45,8 +46,8 @@ func Repair(lg *zap.Logger, dirpath string) bool {
for {
lastOffset := decoder.lastOffset()
err := decoder.decode(rec)
switch err {
case nil:
switch {
case err == nil:
// update crc of the decoder when necessary
switch rec.Type {
case crcType:
@ -60,11 +61,11 @@ func Repair(lg *zap.Logger, dirpath string) bool {
}
continue
case io.EOF:
case errors.Is(err, io.EOF):
lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.EOF))
return true
case io.ErrUnexpectedEOF:
case errors.Is(err, io.ErrUnexpectedEOF):
brokenName := f.Name() + ".broken"
bf, bferr := os.Create(brokenName)
if bferr != nil {

View File

@ -16,6 +16,8 @@ package wal
import (
"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"io"
"os"
"testing"
@ -43,86 +45,59 @@ func TestRepairTruncate(t *testing.T) {
}
func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt corruptFunc, expectedEnts int) {
lg := zaptest.NewLogger(t)
p := t.TempDir()
// create WAL
w, err := Create(zaptest.NewLogger(t), p, nil)
w, err := Create(lg, p, nil)
defer func() {
if err = w.Close(); err != nil {
t.Fatal(err)
}
// The Close might fail.
_ = w.Close()
}()
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
for _, es := range ents {
if err = w.Save(raftpb.HardState{}, es); err != nil {
t.Fatal(err)
}
assert.NoError(t, w.Save(raftpb.HardState{}, es))
}
offset, err := w.tail().Seek(0, io.SeekCurrent)
if err != nil {
t.Fatal(err)
}
w.Close()
require.NoError(t, err)
require.NoError(t, w.Close())
err = corrupt(p, offset)
if err != nil {
t.Fatal(err)
}
require.NoError(t, corrupt(p, offset))
// verify we broke the wal
w, err = Open(zaptest.NewLogger(t), p, walpb.Snapshot{})
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
_, _, _, err = w.ReadAll()
if err != io.ErrUnexpectedEOF {
t.Fatalf("err = %v, want error %v", err, io.ErrUnexpectedEOF)
}
w.Close()
require.ErrorIs(t, err, io.ErrUnexpectedEOF)
require.NoError(t, w.Close())
// repair the wal
if ok := Repair(zaptest.NewLogger(t), p); !ok {
t.Fatalf("'Repair' returned '%v', want 'true'", ok)
}
require.True(t, Repair(lg, p), "'Repair' returned 'false', want 'true'")
// read it back
w, err = Open(zaptest.NewLogger(t), p, walpb.Snapshot{})
if err != nil {
t.Fatal(err)
}
w, err = Open(lg, p, walpb.Snapshot{})
require.NoError(t, err)
_, _, walEnts, err := w.ReadAll()
if err != nil {
t.Fatal(err)
}
if len(walEnts) != expectedEnts {
t.Fatalf("len(ents) = %d, want %d", len(walEnts), expectedEnts)
}
require.NoError(t, err)
assert.Len(t, walEnts, expectedEnts)
// write some more entries to repaired log
for i := 1; i <= 10; i++ {
es := []raftpb.Entry{{Index: uint64(expectedEnts + i)}}
if err = w.Save(raftpb.HardState{}, es); err != nil {
t.Fatal(err)
require.NoError(t, w.Save(raftpb.HardState{}, es))
}
}
w.Close()
require.NoError(t, w.Close())
// read back entries following repair, ensure it's all there
w, err = Open(zaptest.NewLogger(t), p, walpb.Snapshot{})
if err != nil {
t.Fatal(err)
}
w, err = Open(lg, p, walpb.Snapshot{})
require.NoError(t, err)
_, _, walEnts, err = w.ReadAll()
if err != nil {
t.Fatal(err)
}
if len(walEnts) != expectedEnts+10 {
t.Fatalf("len(ents) = %d, want %d", len(walEnts), expectedEnts+10)
}
require.NoError(t, err)
assert.Len(t, walEnts, expectedEnts+10)
}
func makeEnts(ents int) (ret [][]raftpb.Entry) {

View File

@ -14,7 +14,10 @@
package walpb
import "errors"
import (
"errors"
"fmt"
)
var (
ErrCRCMismatch = errors.New("walpb: crc mismatch")
@ -24,8 +27,7 @@ func (rec *Record) Validate(crc uint32) error {
if rec.Crc == crc {
return nil
}
rec.Reset()
return ErrCRCMismatch
return fmt.Errorf("%w: expected: %x computed: %x", ErrCRCMismatch, rec.Crc, crc)
}
// ValidateSnapshotForWrite ensures the newly written snapshot is valid.