Merge pull request #11598 from jingyih/remove_capnslog_in_wal

wal: remove capnslog
This commit is contained in:
Sahdev Zala 2020-02-06 13:16:48 -05:00 committed by GitHub
commit 6b17141ece
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 98 additions and 177 deletions

View File

@ -41,6 +41,9 @@ type filePipeline struct {
} }
func newFilePipeline(lg *zap.Logger, dir string, fileSize int64) *filePipeline { func newFilePipeline(lg *zap.Logger, dir string, fileSize int64) *filePipeline {
if lg == nil {
lg = zap.NewNop()
}
fp := &filePipeline{ fp := &filePipeline{
lg: lg, lg: lg,
dir: dir, dir: dir,
@ -75,11 +78,7 @@ func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
return nil, err return nil, err
} }
if err = fileutil.Preallocate(f.File, fp.size, true); err != nil { if err = fileutil.Preallocate(f.File, fp.size, true); err != nil {
if fp.lg != nil { fp.lg.Error("failed to preallocate space when creating a new WAL", zap.Int64("size", fp.size), zap.Error(err))
fp.lg.Warn("failed to preallocate space when creating a new WAL", zap.Int64("size", fp.size), zap.Error(err))
} else {
plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
}
f.Close() f.Close()
return nil, err return nil, err
} }

View File

@ -28,17 +28,16 @@ import (
// Repair tries to repair ErrUnexpectedEOF in the // Repair tries to repair ErrUnexpectedEOF in the
// last wal file by truncating. // last wal file by truncating.
func Repair(lg *zap.Logger, dirpath string) bool { func Repair(lg *zap.Logger, dirpath string) bool {
if lg == nil {
lg = zap.NewNop()
}
f, err := openLast(lg, dirpath) f, err := openLast(lg, dirpath)
if err != nil { if err != nil {
return false return false
} }
defer f.Close() defer f.Close()
if lg != nil { lg.Info("repairing", zap.String("path", f.Name()))
lg.Info("repairing", zap.String("path", f.Name()))
} else {
plog.Noticef("repairing %v", f.Name())
}
rec := &walpb.Record{} rec := &walpb.Record{}
decoder := newDecoder(f) decoder := newDecoder(f)
@ -61,70 +60,42 @@ func Repair(lg *zap.Logger, dirpath string) bool {
continue continue
case io.EOF: case io.EOF:
if lg != nil { lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.EOF))
lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.EOF))
}
return true return true
case io.ErrUnexpectedEOF: case io.ErrUnexpectedEOF:
bf, bferr := os.Create(f.Name() + ".broken") bf, bferr := os.Create(f.Name() + ".broken")
if bferr != nil { if bferr != nil {
if lg != nil { lg.Warn("failed to create backup file", zap.String("path", f.Name()+".broken"), zap.Error(bferr))
lg.Warn("failed to create backup file", zap.String("path", f.Name()+".broken"), zap.Error(bferr))
} else {
plog.Errorf("could not repair %v, failed to create backup file", f.Name())
}
return false return false
} }
defer bf.Close() defer bf.Close()
if _, err = f.Seek(0, io.SeekStart); err != nil { if _, err = f.Seek(0, io.SeekStart); err != nil {
if lg != nil { lg.Warn("failed to read file", zap.String("path", f.Name()), zap.Error(err))
lg.Warn("failed to read file", zap.String("path", f.Name()), zap.Error(err))
} else {
plog.Errorf("could not repair %v, failed to read file", f.Name())
}
return false return false
} }
if _, err = io.Copy(bf, f); err != nil { if _, err = io.Copy(bf, f); err != nil {
if lg != nil { lg.Warn("failed to copy", zap.String("from", f.Name()+".broken"), zap.String("to", f.Name()), zap.Error(err))
lg.Warn("failed to copy", zap.String("from", f.Name()+".broken"), zap.String("to", f.Name()), zap.Error(err))
} else {
plog.Errorf("could not repair %v, failed to copy file", f.Name())
}
return false return false
} }
if err = f.Truncate(lastOffset); err != nil { if err = f.Truncate(lastOffset); err != nil {
if lg != nil { lg.Warn("failed to truncate", zap.String("path", f.Name()), zap.Error(err))
lg.Warn("failed to truncate", zap.String("path", f.Name()), zap.Error(err))
} else {
plog.Errorf("could not repair %v, failed to truncate file", f.Name())
}
return false return false
} }
if err = fileutil.Fsync(f.File); err != nil { if err = fileutil.Fsync(f.File); err != nil {
if lg != nil { lg.Warn("failed to fsync", zap.String("path", f.Name()), zap.Error(err))
lg.Warn("failed to fsync", zap.String("path", f.Name()), zap.Error(err))
} else {
plog.Errorf("could not repair %v, failed to sync file", f.Name())
}
return false return false
} }
if lg != nil { lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.ErrUnexpectedEOF))
lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.ErrUnexpectedEOF))
}
return true return true
default: default:
if lg != nil { lg.Warn("failed to repair", zap.String("path", f.Name()), zap.Error(err))
lg.Warn("failed to repair", zap.String("path", f.Name()), zap.Error(err))
} else {
plog.Errorf("could not repair error (%v)", err)
}
return false return false
} }
} }

View File

@ -43,11 +43,7 @@ func searchIndex(lg *zap.Logger, names []string, index uint64) (int, bool) {
name := names[i] name := names[i]
_, curIndex, err := parseWALName(name) _, curIndex, err := parseWALName(name)
if err != nil { if err != nil {
if lg != nil { lg.Panic("failed to parse WAL file name", zap.String("path", name), zap.Error(err))
lg.Panic("failed to parse WAL file name", zap.String("path", name), zap.Error(err))
} else {
plog.Panicf("parse correct name should never fail: %v", err)
}
} }
if index >= curIndex { if index >= curIndex {
return i, true return i, true
@ -63,11 +59,7 @@ func isValidSeq(lg *zap.Logger, names []string) bool {
for _, name := range names { for _, name := range names {
curSeq, _, err := parseWALName(name) curSeq, _, err := parseWALName(name)
if err != nil { if err != nil {
if lg != nil { lg.Panic("failed to parse WAL file name", zap.String("path", name), zap.Error(err))
lg.Panic("failed to parse WAL file name", zap.String("path", name), zap.Error(err))
} else {
plog.Panicf("parse correct name should never fail: %v", err)
}
} }
if lastSeq != 0 && lastSeq != curSeq-1 { if lastSeq != 0 && lastSeq != curSeq-1 {
return false return false
@ -95,14 +87,10 @@ func checkWalNames(lg *zap.Logger, names []string) []string {
if _, _, err := parseWALName(name); err != nil { if _, _, err := parseWALName(name); err != nil {
// don't complain about left over tmp files // don't complain about left over tmp files
if !strings.HasSuffix(name, ".tmp") { if !strings.HasSuffix(name, ".tmp") {
if lg != nil { lg.Warn(
lg.Warn( "ignored file in WAL directory",
"ignored file in WAL directory", zap.String("path", name),
zap.String("path", name), )
)
} else {
plog.Warningf("ignored file %v in wal", name)
}
} }
continue continue
} }

View File

@ -31,7 +31,6 @@ import (
"go.etcd.io/etcd/raft/raftpb" "go.etcd.io/etcd/raft/raftpb"
"go.etcd.io/etcd/wal/walpb" "go.etcd.io/etcd/wal/walpb"
"github.com/coreos/pkg/capnslog"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -54,8 +53,6 @@ var (
// so that tests can set a different segment size. // so that tests can set a different segment size.
SegmentSizeBytes int64 = 64 * 1000 * 1000 // 64MB SegmentSizeBytes int64 = 64 * 1000 * 1000 // 64MB
plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "wal")
ErrMetadataConflict = errors.New("wal: conflicting metadata found") ErrMetadataConflict = errors.New("wal: conflicting metadata found")
ErrFileNotFound = errors.New("wal: file not found") ErrFileNotFound = errors.New("wal: file not found")
ErrCRCMismatch = errors.New("wal: crc mismatch") ErrCRCMismatch = errors.New("wal: crc mismatch")
@ -99,6 +96,10 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
return nil, os.ErrExist return nil, os.ErrExist
} }
if lg == nil {
lg = zap.NewNop()
}
// keep temporary wal directory so WAL initialization appears atomic // keep temporary wal directory so WAL initialization appears atomic
tmpdirpath := filepath.Clean(dirpath) + ".tmp" tmpdirpath := filepath.Clean(dirpath) + ".tmp"
if fileutil.Exist(tmpdirpath) { if fileutil.Exist(tmpdirpath) {
@ -107,48 +108,40 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
} }
} }
if err := fileutil.CreateDirAll(tmpdirpath); err != nil { if err := fileutil.CreateDirAll(tmpdirpath); err != nil {
if lg != nil { lg.Warn(
lg.Warn( "failed to create a temporary WAL directory",
"failed to create a temporary WAL directory", zap.String("tmp-dir-path", tmpdirpath),
zap.String("tmp-dir-path", tmpdirpath), zap.String("dir-path", dirpath),
zap.String("dir-path", dirpath), zap.Error(err),
zap.Error(err), )
)
}
return nil, err return nil, err
} }
p := filepath.Join(tmpdirpath, walName(0, 0)) p := filepath.Join(tmpdirpath, walName(0, 0))
f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode) f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
if err != nil { if err != nil {
if lg != nil { lg.Warn(
lg.Warn( "failed to flock an initial WAL file",
"failed to flock an initial WAL file", zap.String("path", p),
zap.String("path", p), zap.Error(err),
zap.Error(err), )
)
}
return nil, err return nil, err
} }
if _, err = f.Seek(0, io.SeekEnd); err != nil { if _, err = f.Seek(0, io.SeekEnd); err != nil {
if lg != nil { lg.Warn(
lg.Warn( "failed to seek an initial WAL file",
"failed to seek an initial WAL file", zap.String("path", p),
zap.String("path", p), zap.Error(err),
zap.Error(err), )
)
}
return nil, err return nil, err
} }
if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil { if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
if lg != nil { lg.Warn(
lg.Warn( "failed to preallocate an initial WAL file",
"failed to preallocate an initial WAL file", zap.String("path", p),
zap.String("path", p), zap.Int64("segment-bytes", SegmentSizeBytes),
zap.Int64("segment-bytes", SegmentSizeBytes), zap.Error(err),
zap.Error(err), )
)
}
return nil, err return nil, err
} }
@ -173,14 +166,12 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
} }
if w, err = w.renameWAL(tmpdirpath); err != nil { if w, err = w.renameWAL(tmpdirpath); err != nil {
if lg != nil { lg.Warn(
lg.Warn( "failed to rename the temporary WAL directory",
"failed to rename the temporary WAL directory", zap.String("tmp-dir-path", tmpdirpath),
zap.String("tmp-dir-path", tmpdirpath), zap.String("dir-path", w.dir),
zap.String("dir-path", w.dir), zap.Error(err),
zap.Error(err), )
)
}
return nil, err return nil, err
} }
@ -194,36 +185,30 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
// directory was renamed; sync parent dir to persist rename // directory was renamed; sync parent dir to persist rename
pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir)) pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
if perr != nil { if perr != nil {
if lg != nil { lg.Warn(
lg.Warn( "failed to open the parent data directory",
"failed to open the parent data directory", zap.String("parent-dir-path", filepath.Dir(w.dir)),
zap.String("parent-dir-path", filepath.Dir(w.dir)), zap.String("dir-path", w.dir),
zap.String("dir-path", w.dir), zap.Error(perr),
zap.Error(perr), )
)
}
return nil, perr return nil, perr
} }
if perr = fileutil.Fsync(pdir); perr != nil { if perr = fileutil.Fsync(pdir); perr != nil {
if lg != nil { lg.Warn(
lg.Warn( "failed to fsync the parent data directory file",
"failed to fsync the parent data directory file", zap.String("parent-dir-path", filepath.Dir(w.dir)),
zap.String("parent-dir-path", filepath.Dir(w.dir)), zap.String("dir-path", w.dir),
zap.String("dir-path", w.dir), zap.Error(perr),
zap.Error(perr), )
)
}
return nil, perr return nil, perr
} }
if perr = pdir.Close(); perr != nil { if perr = pdir.Close(); perr != nil {
if lg != nil { lg.Warn(
lg.Warn( "failed to close the parent data directory file",
"failed to close the parent data directory file", zap.String("parent-dir-path", filepath.Dir(w.dir)),
zap.String("parent-dir-path", filepath.Dir(w.dir)), zap.String("dir-path", w.dir),
zap.String("dir-path", w.dir), zap.Error(perr),
zap.Error(perr), )
)
}
return nil, perr return nil, perr
} }
@ -233,24 +218,16 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
func (w *WAL) cleanupWAL(lg *zap.Logger) { func (w *WAL) cleanupWAL(lg *zap.Logger) {
var err error var err error
if err = w.Close(); err != nil { if err = w.Close(); err != nil {
if lg != nil { lg.Panic("failed to close WAL during cleanup", zap.Error(err))
lg.Panic("failed to close WAL during cleanup", zap.Error(err))
} else {
plog.Panicf("failed to close WAL during cleanup: %v", err)
}
} }
brokenDirName := fmt.Sprintf("%s.broken.%v", w.dir, time.Now().Format("20060102.150405.999999")) brokenDirName := fmt.Sprintf("%s.broken.%v", w.dir, time.Now().Format("20060102.150405.999999"))
if err = os.Rename(w.dir, brokenDirName); err != nil { if err = os.Rename(w.dir, brokenDirName); err != nil {
if lg != nil { lg.Panic(
lg.Panic( "failed to rename WAL during cleanup",
"failed to rename WAL during cleanup", zap.Error(err),
zap.Error(err), zap.String("source-path", w.dir),
zap.String("source-path", w.dir), zap.String("rename-path", brokenDirName),
zap.String("rename-path", brokenDirName), )
)
} else {
plog.Panicf("failed to rename WAL during cleanup: %v", err)
}
} }
} }
@ -279,15 +256,11 @@ func (w *WAL) renameWAL(tmpdirpath string) (*WAL, error) {
func (w *WAL) renameWALUnlock(tmpdirpath string) (*WAL, error) { func (w *WAL) renameWALUnlock(tmpdirpath string) (*WAL, error) {
// rename of directory with locked files doesn't work on windows/cifs; // rename of directory with locked files doesn't work on windows/cifs;
// close the WAL to release the locks so the directory can be renamed. // close the WAL to release the locks so the directory can be renamed.
if w.lg != nil { w.lg.Info(
w.lg.Info( "closing WAL to release flock and retry directory renaming",
"closing WAL to release flock and retry directory renaming", zap.String("from", tmpdirpath),
zap.String("from", tmpdirpath), zap.String("to", w.dir),
zap.String("to", w.dir), )
)
} else {
plog.Infof("releasing file lock to rename %q to %q", tmpdirpath, w.dir)
}
w.Close() w.Close()
if err := os.Rename(tmpdirpath, w.dir); err != nil { if err := os.Rename(tmpdirpath, w.dir); err != nil {
@ -330,6 +303,9 @@ func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, err
} }
func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) { func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
if lg == nil {
lg = zap.NewNop()
}
names, nameIndex, err := selectWALFiles(lg, dirpath, snap) names, nameIndex, err := selectWALFiles(lg, dirpath, snap)
if err != nil { if err != nil {
return nil, err return nil, err
@ -543,6 +519,9 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) error {
rec := &walpb.Record{} rec := &walpb.Record{}
if lg == nil {
lg = zap.NewNop()
}
names, nameIndex, err := selectWALFiles(lg, walDir, snap) names, nameIndex, err := selectWALFiles(lg, walDir, snap)
if err != nil { if err != nil {
return err return err
@ -690,11 +669,7 @@ func (w *WAL) cut() error {
return err return err
} }
if w.lg != nil { w.lg.Info("created a new WAL segment", zap.String("path", fpath))
w.lg.Info("created a new WAL segment", zap.String("path", fpath))
} else {
plog.Infof("segmented wal file %v is created", fpath)
}
return nil return nil
} }
@ -709,15 +684,11 @@ func (w *WAL) sync() error {
took := time.Since(start) took := time.Since(start)
if took > warnSyncDuration { if took > warnSyncDuration {
if w.lg != nil { w.lg.Warn(
w.lg.Warn( "slow fdatasync",
"slow fdatasync", zap.Duration("took", took),
zap.Duration("took", took), zap.Duration("expected-duration", warnSyncDuration),
zap.Duration("expected-duration", warnSyncDuration), )
)
} else {
plog.Warningf("sync duration of %v, expected less than %v", took, warnSyncDuration)
}
} }
walFsyncSec.Observe(took.Seconds()) walFsyncSec.Observe(took.Seconds())
@ -791,11 +762,7 @@ func (w *WAL) Close() error {
continue continue
} }
if err := l.Close(); err != nil { if err := l.Close(); err != nil {
if w.lg != nil { w.lg.Error("failed to close WAL", zap.Error(err))
w.lg.Warn("failed to close WAL", zap.Error(err))
} else {
plog.Errorf("failed to unlock during closing wal: %s", err)
}
} }
} }
@ -893,11 +860,7 @@ func (w *WAL) seq() uint64 {
} }
seq, _, err := parseWALName(filepath.Base(t.Name())) seq, _, err := parseWALName(filepath.Base(t.Name()))
if err != nil { if err != nil {
if w.lg != nil { w.lg.Fatal("failed to parse WAL name", zap.String("name", t.Name()), zap.Error(err))
w.lg.Fatal("failed to parse WAL name", zap.String("name", t.Name()), zap.Error(err))
} else {
plog.Fatalf("bad wal name %s (%v)", t.Name(), err)
}
} }
return seq return seq
} }