From 0d8aad54bac0b3b4439279a661864bd3638c4ebe Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Fri, 23 Dec 2022 12:59:06 +0100 Subject: [PATCH] wal: Expose Decoder as package visible interface. Such that can be used by tools. Signed-off-by: Piotr Tabor --- server/storage/wal/decoder.go | 26 +++++++++++++++++-------- server/storage/wal/record_test.go | 8 ++++---- server/storage/wal/repair.go | 10 +++++----- server/storage/wal/wal.go | 32 +++++++++++++++---------------- server/storage/wal/wal_test.go | 2 +- 5 files changed, 44 insertions(+), 34 deletions(-) diff --git a/server/storage/wal/decoder.go b/server/storage/wal/decoder.go index 11fb3985f..cbe4b0821 100644 --- a/server/storage/wal/decoder.go +++ b/server/storage/wal/decoder.go @@ -33,6 +33,13 @@ const minSectorSize = 512 // frameSizeBytes is frame size in bytes, including record size and padding size. const frameSizeBytes = 8 +type Decoder interface { + Decode(rec *walpb.Record) error + LastOffset() int64 + LastCRC() uint32 + UpdateCRC(prevCrc uint32) +} + type decoder struct { mu sync.Mutex brs []*fileutil.FileBufReader @@ -43,11 +50,11 @@ type decoder struct { // continueOnCrcError - causes the decoder to continue working even in case of crc mismatch. // This is a desired mode for tools performing inspection of the corrupted WAL logs. - // See comments on 'decode' method for semantic. + // See comments on 'Decode' method for semantic. continueOnCrcError bool } -func newDecoder(r ...fileutil.FileReader) *decoder { +func NewDecoder(r ...fileutil.FileReader) Decoder { readers := make([]*fileutil.FileBufReader, len(r)) for i := range r { readers[i] = fileutil.NewFileBufReader(r[i]) @@ -58,12 +65,12 @@ func newDecoder(r ...fileutil.FileReader) *decoder { } } -// decode reads the next record out of the file. +// Decode reads the next record out of the file. // In the success path, fills 'rec' and returns nil. // When it fails, it returns err and usually resets 'rec' to the defaults. // When continueOnCrcError is set, the method may return ErrUnexpectedEOF or ErrCRCMismatch, but preserve the read // (potentially corrupted) record content. -func (d *decoder) decode(rec *walpb.Record) error { +func (d *decoder) Decode(rec *walpb.Record) error { rec.Reset() d.mu.Lock() defer d.mu.Unlock() @@ -116,7 +123,10 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error { // skip crc checking if the record type is crcType if rec.Type != crcType { - d.crc.Write(rec.Data) + _, err := d.crc.Write(rec.Data) + if err != nil { + return err + } if err := rec.Validate(d.crc.Sum32()); err != nil { if !d.continueOnCrcError { rec.Reset() @@ -184,15 +194,15 @@ func (d *decoder) isTornEntry(data []byte) bool { return false } -func (d *decoder) updateCRC(prevCrc uint32) { +func (d *decoder) UpdateCRC(prevCrc uint32) { d.crc = crc.New(prevCrc, crcTable) } -func (d *decoder) lastCRC() uint32 { +func (d *decoder) LastCRC() uint32 { return d.crc.Sum32() } -func (d *decoder) lastOffset() int64 { return d.lastValidOff } +func (d *decoder) LastOffset() int64 { return d.lastValidOff } func mustUnmarshalEntry(d []byte) raftpb.Entry { var e raftpb.Entry diff --git a/server/storage/wal/record_test.go b/server/storage/wal/record_test.go index 0a01d6e6f..85ceebed9 100644 --- a/server/storage/wal/record_test.go +++ b/server/storage/wal/record_test.go @@ -57,8 +57,8 @@ func TestReadRecord(t *testing.T) { if err != nil { t.Errorf("Unexpected error: %v", err) } - decoder := newDecoder(fileutil.NewFileReader(f)) - e := decoder.decode(rec) + decoder := NewDecoder(fileutil.NewFileReader(f)) + e := decoder.Decode(rec) if !reflect.DeepEqual(rec, tt.wr) { t.Errorf("#%d: block = %v, want %v", i, rec, tt.wr) } @@ -81,8 +81,8 @@ func TestWriteRecord(t *testing.T) { if err != nil { t.Errorf("Unexpected error: %v", err) } - decoder := newDecoder(fileutil.NewFileReader(f)) - err = decoder.decode(b) + decoder := NewDecoder(fileutil.NewFileReader(f)) + err = decoder.Decode(b) if err != nil { t.Errorf("err = %v, want nil", err) } diff --git a/server/storage/wal/repair.go b/server/storage/wal/repair.go index 0cb1be892..bca72f8ae 100644 --- a/server/storage/wal/repair.go +++ b/server/storage/wal/repair.go @@ -42,22 +42,22 @@ func Repair(lg *zap.Logger, dirpath string) bool { lg.Info("repairing", zap.String("path", f.Name())) rec := &walpb.Record{} - decoder := newDecoder(fileutil.NewFileReader(f.File)) + decoder := NewDecoder(fileutil.NewFileReader(f.File)) for { - lastOffset := decoder.lastOffset() - err := decoder.decode(rec) + lastOffset := decoder.LastOffset() + err := decoder.Decode(rec) switch { case err == nil: // update crc of the decoder when necessary switch rec.Type { case crcType: - crc := decoder.crc.Sum32() + crc := decoder.LastCRC() // current crc of decoder must match the crc of the record. // do no need to match 0 crc, since the decoder is a new one at this case. if crc != 0 && rec.Validate(crc) != nil { return false } - decoder.updateCRC(rec.Crc) + decoder.UpdateCRC(rec.Crc) } continue diff --git a/server/storage/wal/wal.go b/server/storage/wal/wal.go index 145a1a305..5c15ebfa6 100644 --- a/server/storage/wal/wal.go +++ b/server/storage/wal/wal.go @@ -81,8 +81,8 @@ type WAL struct { state raftpb.HardState // hardstate recorded at the head of WAL start walpb.Snapshot // snapshot to start reading - decoder *decoder // decoder to decode records - readClose func() error // closer for decode reader + decoder Decoder // decoder to Decode records + readClose func() error // closer for Decode reader unsafeNoSync bool // if set, do not fsync @@ -351,7 +351,7 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool lg: lg, dir: dirpath, start: snap, - decoder: newDecoder(rs...), + decoder: NewDecoder(rs...), readClose: closer, locks: ls, } @@ -452,7 +452,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. decoder := w.decoder var match bool - for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) { + for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) { switch rec.Type { case entryType: e := mustUnmarshalEntry(rec.Data) @@ -480,14 +480,14 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. metadata = rec.Data case crcType: - crc := decoder.crc.Sum32() + crc := decoder.LastCRC() // current crc of decoder must match the crc of the record. // do no need to match 0 crc, since the decoder is a new one at this case. if crc != 0 && rec.Validate(crc) != nil { state.Reset() return nil, state, nil, ErrCRCMismatch } - decoder.updateCRC(rec.Crc) + decoder.UpdateCRC(rec.Crc) case snapshotType: var snap walpb.Snapshot @@ -527,7 +527,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. // not all, will cause CRC errors on WAL open. Since the records // were never fully synced to disk in the first place, it's safe // to zero them out to avoid any CRC errors from new writes. - if _, err = w.tail().Seek(w.decoder.lastOffset(), io.SeekStart); err != nil { + if _, err = w.tail().Seek(w.decoder.LastOffset(), io.SeekStart); err != nil { return nil, state, nil, err } if err = fileutil.ZeroToEnd(w.tail().File); err != nil { @@ -551,7 +551,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. if w.tail() != nil { // create encoder (chain crc with the decoder), enable appending - w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC()) + w.encoder, err = newFileEncoder(w.tail().File, w.decoder.LastCRC()) if err != nil { return } @@ -587,9 +587,9 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro }() // create a new decoder from the readers on the WAL files - decoder := newDecoder(rs...) + decoder := NewDecoder(rs...) - for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) { + for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) { switch rec.Type { case snapshotType: var loadedSnap walpb.Snapshot @@ -598,13 +598,13 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro case stateType: state = mustUnmarshalState(rec.Data) case crcType: - crc := decoder.crc.Sum32() + crc := decoder.LastCRC() // current crc of decoder must match the crc of the record. // do no need to match 0 crc, since the decoder is a new one at this case. if crc != 0 && rec.Validate(crc) != nil { return nil, ErrCRCMismatch } - decoder.updateCRC(rec.Crc) + decoder.UpdateCRC(rec.Crc) } } // We do not have to read out all the WAL entries @@ -661,9 +661,9 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta }() // create a new decoder from the readers on the WAL files - decoder := newDecoder(rs...) + decoder := NewDecoder(rs...) - for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) { + for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) { switch rec.Type { case metadataType: if metadata != nil && !bytes.Equal(metadata, rec.Data) { @@ -671,13 +671,13 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta } metadata = rec.Data case crcType: - crc := decoder.crc.Sum32() + crc := decoder.LastCRC() // Current crc of decoder must match the crc of the record. // We need not match 0 crc, since the decoder is a new one at this point. if crc != 0 && rec.Validate(crc) != nil { return nil, ErrCRCMismatch } - decoder.updateCRC(rec.Crc) + decoder.UpdateCRC(rec.Crc) case snapshotType: var loadedSnap walpb.Snapshot pbutil.MustUnmarshal(&loadedSnap, rec.Data) diff --git a/server/storage/wal/wal_test.go b/server/storage/wal/wal_test.go index 5a8ad0dc4..70883ba3b 100644 --- a/server/storage/wal/wal_test.go +++ b/server/storage/wal/wal_test.go @@ -302,7 +302,7 @@ func TestCut(t *testing.T) { } defer f.Close() nw := &WAL{ - decoder: newDecoder(fileutil.NewFileReader(f)), + decoder: NewDecoder(fileutil.NewFileReader(f)), start: snap, } _, gst, _, err := nw.ReadAll()