Merge pull request #10516 from shreyas-s-rao/wal-verify-func

wal: add Verify function to perform corruption check on wal contents
This commit is contained in:
Xiang Li 2019-03-12 12:06:36 -07:00 committed by GitHub
commit 4478993fbc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 182 additions and 34 deletions

View File

@ -299,47 +299,18 @@ func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, err
}
func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
names, err := readWALNames(lg, dirpath)
names, nameIndex, err := selectWALFiles(lg, dirpath, snap)
if err != nil {
return nil, err
}
nameIndex, ok := searchIndex(lg, names, snap.Index)
if !ok || !isValidSeq(lg, names[nameIndex:]) {
return nil, ErrFileNotFound
rs, ls, closer, err := openWALFiles(lg, dirpath, names, nameIndex, write)
if err != nil {
return nil, err
}
// open the wal files
rcs := make([]io.ReadCloser, 0)
rs := make([]io.Reader, 0)
ls := make([]*fileutil.LockedFile, 0)
for _, name := range names[nameIndex:] {
p := filepath.Join(dirpath, name)
if write {
l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
if err != nil {
closeAll(rcs...)
return nil, err
}
ls = append(ls, l)
rcs = append(rcs, l)
} else {
rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
if err != nil {
closeAll(rcs...)
return nil, err
}
ls = append(ls, nil)
rcs = append(rcs, rf)
}
rs = append(rs, rcs[len(rcs)-1])
}
closer := func() error { return closeAll(rcs...) }
// create a WAL ready for reading
w := &WAL{
lg: lg,
dir: dirpath,
start: snap,
decoder: newDecoder(rs...),
@ -355,12 +326,58 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool
closer()
return nil, err
}
w.fp = newFilePipeline(w.lg, w.dir, SegmentSizeBytes)
w.fp = newFilePipeline(lg, w.dir, SegmentSizeBytes)
}
return w, nil
}
// selectWALFiles lists the WAL file names found in dirpath and picks the
// position in that list from which reading should start for snap.Index.
// It returns the full list of names together with the starting position.
// When searchIndex reports no match, or isValidSeq rejects the names from
// that position onward, it returns ErrFileNotFound. On any error the
// returned slice is nil and the position is -1.
func selectWALFiles(lg *zap.Logger, dirpath string, snap walpb.Snapshot) ([]string, int, error) {
	walNames, err := readWALNames(lg, dirpath)
	if err != nil {
		return nil, -1, err
	}

	start, found := searchIndex(lg, walNames, snap.Index)
	if !found {
		return nil, -1, ErrFileNotFound
	}
	// Only validate the tail once a starting position is known.
	if !isValidSeq(lg, walNames[start:]) {
		return nil, -1, ErrFileNotFound
	}
	return walNames, start, nil
}
// openWALFiles opens every file in names[nameIndex:] under dirpath.
//
// With write set, each file is opened read-write through
// fileutil.TryLockFile and the resulting locked file is recorded in the
// returned lock slice. Otherwise each file is opened read-only and a nil
// entry is recorded in its place, so the lock slice always has one element
// per opened file.
//
// It returns the readers (one per file, in order), the lock slice, and a
// function that closes every opened file. If any open fails, the files
// opened so far are closed and only the error is returned.
func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int, write bool) ([]io.Reader, []*fileutil.LockedFile, func() error, error) {
	var (
		opened  = make([]io.ReadCloser, 0)
		readers = make([]io.Reader, 0)
		locks   = make([]*fileutil.LockedFile, 0)
	)

	for _, walName := range names[nameIndex:] {
		walPath := filepath.Join(dirpath, walName)

		var (
			file io.ReadCloser
			lock *fileutil.LockedFile // stays nil in read-only mode
		)
		if write {
			locked, err := fileutil.TryLockFile(walPath, os.O_RDWR, fileutil.PrivateFileMode)
			if err != nil {
				closeAll(opened...)
				return nil, nil, nil, err
			}
			file, lock = locked, locked
		} else {
			plain, err := os.OpenFile(walPath, os.O_RDONLY, fileutil.PrivateFileMode)
			if err != nil {
				closeAll(opened...)
				return nil, nil, nil, err
			}
			file = plain
		}

		locks = append(locks, lock)
		opened = append(opened, file)
		readers = append(readers, file)
	}

	closeOpened := func() error { return closeAll(opened...) }
	return readers, locks, closeOpened, nil
}
// ReadAll reads out records of the current WAL.
// If opened in write mode, it must read out all records until EOF; otherwise
// an error will be returned.
@ -480,6 +497,85 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
return metadata, state, ents, err
}
// Verify reads through the given WAL and verifies that it is not corrupted.
// It creates a new decoder to read through the records of the given WAL.
// It does not conflict with any open WAL, but it is recommended not to
// call this function after opening the WAL for writing.
//
// The WAL files are selected with selectWALFiles and opened read-only; any
// error from selecting or opening them is returned as is. All opened files
// are closed before Verify returns, on every return path.
//
// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
// If the loaded snap doesn't match with the expected one, it will
// return error ErrSnapshotMismatch.
// It returns ErrMetadataConflict when two metadata records disagree,
// ErrCRCMismatch when a crc record does not validate against the running
// crc, and a formatted error for a record of unknown type.
func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) error {
	var metadata []byte
	var match bool

	rec := &walpb.Record{}

	names, nameIndex, err := selectWALFiles(lg, walDir, snap)
	if err != nil {
		return err
	}

	// open wal files in read mode, so that there is no conflict
	// when the same WAL is opened elsewhere in write mode
	rs, _, closer, err := openWALFiles(lg, walDir, names, nameIndex, false)
	if err != nil {
		return err
	}
	// Close the files on every exit, including the early returns inside the
	// decode loop and a panic from MustUnmarshal; closing only after the
	// loop would leak the handles whenever corruption is detected.
	defer func() {
		if closer != nil {
			closer()
		}
	}()

	// create a new decoder from the readers on the WAL files
	decoder := newDecoder(rs...)

	for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
		switch rec.Type {
		case metadataType:
			if metadata != nil && !bytes.Equal(metadata, rec.Data) {
				return ErrMetadataConflict
			}
			metadata = rec.Data
		case crcType:
			crc := decoder.crc.Sum32()
			// Current crc of decoder must match the crc of the record.
			// We need not match 0 crc, since the decoder is a new one at this point.
			if crc != 0 && rec.Validate(crc) != nil {
				return ErrCRCMismatch
			}
			decoder.updateCRC(rec.Crc)
		case snapshotType:
			var loadedSnap walpb.Snapshot
			pbutil.MustUnmarshal(&loadedSnap, rec.Data)
			if loadedSnap.Index == snap.Index {
				if loadedSnap.Term != snap.Term {
					return ErrSnapshotMismatch
				}
				match = true
			}
		// We ignore all entry and state type records as these
		// are not necessary for validating the WAL contents
		case entryType:
		case stateType:
		default:
			return fmt.Errorf("unexpected block type %d", rec.Type)
		}
	}

	// We do not have to read out all the WAL entries
	// as the decoder is opened in read mode.
	if err != io.EOF && err != io.ErrUnexpectedEOF {
		return err
	}

	if !match {
		return ErrSnapshotNotFound
	}

	return nil
}
// cut closes current file written and creates a new one ready to append.
// cut first creates a temp wal file and writes necessary headers into it.
// Then cut atomically renames the temp wal file to a wal file.

View File

@ -20,6 +20,7 @@ import (
"io/ioutil"
"math"
"os"
"path"
"path/filepath"
"reflect"
"testing"
@ -186,6 +187,57 @@ func TestOpenAtIndex(t *testing.T) {
}
}
// TestVerify tests that Verify returns a non-nil error when the WAL is corrupted.
// The test creates a WAL directory and cuts out multiple WAL files, checks
// that Verify accepts the intact WAL, then corrupts one of the files by
// completely truncating it and checks that Verify now reports an error.
func TestVerify(t *testing.T) {
	walDir, err := ioutil.TempDir(os.TempDir(), "waltest")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(walDir)

	// create WAL
	w, err := Create(zap.NewExample(), walDir, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer w.Close()

	// make 5 separate files
	for i := 0; i < 5; i++ {
		// Convert through rune explicitly: a bare string(int) conversion is
		// rejected by go vet's stringintconv check, which go test runs.
		// The resulting bytes are the same.
		es := []raftpb.Entry{{Index: uint64(i), Data: []byte("waldata" + string(rune(i+1)))}}
		if err = w.Save(raftpb.HardState{}, es); err != nil {
			t.Fatal(err)
		}
		if err = w.cut(); err != nil {
			t.Fatal(err)
		}
	}

	// to verify the WAL is not corrupted at this point
	err = Verify(zap.NewExample(), walDir, walpb.Snapshot{})
	if err != nil {
		t.Errorf("expected a nil error, got %v", err)
	}

	walFiles, err := ioutil.ReadDir(walDir)
	if err != nil {
		t.Fatal(err)
	}

	// corrupt the WAL by truncating one of the WAL files completely
	err = os.Truncate(path.Join(walDir, walFiles[2].Name()), 0)
	if err != nil {
		t.Fatal(err)
	}

	err = Verify(zap.NewExample(), walDir, walpb.Snapshot{})
	if err == nil {
		t.Error("expected a non-nil error, got nil")
	}
}
// TODO: split it into smaller tests for better readability
func TestCut(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")