From bd9f1584d4000b2df0e8e288cf144e8c4d7fcaff Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Sat, 7 Jan 2023 13:09:37 +0800 Subject: [PATCH 1/2] process the scenaro of the last WAL record being partially synced to disk We need to return io.ErrUnexpectedEOF in the error chain, so that etcdserver can repair it automatically. Signed-off-by: Benjamin Wang --- server/etcdserver/bootstrap.go | 8 ++-- server/storage/wal/decoder.go | 4 +- server/storage/wal/wal.go | 12 +++--- server/storage/wal/wal_test.go | 77 ++++++++++++++++++++++++++++++++++ tools/etcd-dump-logs/raw.go | 3 ++ 5 files changed, 92 insertions(+), 12 deletions(-) diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index 1587e3321..146e259be 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -16,6 +16,7 @@ package etcdserver import ( "encoding/json" + "errors" "fmt" "io" "net/http" @@ -27,8 +28,6 @@ import ( "github.com/dustin/go-humanize" "go.uber.org/zap" - "go.etcd.io/etcd/server/v3/etcdserver/errors" - "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/client/pkg/v3/types" @@ -42,6 +41,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" "go.etcd.io/etcd/server/v3/etcdserver/api/v3discovery" "go.etcd.io/etcd/server/v3/etcdserver/cindex" + servererrors "go.etcd.io/etcd/server/v3/etcdserver/errors" serverstorage "go.etcd.io/etcd/server/v3/storage" "go.etcd.io/etcd/server/v3/storage/backend" "go.etcd.io/etcd/server/v3/storage/schema" @@ -339,7 +339,7 @@ func bootstrapNewClusterNoWAL(cfg config.ServerConfig, prt http.RoundTripper) (* str, err = v3discovery.JoinCluster(cfg.Logger, &cfg.DiscoveryCfg, m.ID, cfg.InitialPeerURLsMap.String()) } if err != nil { - return nil, &errors.DiscoveryError{Op: "join", Err: err} + return nil, &servererrors.DiscoveryError{Op: "join", Err: err} } var urlsmap types.URLsMap urlsmap, err = types.NewURLsMap(str) @@ -614,7 +614,7 @@ func openWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (*w if err != nil { w.Close() // we can only repair ErrUnexpectedEOF and we never repair twice. - if repaired || err != io.ErrUnexpectedEOF { + if repaired || !errors.Is(err, io.ErrUnexpectedEOF) { cfg.Logger.Fatal("failed to read WAL, cannot be repaired", zap.Error(err)) } if !wal.Repair(cfg.Logger, cfg.WALDir()) { diff --git a/server/storage/wal/decoder.go b/server/storage/wal/decoder.go index 168c71cb6..0f47b72fd 100644 --- a/server/storage/wal/decoder.go +++ b/server/storage/wal/decoder.go @@ -106,8 +106,8 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error { // The length of current WAL entry must be less than the remaining file size. maxEntryLimit := fileBufReader.FileInfo().Size() - d.lastValidOff - padBytes if recBytes > maxEntryLimit { - return fmt.Errorf("wal: max entry size limit exceeded, recBytes: %d, fileSize(%d) - offset(%d) - padBytes(%d) = entryLimit(%d)", - recBytes, fileBufReader.FileInfo().Size(), d.lastValidOff, padBytes, maxEntryLimit) + return fmt.Errorf("%w: [wal] max entry size limit exceeded when reading %q, recBytes: %d, fileSize(%d) - offset(%d) - padBytes(%d) = entryLimit(%d)", + io.ErrUnexpectedEOF, fileBufReader.FileInfo().Name(), recBytes, fileBufReader.FileInfo().Size(), d.lastValidOff, padBytes, maxEntryLimit) } data := make([]byte, recBytes+padBytes) diff --git a/server/storage/wal/wal.go b/server/storage/wal/wal.go index e913c76c1..ff24ca86a 100644 --- a/server/storage/wal/wal.go +++ b/server/storage/wal/wal.go @@ -510,14 +510,14 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. case nil: // We do not have to read out all entries in read mode. // The last record maybe a partial written one, so - // ErrunexpectedEOF might be returned. - if err != io.EOF && err != io.ErrUnexpectedEOF { + // `io.ErrUnexpectedEOF` might be returned. + if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) { state.Reset() return nil, state, nil, err } default: - // We must read all of the entries if WAL is opened in write mode. - if err != io.EOF { + // We must read all the entries if WAL is opened in write mode. + if !errors.Is(err, io.EOF) { state.Reset() return nil, state, nil, err } @@ -609,7 +609,7 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro } // We do not have to read out all the WAL entries // as the decoder is opened in read mode. - if err != io.EOF && err != io.ErrUnexpectedEOF { + if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) { return nil, err } @@ -699,7 +699,7 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta // We do not have to read out all the WAL entries // as the decoder is opened in read mode. - if err != io.EOF && err != io.ErrUnexpectedEOF { + if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) { return nil, err } diff --git a/server/storage/wal/wal_test.go b/server/storage/wal/wal_test.go index 1063ed438..45bae828c 100644 --- a/server/storage/wal/wal_test.go +++ b/server/storage/wal/wal_test.go @@ -30,6 +30,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" "go.etcd.io/etcd/client/pkg/v3/fileutil" @@ -1068,3 +1069,79 @@ func TestValidSnapshotEntriesAfterPurgeWal(t *testing.T) { t.Fatal(err) } } + +func TestLastRecordLengthExceedFileEnd(t *testing.T) { + /* The data below was generated by code something like below. The length + * of the last record was intentionally changed to 1000 in order to make + * sure it exceeds the end of the file. + * + * for i := 0; i < 3; i++ { + * es := []raftpb.Entry{{Index: uint64(i + 1), Data: []byte(fmt.Sprintf("waldata%d", i+1))}} + * if err = w.Save(raftpb.HardState{}, es); err != nil { + * t.Fatal(err) + * } + * } + * ...... + * var sb strings.Builder + * for _, ch := range buf { + * sb.WriteString(fmt.Sprintf("\\x%02x", ch)) + * } + */ + // Generate WAL file + t.Log("Generate a WAL file with the last record's length modified.") + data := []byte("\x04\x00\x00\x00\x00\x00\x00\x84\x08\x04\x10\x00\x00" + + "\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x84\x08\x01\x10\x00\x00" + + "\x00\x00\x00\x0e\x00\x00\x00\x00\x00\x00\x82\x08\x05\x10\xa0\xb3" + + "\x9b\x8f\x08\x1a\x04\x08\x00\x10\x00\x00\x00\x1a\x00\x00\x00\x00" + + "\x00\x00\x86\x08\x02\x10\xba\x8b\xdc\x85\x0f\x1a\x10\x08\x00\x10" + + "\x00\x18\x01\x22\x08\x77\x61\x6c\x64\x61\x74\x61\x31\x00\x00\x00" + + "\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x86\x08\x02\x10\xa1\xe8" + + "\xff\x9c\x02\x1a\x10\x08\x00\x10\x00\x18\x02\x22\x08\x77\x61\x6c" + + "\x64\x61\x74\x61\x32\x00\x00\x00\x00\x00\x00\xe8\x03\x00\x00\x00" + + "\x00\x00\x86\x08\x02\x10\xa1\x9c\xa1\xaa\x04\x1a\x10\x08\x00\x10" + + "\x00\x18\x03\x22\x08\x77\x61\x6c\x64\x61\x74\x61\x33\x00\x00\x00" + + "\x00\x00\x00") + + buf := bytes.NewBuffer(data) + f, err := createFileWithData(t, buf) + fileName := f.Name() + require.NoError(t, err) + t.Logf("fileName: %v", fileName) + + // Verify low-level decoder directly + t.Log("Verify all records can be parsed correctly.") + rec := &walpb.Record{} + decoder := NewDecoder(fileutil.NewFileReader(f)) + for { + if err = decoder.Decode(rec); err != nil { + require.ErrorIs(t, err, io.ErrUnexpectedEOF) + break + } + if rec.Type == EntryType { + e := MustUnmarshalEntry(rec.Data) + t.Logf("Validating normal entry: %v", e) + recData := fmt.Sprintf("waldata%d", e.Index) + require.Equal(t, raftpb.EntryNormal, e.Type) + require.Equal(t, recData, string(e.Data)) + } + rec = &walpb.Record{} + } + require.NoError(t, f.Close()) + + // Verify w.ReadAll() returns io.ErrUnexpectedEOF in the error chain. + t.Log("Verify the w.ReadAll returns io.ErrUnexpectedEOF in the error chain") + newFileName := filepath.Join(filepath.Dir(fileName), "0000000000000000-0000000000000000.wal") + require.NoError(t, os.Rename(fileName, newFileName)) + + w, err := Open(zaptest.NewLogger(t), filepath.Dir(fileName), walpb.Snapshot{ + Index: 0, + Term: 0, + }) + require.NoError(t, err) + defer w.Close() + + _, _, _, err = w.ReadAll() + // Note: The wal file will be repaired automatically in production + // environment, but only once. + require.ErrorIs(t, err, io.ErrUnexpectedEOF) +} diff --git a/tools/etcd-dump-logs/raw.go b/tools/etcd-dump-logs/raw.go index 6a76022e8..2d4540629 100644 --- a/tools/etcd-dump-logs/raw.go +++ b/tools/etcd-dump-logs/raw.go @@ -69,6 +69,9 @@ func readRaw(fromIndex *uint64, waldir string, out io.Writer) { if errors.Is(err, io.EOF) { fmt.Fprintf(out, "EOF: All entries were processed.\n") break + } else if errors.Is(err, io.ErrUnexpectedEOF) { + fmt.Fprintf(out, "ErrUnexpectedEOF: The last record might be corrupted, error: %v.\n", err) + break } else { log.Printf("Error: Reading failed: %v", err) break From 5ef713c7289046944afa3b9c55a744ef7a60b25d Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Sun, 8 Jan 2023 04:55:34 +0800 Subject: [PATCH 2/2] remove the dependency on the deprecated io/ioutil Reference: https://go.dev/doc/go1.16#ioutil Signed-off-by: Benjamin Wang --- tools/etcd-dump-logs/raw.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tools/etcd-dump-logs/raw.go b/tools/etcd-dump-logs/raw.go index 2d4540629..2c1bed769 100644 --- a/tools/etcd-dump-logs/raw.go +++ b/tools/etcd-dump-logs/raw.go @@ -18,7 +18,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "log" "os" "path/filepath" @@ -33,11 +32,15 @@ import ( func readRaw(fromIndex *uint64, waldir string, out io.Writer) { var walReaders []fileutil.FileReader - files, err := ioutil.ReadDir(waldir) + dirEntry, err := os.ReadDir(waldir) if err != nil { log.Fatalf("Error: Failed to read directory '%s' error:%v", waldir, err) } - for _, finfo := range files { + for _, e := range dirEntry { + finfo, err := e.Info() + if err != nil { + log.Fatalf("Error: failed to get fileInfo of file: %s, error: %v", e.Name(), err) + } if filepath.Ext(finfo.Name()) != ".wal" { log.Printf("Warning: Ignoring not .wal file: %s", finfo.Name()) continue