wal: Expose Decoder as package visible interface.

Such that can be used by tools.

Signed-off-by: Piotr Tabor <ptab@google.com>
This commit is contained in:
Piotr Tabor 2022-12-23 12:59:06 +01:00
parent bee2a08968
commit 0d8aad54ba
5 changed files with 44 additions and 34 deletions

View File

@ -33,6 +33,13 @@ const minSectorSize = 512
// frameSizeBytes is frame size in bytes, including record size and padding size.
const frameSizeBytes = 8
type Decoder interface {
Decode(rec *walpb.Record) error
LastOffset() int64
LastCRC() uint32
UpdateCRC(prevCrc uint32)
}
type decoder struct {
mu sync.Mutex
brs []*fileutil.FileBufReader
@ -43,11 +50,11 @@ type decoder struct {
// continueOnCrcError - causes the decoder to continue working even in case of crc mismatch.
// This is a desired mode for tools performing inspection of the corrupted WAL logs.
// See comments on 'decode' method for semantic.
// See comments on 'Decode' method for semantic.
continueOnCrcError bool
}
func newDecoder(r ...fileutil.FileReader) *decoder {
func NewDecoder(r ...fileutil.FileReader) Decoder {
readers := make([]*fileutil.FileBufReader, len(r))
for i := range r {
readers[i] = fileutil.NewFileBufReader(r[i])
@ -58,12 +65,12 @@ func newDecoder(r ...fileutil.FileReader) *decoder {
}
}
// decode reads the next record out of the file.
// Decode reads the next record out of the file.
// In the success path, fills 'rec' and returns nil.
// When it fails, it returns err and usually resets 'rec' to the defaults.
// When continueOnCrcError is set, the method may return ErrUnexpectedEOF or ErrCRCMismatch, but preserve the read
// (potentially corrupted) record content.
func (d *decoder) decode(rec *walpb.Record) error {
func (d *decoder) Decode(rec *walpb.Record) error {
rec.Reset()
d.mu.Lock()
defer d.mu.Unlock()
@ -116,7 +123,10 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error {
// skip crc checking if the record type is crcType
if rec.Type != crcType {
d.crc.Write(rec.Data)
_, err := d.crc.Write(rec.Data)
if err != nil {
return err
}
if err := rec.Validate(d.crc.Sum32()); err != nil {
if !d.continueOnCrcError {
rec.Reset()
@ -184,15 +194,15 @@ func (d *decoder) isTornEntry(data []byte) bool {
return false
}
func (d *decoder) updateCRC(prevCrc uint32) {
func (d *decoder) UpdateCRC(prevCrc uint32) {
d.crc = crc.New(prevCrc, crcTable)
}
func (d *decoder) lastCRC() uint32 {
func (d *decoder) LastCRC() uint32 {
return d.crc.Sum32()
}
func (d *decoder) lastOffset() int64 { return d.lastValidOff }
func (d *decoder) LastOffset() int64 { return d.lastValidOff }
func mustUnmarshalEntry(d []byte) raftpb.Entry {
var e raftpb.Entry

View File

@ -57,8 +57,8 @@ func TestReadRecord(t *testing.T) {
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
decoder := newDecoder(fileutil.NewFileReader(f))
e := decoder.decode(rec)
decoder := NewDecoder(fileutil.NewFileReader(f))
e := decoder.Decode(rec)
if !reflect.DeepEqual(rec, tt.wr) {
t.Errorf("#%d: block = %v, want %v", i, rec, tt.wr)
}
@ -81,8 +81,8 @@ func TestWriteRecord(t *testing.T) {
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
decoder := newDecoder(fileutil.NewFileReader(f))
err = decoder.decode(b)
decoder := NewDecoder(fileutil.NewFileReader(f))
err = decoder.Decode(b)
if err != nil {
t.Errorf("err = %v, want nil", err)
}

View File

@ -42,22 +42,22 @@ func Repair(lg *zap.Logger, dirpath string) bool {
lg.Info("repairing", zap.String("path", f.Name()))
rec := &walpb.Record{}
decoder := newDecoder(fileutil.NewFileReader(f.File))
decoder := NewDecoder(fileutil.NewFileReader(f.File))
for {
lastOffset := decoder.lastOffset()
err := decoder.decode(rec)
lastOffset := decoder.LastOffset()
err := decoder.Decode(rec)
switch {
case err == nil:
// update crc of the decoder when necessary
switch rec.Type {
case crcType:
crc := decoder.crc.Sum32()
crc := decoder.LastCRC()
// current crc of decoder must match the crc of the record.
// do no need to match 0 crc, since the decoder is a new one at this case.
if crc != 0 && rec.Validate(crc) != nil {
return false
}
decoder.updateCRC(rec.Crc)
decoder.UpdateCRC(rec.Crc)
}
continue

View File

@ -81,8 +81,8 @@ type WAL struct {
state raftpb.HardState // hardstate recorded at the head of WAL
start walpb.Snapshot // snapshot to start reading
decoder *decoder // decoder to decode records
readClose func() error // closer for decode reader
decoder Decoder // decoder to Decode records
readClose func() error // closer for Decode reader
unsafeNoSync bool // if set, do not fsync
@ -351,7 +351,7 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool
lg: lg,
dir: dirpath,
start: snap,
decoder: newDecoder(rs...),
decoder: NewDecoder(rs...),
readClose: closer,
locks: ls,
}
@ -452,7 +452,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
decoder := w.decoder
var match bool
for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) {
switch rec.Type {
case entryType:
e := mustUnmarshalEntry(rec.Data)
@ -480,14 +480,14 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
metadata = rec.Data
case crcType:
crc := decoder.crc.Sum32()
crc := decoder.LastCRC()
// current crc of decoder must match the crc of the record.
// do no need to match 0 crc, since the decoder is a new one at this case.
if crc != 0 && rec.Validate(crc) != nil {
state.Reset()
return nil, state, nil, ErrCRCMismatch
}
decoder.updateCRC(rec.Crc)
decoder.UpdateCRC(rec.Crc)
case snapshotType:
var snap walpb.Snapshot
@ -527,7 +527,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
// not all, will cause CRC errors on WAL open. Since the records
// were never fully synced to disk in the first place, it's safe
// to zero them out to avoid any CRC errors from new writes.
if _, err = w.tail().Seek(w.decoder.lastOffset(), io.SeekStart); err != nil {
if _, err = w.tail().Seek(w.decoder.LastOffset(), io.SeekStart); err != nil {
return nil, state, nil, err
}
if err = fileutil.ZeroToEnd(w.tail().File); err != nil {
@ -551,7 +551,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
if w.tail() != nil {
// create encoder (chain crc with the decoder), enable appending
w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC())
w.encoder, err = newFileEncoder(w.tail().File, w.decoder.LastCRC())
if err != nil {
return
}
@ -587,9 +587,9 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro
}()
// create a new decoder from the readers on the WAL files
decoder := newDecoder(rs...)
decoder := NewDecoder(rs...)
for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) {
switch rec.Type {
case snapshotType:
var loadedSnap walpb.Snapshot
@ -598,13 +598,13 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro
case stateType:
state = mustUnmarshalState(rec.Data)
case crcType:
crc := decoder.crc.Sum32()
crc := decoder.LastCRC()
// current crc of decoder must match the crc of the record.
// do no need to match 0 crc, since the decoder is a new one at this case.
if crc != 0 && rec.Validate(crc) != nil {
return nil, ErrCRCMismatch
}
decoder.updateCRC(rec.Crc)
decoder.UpdateCRC(rec.Crc)
}
}
// We do not have to read out all the WAL entries
@ -661,9 +661,9 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta
}()
// create a new decoder from the readers on the WAL files
decoder := newDecoder(rs...)
decoder := NewDecoder(rs...)
for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) {
switch rec.Type {
case metadataType:
if metadata != nil && !bytes.Equal(metadata, rec.Data) {
@ -671,13 +671,13 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta
}
metadata = rec.Data
case crcType:
crc := decoder.crc.Sum32()
crc := decoder.LastCRC()
// Current crc of decoder must match the crc of the record.
// We need not match 0 crc, since the decoder is a new one at this point.
if crc != 0 && rec.Validate(crc) != nil {
return nil, ErrCRCMismatch
}
decoder.updateCRC(rec.Crc)
decoder.UpdateCRC(rec.Crc)
case snapshotType:
var loadedSnap walpb.Snapshot
pbutil.MustUnmarshal(&loadedSnap, rec.Data)

View File

@ -302,7 +302,7 @@ func TestCut(t *testing.T) {
}
defer f.Close()
nw := &WAL{
decoder: newDecoder(fileutil.NewFileReader(f)),
decoder: NewDecoder(fileutil.NewFileReader(f)),
start: snap,
}
_, gst, _, err := nw.ReadAll()