diff --git a/wal/doc.go b/wal/doc.go index a3abd6961..7ea348e4a 100644 --- a/wal/doc.go +++ b/wal/doc.go @@ -21,7 +21,7 @@ segmented WAL files. Inside of each file the raft state and entries are appended to it with the Save method: metadata := []byte{} - w, err := wal.Create("/var/lib/etcd", metadata) + w, err := wal.Create(zap.NewExample(), "/var/lib/etcd", metadata) ... err := w.Save(s, ents) diff --git a/wal/file_pipeline.go b/wal/file_pipeline.go index 3a1c57c1c..f6d6433f6 100644 --- a/wal/file_pipeline.go +++ b/wal/file_pipeline.go @@ -20,10 +20,14 @@ import ( "path/filepath" "github.com/coreos/etcd/pkg/fileutil" + + "go.uber.org/zap" ) // filePipeline pipelines allocating disk space type filePipeline struct { + lg *zap.Logger + // dir to put files dir string // size of files to make, in bytes @@ -36,8 +40,9 @@ type filePipeline struct { donec chan struct{} } -func newFilePipeline(dir string, fileSize int64) *filePipeline { +func newFilePipeline(lg *zap.Logger, dir string, fileSize int64) *filePipeline { fp := &filePipeline{ + lg: lg, dir: dir, size: fileSize, filec: make(chan *fileutil.LockedFile), diff --git a/wal/repair.go b/wal/repair.go index 091036b57..2a7a39fe9 100644 --- a/wal/repair.go +++ b/wal/repair.go @@ -21,12 +21,14 @@ import ( "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/wal/walpb" + + "go.uber.org/zap" ) // Repair tries to repair ErrUnexpectedEOF in the // last wal file by truncating. -func Repair(dirpath string) bool { - f, err := openLast(dirpath) +func Repair(lg *zap.Logger, dirpath string) bool { + f, err := openLast(lg, dirpath) if err != nil { return false } @@ -51,46 +53,77 @@ func Repair(dirpath string) bool { decoder.updateCRC(rec.Crc) } continue + case io.EOF: return true + case io.ErrUnexpectedEOF: - plog.Noticef("repairing %v", f.Name()) + if lg != nil { + lg.Info("repairing", zap.String("path", f.Name())) + } else { + plog.Noticef("repairing %v", f.Name()) + } bf, bferr := os.Create(f.Name() + ".broken") if bferr != nil { - plog.Errorf("could not repair %v, failed to create backup file", f.Name()) + if lg != nil { + lg.Warn("failed to create backup file", zap.String("path", f.Name()+".broken")) + } else { + plog.Errorf("could not repair %v, failed to create backup file", f.Name()) + } return false } defer bf.Close() if _, err = f.Seek(0, io.SeekStart); err != nil { - plog.Errorf("could not repair %v, failed to read file", f.Name()) + if lg != nil { + lg.Warn("failed to read file", zap.String("path", f.Name())) + } else { + plog.Errorf("could not repair %v, failed to read file", f.Name()) + } return false } if _, err = io.Copy(bf, f); err != nil { - plog.Errorf("could not repair %v, failed to copy file", f.Name()) + if lg != nil { + lg.Warn("failed to copy", zap.String("from", f.Name()+".broken"), zap.String("to", f.Name())) + } else { + plog.Errorf("could not repair %v, failed to copy file", f.Name()) + } return false } if err = f.Truncate(int64(lastOffset)); err != nil { - plog.Errorf("could not repair %v, failed to truncate file", f.Name()) + if lg != nil { + lg.Warn("failed to truncate", zap.String("path", f.Name())) + } else { + plog.Errorf("could not repair %v, failed to truncate file", f.Name()) + } return false } if err = fileutil.Fsync(f.File); err != nil { - plog.Errorf("could not repair %v, failed to sync file", f.Name()) + if lg != nil { + lg.Warn("failed to fsync", zap.String("path", f.Name())) + } else { + plog.Errorf("could not repair %v, failed to sync file", f.Name()) + } return false } return true + default: - plog.Errorf("could not repair error (%v)", err) + if lg != nil { + lg.Warn("failed to repair", zap.Error(err)) + } else { + plog.Errorf("could not repair error (%v)", err) + } return false } } } // openLast opens the last wal file for read and write. -func openLast(dirpath string) (*fileutil.LockedFile, error) { - names, err := readWalNames(dirpath) +func openLast(lg *zap.Logger, dirpath string) (*fileutil.LockedFile, error) { + names, err := readWalNames(lg, dirpath) if err != nil { return nil, err } diff --git a/wal/repair_test.go b/wal/repair_test.go index be9c016cb..3deab99da 100644 --- a/wal/repair_test.go +++ b/wal/repair_test.go @@ -23,6 +23,8 @@ import ( "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/wal/walpb" + + "go.uber.org/zap" ) type corruptFunc func(string, int64) error @@ -30,7 +32,7 @@ type corruptFunc func(string, int64) error // TestRepairTruncate ensures a truncated file can be repaired func TestRepairTruncate(t *testing.T) { corruptf := func(p string, offset int64) error { - f, err := openLast(p) + f, err := openLast(zap.NewExample(), p) if err != nil { return err } @@ -48,7 +50,7 @@ func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt corruptFunc, expect } defer os.RemoveAll(p) // create WAL - w, err := Create(p, nil) + w, err := Create(zap.NewExample(), p, nil) defer func() { if err = w.Close(); err != nil { t.Fatal(err) @@ -76,7 +78,7 @@ func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt corruptFunc, expect } // verify we broke the wal - w, err = Open(p, walpb.Snapshot{}) + w, err = Open(zap.NewExample(), p, walpb.Snapshot{}) if err != nil { t.Fatal(err) } @@ -87,12 +89,12 @@ func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt corruptFunc, expect w.Close() // repair the wal - if ok := Repair(p); !ok { + if ok := Repair(zap.NewExample(), p); !ok { t.Fatalf("fix = %t, want %t", ok, true) } // read it back - w, err = Open(p, walpb.Snapshot{}) + w, err = Open(zap.NewExample(), p, walpb.Snapshot{}) if err != nil { t.Fatal(err) } @@ -114,7 +116,7 @@ func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt corruptFunc, expect w.Close() // read back entries following repair, ensure it's all there - w, err = Open(p, walpb.Snapshot{}) + w, err = Open(zap.NewExample(), p, walpb.Snapshot{}) if err != nil { t.Fatal(err) } @@ -138,7 +140,7 @@ func makeEnts(ents int) (ret [][]raftpb.Entry) { // that straddled two sectors. func TestRepairWriteTearLast(t *testing.T) { corruptf := func(p string, offset int64) error { - f, err := openLast(p) + f, err := openLast(zap.NewExample(), p) if err != nil { return err } @@ -162,7 +164,7 @@ func TestRepairWriteTearLast(t *testing.T) { // in the middle of a record. func TestRepairWriteTearMiddle(t *testing.T) { corruptf := func(p string, offset int64) error { - f, err := openLast(p) + f, err := openLast(zap.NewExample(), p) if err != nil { return err } diff --git a/wal/util.go b/wal/util.go index 5c56e2288..03cc242da 100644 --- a/wal/util.go +++ b/wal/util.go @@ -20,6 +20,7 @@ import ( "strings" "github.com/coreos/etcd/pkg/fileutil" + "go.uber.org/zap" ) var ( @@ -67,25 +68,33 @@ func isValidSeq(names []string) bool { } return true } -func readWalNames(dirpath string) ([]string, error) { + +func readWalNames(lg *zap.Logger, dirpath string) ([]string, error) { names, err := fileutil.ReadDir(dirpath) if err != nil { return nil, err } - wnames := checkWalNames(names) + wnames := checkWalNames(lg, names) if len(wnames) == 0 { return nil, ErrFileNotFound } return wnames, nil } -func checkWalNames(names []string) []string { +func checkWalNames(lg *zap.Logger, names []string) []string { wnames := make([]string, 0) for _, name := range names { if _, _, err := parseWalName(name); err != nil { // don't complain about left over tmp files if !strings.HasSuffix(name, ".tmp") { - plog.Warningf("ignored file %v in wal", name) + if lg != nil { + lg.Warn( + "ignored file in WAL directory", + zap.String("path", name), + ) + } else { + plog.Warningf("ignored file %v in wal", name) + } } continue } diff --git a/wal/wal.go b/wal/wal.go index 96d01a23a..23925b389 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -32,6 +32,7 @@ import ( "github.com/coreos/etcd/wal/walpb" "github.com/coreos/pkg/capnslog" + "go.uber.org/zap" ) const ( @@ -69,6 +70,8 @@ var ( // A just opened WAL is in read mode, and ready for reading records. // The WAL will be ready for appending after reading out all the previous records. type WAL struct { + lg *zap.Logger + dir string // the living directory of the underlay files // dirFile is a fd for the wal directory for syncing on Rename @@ -91,7 +94,7 @@ type WAL struct { // Create creates a WAL ready for appending records. The given metadata is // recorded at the head of each WAL file, and can be retrieved with ReadAll. -func Create(dirpath string, metadata []byte) (*WAL, error) { +func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) { if Exist(dirpath) { return nil, os.ErrExist } @@ -120,6 +123,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { } w := &WAL{ + lg: lg, dir: dirpath, metadata: metadata, } @@ -173,7 +177,7 @@ func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) { } return nil, err } - w.fp = newFilePipeline(w.dir, SegmentSizeBytes) + w.fp = newFilePipeline(w.lg, w.dir, SegmentSizeBytes) df, err := fileutil.OpenDir(w.dir) w.dirFile = df return w, err @@ -182,13 +186,22 @@ func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) { func (w *WAL) renameWalUnlock(tmpdirpath string) (*WAL, error) { // rename of directory with locked files doesn't work on windows/cifs; // close the WAL to release the locks so the directory can be renamed. - plog.Infof("releasing file lock to rename %q to %q", tmpdirpath, w.dir) + if w.lg != nil { + w.lg.Info( + "releasing flock to rename", + zap.String("from", tmpdirpath), + zap.String("to", w.dir), + ) + } else { + plog.Infof("releasing file lock to rename %q to %q", tmpdirpath, w.dir) + } + w.Close() if err := os.Rename(tmpdirpath, w.dir); err != nil { return nil, err } // reopen and relock - newWAL, oerr := Open(w.dir, walpb.Snapshot{}) + newWAL, oerr := Open(w.lg, w.dir, walpb.Snapshot{}) if oerr != nil { return nil, oerr } @@ -205,8 +218,8 @@ func (w *WAL) renameWalUnlock(tmpdirpath string) (*WAL, error) { // The returned WAL is ready to read and the first record will be the one after // the given snap. The WAL cannot be appended to before reading out all of its // previous records. -func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) { - w, err := openAtIndex(dirpath, snap, true) +func Open(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) { + w, err := openAtIndex(lg, dirpath, snap, true) if err != nil { return nil, err } @@ -218,12 +231,12 @@ func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) { // OpenForRead only opens the wal files for read. // Write on a read only wal panics. -func OpenForRead(dirpath string, snap walpb.Snapshot) (*WAL, error) { - return openAtIndex(dirpath, snap, false) +func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) { + return openAtIndex(lg, dirpath, snap, false) } -func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) { - names, err := readWalNames(dirpath) +func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) { + names, err := readWalNames(lg, dirpath) if err != nil { return nil, err } @@ -263,6 +276,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) // create a WAL ready for reading w := &WAL{ + lg: lg, dir: dirpath, start: snap, decoder: newDecoder(rs...), @@ -278,7 +292,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) closer() return nil, err } - w.fp = newFilePipeline(w.dir, SegmentSizeBytes) + w.fp = newFilePipeline(w.lg, w.dir, SegmentSizeBytes) } return w, nil @@ -473,7 +487,11 @@ func (w *WAL) cut() error { return err } - plog.Infof("segmented wal file %v is created", fpath) + if w.lg != nil { + + } else { + plog.Infof("segmented wal file %v is created", fpath) + } return nil } @@ -486,12 +504,20 @@ func (w *WAL) sync() error { start := time.Now() err := fileutil.Fdatasync(w.tail().File) - duration := time.Since(start) - if duration > warnSyncDuration { - plog.Warningf("sync duration of %v, expected less than %v", duration, warnSyncDuration) + took := time.Since(start) + if took > warnSyncDuration { + if w.lg != nil { + w.lg.Warn( + "slow fdatasync", + zap.Duration("took", took), + zap.Duration("expected-duration", warnSyncDuration), + ) + } else { + plog.Warningf("sync duration of %v, expected less than %v", took, warnSyncDuration) + } } - syncDurations.Observe(duration.Seconds()) + syncDurations.Observe(took.Seconds()) return err } @@ -562,7 +588,11 @@ func (w *WAL) Close() error { continue } if err := l.Close(); err != nil { - plog.Errorf("failed to unlock during closing wal: %s", err) + if w.lg != nil { + w.lg.Error("failed to close WAL", zap.Error(err)) + } else { + plog.Errorf("failed to unlock during closing wal: %s", err) + } } } @@ -660,7 +690,11 @@ func (w *WAL) seq() uint64 { } seq, _, err := parseWalName(filepath.Base(t.Name())) if err != nil { - plog.Fatalf("bad wal name %s (%v)", t.Name(), err) + if w.lg != nil { + w.lg.Fatal("failed to parse WAL name", zap.String("name", t.Name()), zap.Error(err)) + } else { + plog.Fatalf("bad wal name %s (%v)", t.Name(), err) + } } return seq } diff --git a/wal/wal_bench_test.go b/wal/wal_bench_test.go index 914e61c77..ae0142cee 100644 --- a/wal/wal_bench_test.go +++ b/wal/wal_bench_test.go @@ -19,6 +19,8 @@ import ( "os" "testing" + "go.uber.org/zap" + "github.com/coreos/etcd/raft/raftpb" ) @@ -41,7 +43,7 @@ func benchmarkWriteEntry(b *testing.B, size int, batch int) { } defer os.RemoveAll(p) - w, err := Create(p, []byte("somedata")) + w, err := Create(zap.NewExample(), p, []byte("somedata")) if err != nil { b.Fatalf("err = %v, want nil", err) } diff --git a/wal/wal_test.go b/wal/wal_test.go index 71fd7c177..00ccab68b 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -27,6 +27,8 @@ import ( "github.com/coreos/etcd/pkg/pbutil" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/wal/walpb" + + "go.uber.org/zap" ) func TestNew(t *testing.T) { @@ -36,7 +38,7 @@ func TestNew(t *testing.T) { } defer os.RemoveAll(p) - w, err := Create(p, []byte("somedata")) + w, err := Create(zap.NewExample(), p, []byte("somedata")) if err != nil { t.Fatalf("err = %v, want nil", err) } @@ -91,7 +93,7 @@ func TestNewForInitedDir(t *testing.T) { defer os.RemoveAll(p) os.Create(filepath.Join(p, walName(0, 0))) - if _, err = Create(p, nil); err == nil || err != os.ErrExist { + if _, err = Create(zap.NewExample(), p, nil); err == nil || err != os.ErrExist { t.Errorf("err = %v, want %v", err, os.ErrExist) } } @@ -109,7 +111,7 @@ func TestOpenAtIndex(t *testing.T) { } f.Close() - w, err := Open(dir, walpb.Snapshot{}) + w, err := Open(zap.NewExample(), dir, walpb.Snapshot{}) if err != nil { t.Fatalf("err = %v, want nil", err) } @@ -128,7 +130,7 @@ func TestOpenAtIndex(t *testing.T) { } f.Close() - w, err = Open(dir, walpb.Snapshot{Index: 5}) + w, err = Open(zap.NewExample(), dir, walpb.Snapshot{Index: 5}) if err != nil { t.Fatalf("err = %v, want nil", err) } @@ -145,7 +147,7 @@ func TestOpenAtIndex(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(emptydir) - if _, err = Open(emptydir, walpb.Snapshot{}); err != ErrFileNotFound { + if _, err = Open(zap.NewExample(), emptydir, walpb.Snapshot{}); err != ErrFileNotFound { t.Errorf("err = %v, want %v", err, ErrFileNotFound) } } @@ -158,7 +160,7 @@ func TestCut(t *testing.T) { } defer os.RemoveAll(p) - w, err := Create(p, nil) + w, err := Create(zap.NewExample(), p, nil) if err != nil { t.Fatal(err) } @@ -220,7 +222,7 @@ func TestSaveWithCut(t *testing.T) { } defer os.RemoveAll(p) - w, err := Create(p, []byte("metadata")) + w, err := Create(zap.NewExample(), p, []byte("metadata")) if err != nil { t.Fatal(err) } @@ -248,7 +250,7 @@ func TestSaveWithCut(t *testing.T) { w.Close() - neww, err := Open(p, walpb.Snapshot{}) + neww, err := Open(zap.NewExample(), p, walpb.Snapshot{}) if err != nil { t.Fatalf("err = %v, want nil", err) } @@ -283,7 +285,7 @@ func TestRecover(t *testing.T) { } defer os.RemoveAll(p) - w, err := Create(p, []byte("metadata")) + w, err := Create(zap.NewExample(), p, []byte("metadata")) if err != nil { t.Fatal(err) } @@ -302,7 +304,7 @@ func TestRecover(t *testing.T) { } w.Close() - if w, err = Open(p, walpb.Snapshot{}); err != nil { + if w, err = Open(zap.NewExample(), p, walpb.Snapshot{}); err != nil { t.Fatal(err) } metadata, state, entries, err := w.ReadAll() @@ -398,7 +400,7 @@ func TestRecoverAfterCut(t *testing.T) { } defer os.RemoveAll(p) - md, err := Create(p, []byte("metadata")) + md, err := Create(zap.NewExample(), p, []byte("metadata")) if err != nil { t.Fatal(err) } @@ -421,7 +423,7 @@ func TestRecoverAfterCut(t *testing.T) { } for i := 0; i < 10; i++ { - w, err := Open(p, walpb.Snapshot{Index: uint64(i)}) + w, err := Open(zap.NewExample(), p, walpb.Snapshot{Index: uint64(i)}) if err != nil { if i <= 4 { if err != ErrFileNotFound { @@ -456,7 +458,7 @@ func TestOpenAtUncommittedIndex(t *testing.T) { } defer os.RemoveAll(p) - w, err := Create(p, nil) + w, err := Create(zap.NewExample(), p, nil) if err != nil { t.Fatal(err) } @@ -468,7 +470,7 @@ func TestOpenAtUncommittedIndex(t *testing.T) { } w.Close() - w, err = Open(p, walpb.Snapshot{}) + w, err = Open(zap.NewExample(), p, walpb.Snapshot{}) if err != nil { t.Fatal(err) } @@ -490,7 +492,7 @@ func TestOpenForRead(t *testing.T) { } defer os.RemoveAll(p) // create WAL - w, err := Create(p, nil) + w, err := Create(zap.NewExample(), p, nil) if err != nil { t.Fatal(err) } @@ -510,7 +512,7 @@ func TestOpenForRead(t *testing.T) { w.ReleaseLockTo(unlockIndex) // All are available for read - w2, err := OpenForRead(p, walpb.Snapshot{}) + w2, err := OpenForRead(zap.NewExample(), p, walpb.Snapshot{}) if err != nil { t.Fatal(err) } @@ -545,7 +547,7 @@ func TestReleaseLockTo(t *testing.T) { } defer os.RemoveAll(p) // create WAL - w, err := Create(p, nil) + w, err := Create(zap.NewExample(), p, nil) defer func() { if err = w.Close(); err != nil { t.Fatal(err) @@ -618,7 +620,7 @@ func TestTailWriteNoSlackSpace(t *testing.T) { defer os.RemoveAll(p) // create initial WAL - w, err := Create(p, []byte("metadata")) + w, err := Create(zap.NewExample(), p, []byte("metadata")) if err != nil { t.Fatal(err) } @@ -640,7 +642,7 @@ func TestTailWriteNoSlackSpace(t *testing.T) { w.Close() // open, write more - w, err = Open(p, walpb.Snapshot{}) + w, err = Open(zap.NewExample(), p, walpb.Snapshot{}) if err != nil { t.Fatal(err) } @@ -661,7 +663,7 @@ func TestTailWriteNoSlackSpace(t *testing.T) { w.Close() // confirm all writes - w, err = Open(p, walpb.Snapshot{}) + w, err = Open(zap.NewExample(), p, walpb.Snapshot{}) if err != nil { t.Fatal(err) } @@ -692,7 +694,7 @@ func TestRestartCreateWal(t *testing.T) { t.Fatal(err) } - w, werr := Create(p, []byte("abc")) + w, werr := Create(zap.NewExample(), p, []byte("abc")) if werr != nil { t.Fatal(werr) } @@ -701,7 +703,7 @@ func TestRestartCreateWal(t *testing.T) { t.Fatalf("got %q exists, expected it to not exist", tmpdir) } - if w, err = OpenForRead(p, walpb.Snapshot{}); err != nil { + if w, err = OpenForRead(zap.NewExample(), p, walpb.Snapshot{}); err != nil { t.Fatal(err) } defer w.Close() @@ -722,7 +724,7 @@ func TestOpenOnTornWrite(t *testing.T) { t.Fatal(err) } defer os.RemoveAll(p) - w, err := Create(p, nil) + w, err := Create(zap.NewExample(), p, nil) defer func() { if err = w.Close(); err != nil && err != os.ErrInvalid { t.Fatal(err) @@ -764,7 +766,7 @@ func TestOpenOnTornWrite(t *testing.T) { } f.Close() - w, err = Open(p, walpb.Snapshot{}) + w, err = Open(zap.NewExample(), p, walpb.Snapshot{}) if err != nil { t.Fatal(err) } @@ -785,7 +787,7 @@ func TestOpenOnTornWrite(t *testing.T) { w.Close() // read back the entries, confirm number of entries matches expectation - w, err = OpenForRead(p, walpb.Snapshot{}) + w, err = OpenForRead(zap.NewExample(), p, walpb.Snapshot{}) if err != nil { t.Fatal(err) }