From 3d6862fe0d667715e35013f8c7999ff500c9f404 Mon Sep 17 00:00:00 2001 From: shreyas-s-rao Date: Fri, 1 Mar 2019 14:39:43 +0530 Subject: [PATCH 1/2] wal: add Verify function to perform corruption check on wal contents Signed-off-by: Shreyas Rao --- wal/wal.go | 164 ++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 130 insertions(+), 34 deletions(-) diff --git a/wal/wal.go b/wal/wal.go index 7200ad088..73bef540a 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -299,47 +299,18 @@ func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, err } func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) { - names, err := readWALNames(lg, dirpath) + names, nameIndex, err := selectWALFiles(lg, dirpath, snap) if err != nil { return nil, err } - nameIndex, ok := searchIndex(lg, names, snap.Index) - if !ok || !isValidSeq(lg, names[nameIndex:]) { - return nil, ErrFileNotFound + rs, ls, closer, err := openWALFiles(lg, dirpath, names, nameIndex, write) + if err != nil { + return nil, err } - // open the wal files - rcs := make([]io.ReadCloser, 0) - rs := make([]io.Reader, 0) - ls := make([]*fileutil.LockedFile, 0) - for _, name := range names[nameIndex:] { - p := filepath.Join(dirpath, name) - if write { - l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode) - if err != nil { - closeAll(rcs...) - return nil, err - } - ls = append(ls, l) - rcs = append(rcs, l) - } else { - rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode) - if err != nil { - closeAll(rcs...) - return nil, err - } - ls = append(ls, nil) - rcs = append(rcs, rf) - } - rs = append(rs, rcs[len(rcs)-1]) - } - - closer := func() error { return closeAll(rcs...) } - // create a WAL ready for reading w := &WAL{ - lg: lg, dir: dirpath, start: snap, decoder: newDecoder(rs...), @@ -355,12 +326,58 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool closer() return nil, err } - w.fp = newFilePipeline(w.lg, w.dir, SegmentSizeBytes) + w.fp = newFilePipeline(lg, w.dir, SegmentSizeBytes) } return w, nil } +func selectWALFiles(lg *zap.Logger, dirpath string, snap walpb.Snapshot) ([]string, int, error) { + names, err := readWALNames(lg, dirpath) + if err != nil { + return nil, -1, err + } + + nameIndex, ok := searchIndex(lg, names, snap.Index) + if !ok || !isValidSeq(lg, names[nameIndex:]) { + err = ErrFileNotFound + return nil, -1, err + } + + return names, nameIndex, nil +} + +func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int, write bool) ([]io.Reader, []*fileutil.LockedFile, func() error, error) { + rcs := make([]io.ReadCloser, 0) + rs := make([]io.Reader, 0) + ls := make([]*fileutil.LockedFile, 0) + for _, name := range names[nameIndex:] { + p := filepath.Join(dirpath, name) + if write { + l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode) + if err != nil { + closeAll(rcs...) + return nil, nil, nil, err + } + ls = append(ls, l) + rcs = append(rcs, l) + } else { + rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode) + if err != nil { + closeAll(rcs...) + return nil, nil, nil, err + } + ls = append(ls, nil) + rcs = append(rcs, rf) + } + rs = append(rs, rcs[len(rcs)-1]) + } + + closer := func() error { return closeAll(rcs...) } + + return rs, ls, closer, nil +} + // ReadAll reads out records of the current WAL. // If opened in write mode, it must read out all records until EOF. Or an error // will be returned. @@ -480,6 +497,85 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. return metadata, state, ents, err } +// Verify reads through the given WAL and verifies that it is not corrupted. +// It creates a new decoder to read through the records of the given WAL. +// It does not conflict with any open WAL, but it is recommended not to +// call this function after opening the WAL for writing. +// If it cannot read out the expected snap, it will return ErrSnapshotNotFound. +// If the loaded snap doesn't match with the expected one, it will +// return error ErrSnapshotMismatch. +func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) error { + var metadata []byte + var err error + var match bool + + rec := &walpb.Record{} + + names, nameIndex, err := selectWALFiles(lg, walDir, snap) + if err != nil { + return err + } + + // open wal files in read mode, so that there is no conflict + // when the same WAL is opened elsewhere in write mode + rs, _, closer, err := openWALFiles(lg, walDir, names, nameIndex, false) + if err != nil { + return err + } + + // create a new decoder from the readers on the WAL files + decoder := newDecoder(rs...) + + for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) { + switch rec.Type { + case metadataType: + if metadata != nil && !bytes.Equal(metadata, rec.Data) { + return ErrMetadataConflict + } + metadata = rec.Data + case crcType: + crc := decoder.crc.Sum32() + // Current crc of decoder must match the crc of the record. + // We need not match 0 crc, since the decoder is a new one at this point. + if crc != 0 && rec.Validate(crc) != nil { + return ErrCRCMismatch + } + decoder.updateCRC(rec.Crc) + case snapshotType: + var loadedSnap walpb.Snapshot + pbutil.MustUnmarshal(&loadedSnap, rec.Data) + if loadedSnap.Index == snap.Index { + if loadedSnap.Term != snap.Term { + return ErrSnapshotMismatch + } + match = true + } + // We ignore all entry and state type records as these + // are not necessary for validating the WAL contents + case entryType: + case stateType: + default: + return fmt.Errorf("unexpected block type %d", rec.Type) + } + } + + if closer != nil { + closer() + } + + // We do not have to read out all the WAL entries + // as the decoder is opened in read mode. + if err != io.EOF && err != io.ErrUnexpectedEOF { + return err + } + + if !match { + return ErrSnapshotNotFound + } + + return nil +} + // cut closes current file written and creates a new one ready to append. // cut first creates a temp wal file and writes necessary headers into it. // Then cut atomically rename temp wal file to a wal file. From bb3eb8fea9bd8eb2e9cb54c2b98fb39945e3e6f0 Mon Sep 17 00:00:00 2001 From: Shreyas Rao Date: Tue, 12 Mar 2019 17:07:00 +0530 Subject: [PATCH 2/2] wal: Add test for Verify Signed-off-by: Shreyas Rao --- wal/wal_test.go | 52 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/wal/wal_test.go b/wal/wal_test.go index 8477a0402..addd094cb 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -20,6 +20,7 @@ import ( "io/ioutil" "math" "os" + "path" "path/filepath" "reflect" "testing" @@ -186,6 +187,57 @@ func TestOpenAtIndex(t *testing.T) { } } +// TestVerify tests that Verify throws a non-nil error when the WAL is corrupted. +// The test creates a WAL directory and cuts out multiple WAL files. Then +// it corrupts one of the files by completely truncating it. +func TestVerify(t *testing.T) { + walDir, err := ioutil.TempDir(os.TempDir(), "waltest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(walDir) + + // create WAL + w, err := Create(zap.NewExample(), walDir, nil) + if err != nil { + t.Fatal(err) + } + defer w.Close() + + // make 5 separate files + for i := 0; i < 5; i++ { + es := []raftpb.Entry{{Index: uint64(i), Data: []byte("waldata" + string(i+1))}} + if err = w.Save(raftpb.HardState{}, es); err != nil { + t.Fatal(err) + } + if err = w.cut(); err != nil { + t.Fatal(err) + } + } + + // to verify the WAL is not corrupted at this point + err = Verify(zap.NewExample(), walDir, walpb.Snapshot{}) + if err != nil { + t.Errorf("expected a nil error, got %v", err) + } + + walFiles, err := ioutil.ReadDir(walDir) + if err != nil { + t.Fatal(err) + } + + // corrupt the WAL by truncating one of the WAL files completely + err = os.Truncate(path.Join(walDir, walFiles[2].Name()), 0) + if err != nil { + t.Fatal(err) + } + + err = Verify(zap.NewExample(), walDir, walpb.Snapshot{}) + if err == nil { + t.Error("expected a non-nil error, got nil") + } +} + // TODO: split it into smaller tests for better readability func TestCut(t *testing.T) { p, err := ioutil.TempDir(os.TempDir(), "waltest")