mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
wal: support structured logger
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
parent
c00c6cb685
commit
fdbedacc83
@ -21,7 +21,7 @@ segmented WAL files. Inside of each file the raft state and entries are appended
|
|||||||
to it with the Save method:
|
to it with the Save method:
|
||||||
|
|
||||||
metadata := []byte{}
|
metadata := []byte{}
|
||||||
w, err := wal.Create("/var/lib/etcd", metadata)
|
w, err := wal.Create(zap.NewExample(), "/var/lib/etcd", metadata)
|
||||||
...
|
...
|
||||||
err := w.Save(s, ents)
|
err := w.Save(s, ents)
|
||||||
|
|
||||||
|
@ -20,10 +20,14 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
"github.com/coreos/etcd/pkg/fileutil"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// filePipeline pipelines allocating disk space
|
// filePipeline pipelines allocating disk space
|
||||||
type filePipeline struct {
|
type filePipeline struct {
|
||||||
|
lg *zap.Logger
|
||||||
|
|
||||||
// dir to put files
|
// dir to put files
|
||||||
dir string
|
dir string
|
||||||
// size of files to make, in bytes
|
// size of files to make, in bytes
|
||||||
@ -36,8 +40,9 @@ type filePipeline struct {
|
|||||||
donec chan struct{}
|
donec chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newFilePipeline(dir string, fileSize int64) *filePipeline {
|
func newFilePipeline(lg *zap.Logger, dir string, fileSize int64) *filePipeline {
|
||||||
fp := &filePipeline{
|
fp := &filePipeline{
|
||||||
|
lg: lg,
|
||||||
dir: dir,
|
dir: dir,
|
||||||
size: fileSize,
|
size: fileSize,
|
||||||
filec: make(chan *fileutil.LockedFile),
|
filec: make(chan *fileutil.LockedFile),
|
||||||
|
@ -21,12 +21,14 @@ import (
|
|||||||
|
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
"github.com/coreos/etcd/pkg/fileutil"
|
||||||
"github.com/coreos/etcd/wal/walpb"
|
"github.com/coreos/etcd/wal/walpb"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Repair tries to repair ErrUnexpectedEOF in the
|
// Repair tries to repair ErrUnexpectedEOF in the
|
||||||
// last wal file by truncating.
|
// last wal file by truncating.
|
||||||
func Repair(dirpath string) bool {
|
func Repair(lg *zap.Logger, dirpath string) bool {
|
||||||
f, err := openLast(dirpath)
|
f, err := openLast(lg, dirpath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
@ -51,46 +53,77 @@ func Repair(dirpath string) bool {
|
|||||||
decoder.updateCRC(rec.Crc)
|
decoder.updateCRC(rec.Crc)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
|
|
||||||
case io.EOF:
|
case io.EOF:
|
||||||
return true
|
return true
|
||||||
|
|
||||||
case io.ErrUnexpectedEOF:
|
case io.ErrUnexpectedEOF:
|
||||||
|
if lg != nil {
|
||||||
|
lg.Info("repairing", zap.String("path", f.Name()))
|
||||||
|
} else {
|
||||||
plog.Noticef("repairing %v", f.Name())
|
plog.Noticef("repairing %v", f.Name())
|
||||||
|
}
|
||||||
bf, bferr := os.Create(f.Name() + ".broken")
|
bf, bferr := os.Create(f.Name() + ".broken")
|
||||||
if bferr != nil {
|
if bferr != nil {
|
||||||
|
if lg != nil {
|
||||||
|
lg.Warn("failed to create backup file", zap.String("path", f.Name()+".broken"))
|
||||||
|
} else {
|
||||||
plog.Errorf("could not repair %v, failed to create backup file", f.Name())
|
plog.Errorf("could not repair %v, failed to create backup file", f.Name())
|
||||||
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
defer bf.Close()
|
defer bf.Close()
|
||||||
|
|
||||||
if _, err = f.Seek(0, io.SeekStart); err != nil {
|
if _, err = f.Seek(0, io.SeekStart); err != nil {
|
||||||
|
if lg != nil {
|
||||||
|
lg.Warn("failed to read file", zap.String("path", f.Name()))
|
||||||
|
} else {
|
||||||
plog.Errorf("could not repair %v, failed to read file", f.Name())
|
plog.Errorf("could not repair %v, failed to read file", f.Name())
|
||||||
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err = io.Copy(bf, f); err != nil {
|
if _, err = io.Copy(bf, f); err != nil {
|
||||||
|
if lg != nil {
|
||||||
|
lg.Warn("failed to copy", zap.String("from", f.Name()+".broken"), zap.String("to", f.Name()))
|
||||||
|
} else {
|
||||||
plog.Errorf("could not repair %v, failed to copy file", f.Name())
|
plog.Errorf("could not repair %v, failed to copy file", f.Name())
|
||||||
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = f.Truncate(int64(lastOffset)); err != nil {
|
if err = f.Truncate(int64(lastOffset)); err != nil {
|
||||||
|
if lg != nil {
|
||||||
|
lg.Warn("failed to truncate", zap.String("path", f.Name()))
|
||||||
|
} else {
|
||||||
plog.Errorf("could not repair %v, failed to truncate file", f.Name())
|
plog.Errorf("could not repair %v, failed to truncate file", f.Name())
|
||||||
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if err = fileutil.Fsync(f.File); err != nil {
|
if err = fileutil.Fsync(f.File); err != nil {
|
||||||
|
if lg != nil {
|
||||||
|
lg.Warn("failed to fsync", zap.String("path", f.Name()))
|
||||||
|
} else {
|
||||||
plog.Errorf("could not repair %v, failed to sync file", f.Name())
|
plog.Errorf("could not repair %v, failed to sync file", f.Name())
|
||||||
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
|
|
||||||
default:
|
default:
|
||||||
|
if lg != nil {
|
||||||
|
lg.Warn("failed to repair", zap.Error(err))
|
||||||
|
} else {
|
||||||
plog.Errorf("could not repair error (%v)", err)
|
plog.Errorf("could not repair error (%v)", err)
|
||||||
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// openLast opens the last wal file for read and write.
|
// openLast opens the last wal file for read and write.
|
||||||
func openLast(dirpath string) (*fileutil.LockedFile, error) {
|
func openLast(lg *zap.Logger, dirpath string) (*fileutil.LockedFile, error) {
|
||||||
names, err := readWalNames(dirpath)
|
names, err := readWalNames(lg, dirpath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,8 @@ import (
|
|||||||
|
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
"github.com/coreos/etcd/wal/walpb"
|
"github.com/coreos/etcd/wal/walpb"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type corruptFunc func(string, int64) error
|
type corruptFunc func(string, int64) error
|
||||||
@ -30,7 +32,7 @@ type corruptFunc func(string, int64) error
|
|||||||
// TestRepairTruncate ensures a truncated file can be repaired
|
// TestRepairTruncate ensures a truncated file can be repaired
|
||||||
func TestRepairTruncate(t *testing.T) {
|
func TestRepairTruncate(t *testing.T) {
|
||||||
corruptf := func(p string, offset int64) error {
|
corruptf := func(p string, offset int64) error {
|
||||||
f, err := openLast(p)
|
f, err := openLast(zap.NewExample(), p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -48,7 +50,7 @@ func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt corruptFunc, expect
|
|||||||
}
|
}
|
||||||
defer os.RemoveAll(p)
|
defer os.RemoveAll(p)
|
||||||
// create WAL
|
// create WAL
|
||||||
w, err := Create(p, nil)
|
w, err := Create(zap.NewExample(), p, nil)
|
||||||
defer func() {
|
defer func() {
|
||||||
if err = w.Close(); err != nil {
|
if err = w.Close(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -76,7 +78,7 @@ func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt corruptFunc, expect
|
|||||||
}
|
}
|
||||||
|
|
||||||
// verify we broke the wal
|
// verify we broke the wal
|
||||||
w, err = Open(p, walpb.Snapshot{})
|
w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -87,12 +89,12 @@ func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt corruptFunc, expect
|
|||||||
w.Close()
|
w.Close()
|
||||||
|
|
||||||
// repair the wal
|
// repair the wal
|
||||||
if ok := Repair(p); !ok {
|
if ok := Repair(zap.NewExample(), p); !ok {
|
||||||
t.Fatalf("fix = %t, want %t", ok, true)
|
t.Fatalf("fix = %t, want %t", ok, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
// read it back
|
// read it back
|
||||||
w, err = Open(p, walpb.Snapshot{})
|
w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -114,7 +116,7 @@ func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt corruptFunc, expect
|
|||||||
w.Close()
|
w.Close()
|
||||||
|
|
||||||
// read back entries following repair, ensure it's all there
|
// read back entries following repair, ensure it's all there
|
||||||
w, err = Open(p, walpb.Snapshot{})
|
w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -138,7 +140,7 @@ func makeEnts(ents int) (ret [][]raftpb.Entry) {
|
|||||||
// that straddled two sectors.
|
// that straddled two sectors.
|
||||||
func TestRepairWriteTearLast(t *testing.T) {
|
func TestRepairWriteTearLast(t *testing.T) {
|
||||||
corruptf := func(p string, offset int64) error {
|
corruptf := func(p string, offset int64) error {
|
||||||
f, err := openLast(p)
|
f, err := openLast(zap.NewExample(), p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -162,7 +164,7 @@ func TestRepairWriteTearLast(t *testing.T) {
|
|||||||
// in the middle of a record.
|
// in the middle of a record.
|
||||||
func TestRepairWriteTearMiddle(t *testing.T) {
|
func TestRepairWriteTearMiddle(t *testing.T) {
|
||||||
corruptf := func(p string, offset int64) error {
|
corruptf := func(p string, offset int64) error {
|
||||||
f, err := openLast(p)
|
f, err := openLast(zap.NewExample(), p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
15
wal/util.go
15
wal/util.go
@ -20,6 +20,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
"github.com/coreos/etcd/pkg/fileutil"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -67,26 +68,34 @@ func isValidSeq(names []string) bool {
|
|||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
func readWalNames(dirpath string) ([]string, error) {
|
|
||||||
|
func readWalNames(lg *zap.Logger, dirpath string) ([]string, error) {
|
||||||
names, err := fileutil.ReadDir(dirpath)
|
names, err := fileutil.ReadDir(dirpath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
wnames := checkWalNames(names)
|
wnames := checkWalNames(lg, names)
|
||||||
if len(wnames) == 0 {
|
if len(wnames) == 0 {
|
||||||
return nil, ErrFileNotFound
|
return nil, ErrFileNotFound
|
||||||
}
|
}
|
||||||
return wnames, nil
|
return wnames, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkWalNames(names []string) []string {
|
func checkWalNames(lg *zap.Logger, names []string) []string {
|
||||||
wnames := make([]string, 0)
|
wnames := make([]string, 0)
|
||||||
for _, name := range names {
|
for _, name := range names {
|
||||||
if _, _, err := parseWalName(name); err != nil {
|
if _, _, err := parseWalName(name); err != nil {
|
||||||
// don't complain about left over tmp files
|
// don't complain about left over tmp files
|
||||||
if !strings.HasSuffix(name, ".tmp") {
|
if !strings.HasSuffix(name, ".tmp") {
|
||||||
|
if lg != nil {
|
||||||
|
lg.Warn(
|
||||||
|
"ignored file in WAL directory",
|
||||||
|
zap.String("path", name),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
plog.Warningf("ignored file %v in wal", name)
|
plog.Warningf("ignored file %v in wal", name)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
wnames = append(wnames, name)
|
wnames = append(wnames, name)
|
||||||
|
62
wal/wal.go
62
wal/wal.go
@ -32,6 +32,7 @@ import (
|
|||||||
"github.com/coreos/etcd/wal/walpb"
|
"github.com/coreos/etcd/wal/walpb"
|
||||||
|
|
||||||
"github.com/coreos/pkg/capnslog"
|
"github.com/coreos/pkg/capnslog"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -69,6 +70,8 @@ var (
|
|||||||
// A just opened WAL is in read mode, and ready for reading records.
|
// A just opened WAL is in read mode, and ready for reading records.
|
||||||
// The WAL will be ready for appending after reading out all the previous records.
|
// The WAL will be ready for appending after reading out all the previous records.
|
||||||
type WAL struct {
|
type WAL struct {
|
||||||
|
lg *zap.Logger
|
||||||
|
|
||||||
dir string // the living directory of the underlay files
|
dir string // the living directory of the underlay files
|
||||||
|
|
||||||
// dirFile is a fd for the wal directory for syncing on Rename
|
// dirFile is a fd for the wal directory for syncing on Rename
|
||||||
@ -91,7 +94,7 @@ type WAL struct {
|
|||||||
|
|
||||||
// Create creates a WAL ready for appending records. The given metadata is
|
// Create creates a WAL ready for appending records. The given metadata is
|
||||||
// recorded at the head of each WAL file, and can be retrieved with ReadAll.
|
// recorded at the head of each WAL file, and can be retrieved with ReadAll.
|
||||||
func Create(dirpath string, metadata []byte) (*WAL, error) {
|
func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
|
||||||
if Exist(dirpath) {
|
if Exist(dirpath) {
|
||||||
return nil, os.ErrExist
|
return nil, os.ErrExist
|
||||||
}
|
}
|
||||||
@ -120,6 +123,7 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
w := &WAL{
|
w := &WAL{
|
||||||
|
lg: lg,
|
||||||
dir: dirpath,
|
dir: dirpath,
|
||||||
metadata: metadata,
|
metadata: metadata,
|
||||||
}
|
}
|
||||||
@ -173,7 +177,7 @@ func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
|
|||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
w.fp = newFilePipeline(w.dir, SegmentSizeBytes)
|
w.fp = newFilePipeline(w.lg, w.dir, SegmentSizeBytes)
|
||||||
df, err := fileutil.OpenDir(w.dir)
|
df, err := fileutil.OpenDir(w.dir)
|
||||||
w.dirFile = df
|
w.dirFile = df
|
||||||
return w, err
|
return w, err
|
||||||
@ -182,13 +186,22 @@ func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
|
|||||||
func (w *WAL) renameWalUnlock(tmpdirpath string) (*WAL, error) {
|
func (w *WAL) renameWalUnlock(tmpdirpath string) (*WAL, error) {
|
||||||
// rename of directory with locked files doesn't work on windows/cifs;
|
// rename of directory with locked files doesn't work on windows/cifs;
|
||||||
// close the WAL to release the locks so the directory can be renamed.
|
// close the WAL to release the locks so the directory can be renamed.
|
||||||
|
if w.lg != nil {
|
||||||
|
w.lg.Info(
|
||||||
|
"releasing flock to rename",
|
||||||
|
zap.String("from", tmpdirpath),
|
||||||
|
zap.String("to", w.dir),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
plog.Infof("releasing file lock to rename %q to %q", tmpdirpath, w.dir)
|
plog.Infof("releasing file lock to rename %q to %q", tmpdirpath, w.dir)
|
||||||
|
}
|
||||||
|
|
||||||
w.Close()
|
w.Close()
|
||||||
if err := os.Rename(tmpdirpath, w.dir); err != nil {
|
if err := os.Rename(tmpdirpath, w.dir); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
// reopen and relock
|
// reopen and relock
|
||||||
newWAL, oerr := Open(w.dir, walpb.Snapshot{})
|
newWAL, oerr := Open(w.lg, w.dir, walpb.Snapshot{})
|
||||||
if oerr != nil {
|
if oerr != nil {
|
||||||
return nil, oerr
|
return nil, oerr
|
||||||
}
|
}
|
||||||
@ -205,8 +218,8 @@ func (w *WAL) renameWalUnlock(tmpdirpath string) (*WAL, error) {
|
|||||||
// The returned WAL is ready to read and the first record will be the one after
|
// The returned WAL is ready to read and the first record will be the one after
|
||||||
// the given snap. The WAL cannot be appended to before reading out all of its
|
// the given snap. The WAL cannot be appended to before reading out all of its
|
||||||
// previous records.
|
// previous records.
|
||||||
func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
|
func Open(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
|
||||||
w, err := openAtIndex(dirpath, snap, true)
|
w, err := openAtIndex(lg, dirpath, snap, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -218,12 +231,12 @@ func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
|
|||||||
|
|
||||||
// OpenForRead only opens the wal files for read.
|
// OpenForRead only opens the wal files for read.
|
||||||
// Write on a read only wal panics.
|
// Write on a read only wal panics.
|
||||||
func OpenForRead(dirpath string, snap walpb.Snapshot) (*WAL, error) {
|
func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
|
||||||
return openAtIndex(dirpath, snap, false)
|
return openAtIndex(lg, dirpath, snap, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
|
func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
|
||||||
names, err := readWalNames(dirpath)
|
names, err := readWalNames(lg, dirpath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -263,6 +276,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
|
|||||||
|
|
||||||
// create a WAL ready for reading
|
// create a WAL ready for reading
|
||||||
w := &WAL{
|
w := &WAL{
|
||||||
|
lg: lg,
|
||||||
dir: dirpath,
|
dir: dirpath,
|
||||||
start: snap,
|
start: snap,
|
||||||
decoder: newDecoder(rs...),
|
decoder: newDecoder(rs...),
|
||||||
@ -278,7 +292,7 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
|
|||||||
closer()
|
closer()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
w.fp = newFilePipeline(w.dir, SegmentSizeBytes)
|
w.fp = newFilePipeline(w.lg, w.dir, SegmentSizeBytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
return w, nil
|
return w, nil
|
||||||
@ -473,7 +487,11 @@ func (w *WAL) cut() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if w.lg != nil {
|
||||||
|
|
||||||
|
} else {
|
||||||
plog.Infof("segmented wal file %v is created", fpath)
|
plog.Infof("segmented wal file %v is created", fpath)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -486,12 +504,20 @@ func (w *WAL) sync() error {
|
|||||||
start := time.Now()
|
start := time.Now()
|
||||||
err := fileutil.Fdatasync(w.tail().File)
|
err := fileutil.Fdatasync(w.tail().File)
|
||||||
|
|
||||||
duration := time.Since(start)
|
took := time.Since(start)
|
||||||
if duration > warnSyncDuration {
|
if took > warnSyncDuration {
|
||||||
plog.Warningf("sync duration of %v, expected less than %v", duration, warnSyncDuration)
|
if w.lg != nil {
|
||||||
|
w.lg.Warn(
|
||||||
|
"slow fdatasync",
|
||||||
|
zap.Duration("took", took),
|
||||||
|
zap.Duration("expected-duration", warnSyncDuration),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
plog.Warningf("sync duration of %v, expected less than %v", took, warnSyncDuration)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
syncDurations.Observe(duration.Seconds())
|
|
||||||
|
|
||||||
|
syncDurations.Observe(took.Seconds())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -562,9 +588,13 @@ func (w *WAL) Close() error {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := l.Close(); err != nil {
|
if err := l.Close(); err != nil {
|
||||||
|
if w.lg != nil {
|
||||||
|
w.lg.Error("failed to close WAL", zap.Error(err))
|
||||||
|
} else {
|
||||||
plog.Errorf("failed to unlock during closing wal: %s", err)
|
plog.Errorf("failed to unlock during closing wal: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return w.dirFile.Close()
|
return w.dirFile.Close()
|
||||||
}
|
}
|
||||||
@ -660,8 +690,12 @@ func (w *WAL) seq() uint64 {
|
|||||||
}
|
}
|
||||||
seq, _, err := parseWalName(filepath.Base(t.Name()))
|
seq, _, err := parseWalName(filepath.Base(t.Name()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if w.lg != nil {
|
||||||
|
w.lg.Fatal("failed to parse WAL name", zap.String("name", t.Name()), zap.Error(err))
|
||||||
|
} else {
|
||||||
plog.Fatalf("bad wal name %s (%v)", t.Name(), err)
|
plog.Fatalf("bad wal name %s (%v)", t.Name(), err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return seq
|
return seq
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,6 +19,8 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -41,7 +43,7 @@ func benchmarkWriteEntry(b *testing.B, size int, batch int) {
|
|||||||
}
|
}
|
||||||
defer os.RemoveAll(p)
|
defer os.RemoveAll(p)
|
||||||
|
|
||||||
w, err := Create(p, []byte("somedata"))
|
w, err := Create(zap.NewExample(), p, []byte("somedata"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatalf("err = %v, want nil", err)
|
b.Fatalf("err = %v, want nil", err)
|
||||||
}
|
}
|
||||||
|
@ -27,6 +27,8 @@ import (
|
|||||||
"github.com/coreos/etcd/pkg/pbutil"
|
"github.com/coreos/etcd/pkg/pbutil"
|
||||||
"github.com/coreos/etcd/raft/raftpb"
|
"github.com/coreos/etcd/raft/raftpb"
|
||||||
"github.com/coreos/etcd/wal/walpb"
|
"github.com/coreos/etcd/wal/walpb"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestNew(t *testing.T) {
|
func TestNew(t *testing.T) {
|
||||||
@ -36,7 +38,7 @@ func TestNew(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer os.RemoveAll(p)
|
defer os.RemoveAll(p)
|
||||||
|
|
||||||
w, err := Create(p, []byte("somedata"))
|
w, err := Create(zap.NewExample(), p, []byte("somedata"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err = %v, want nil", err)
|
t.Fatalf("err = %v, want nil", err)
|
||||||
}
|
}
|
||||||
@ -91,7 +93,7 @@ func TestNewForInitedDir(t *testing.T) {
|
|||||||
defer os.RemoveAll(p)
|
defer os.RemoveAll(p)
|
||||||
|
|
||||||
os.Create(filepath.Join(p, walName(0, 0)))
|
os.Create(filepath.Join(p, walName(0, 0)))
|
||||||
if _, err = Create(p, nil); err == nil || err != os.ErrExist {
|
if _, err = Create(zap.NewExample(), p, nil); err == nil || err != os.ErrExist {
|
||||||
t.Errorf("err = %v, want %v", err, os.ErrExist)
|
t.Errorf("err = %v, want %v", err, os.ErrExist)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -109,7 +111,7 @@ func TestOpenAtIndex(t *testing.T) {
|
|||||||
}
|
}
|
||||||
f.Close()
|
f.Close()
|
||||||
|
|
||||||
w, err := Open(dir, walpb.Snapshot{})
|
w, err := Open(zap.NewExample(), dir, walpb.Snapshot{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err = %v, want nil", err)
|
t.Fatalf("err = %v, want nil", err)
|
||||||
}
|
}
|
||||||
@ -128,7 +130,7 @@ func TestOpenAtIndex(t *testing.T) {
|
|||||||
}
|
}
|
||||||
f.Close()
|
f.Close()
|
||||||
|
|
||||||
w, err = Open(dir, walpb.Snapshot{Index: 5})
|
w, err = Open(zap.NewExample(), dir, walpb.Snapshot{Index: 5})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err = %v, want nil", err)
|
t.Fatalf("err = %v, want nil", err)
|
||||||
}
|
}
|
||||||
@ -145,7 +147,7 @@ func TestOpenAtIndex(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(emptydir)
|
defer os.RemoveAll(emptydir)
|
||||||
if _, err = Open(emptydir, walpb.Snapshot{}); err != ErrFileNotFound {
|
if _, err = Open(zap.NewExample(), emptydir, walpb.Snapshot{}); err != ErrFileNotFound {
|
||||||
t.Errorf("err = %v, want %v", err, ErrFileNotFound)
|
t.Errorf("err = %v, want %v", err, ErrFileNotFound)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -158,7 +160,7 @@ func TestCut(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer os.RemoveAll(p)
|
defer os.RemoveAll(p)
|
||||||
|
|
||||||
w, err := Create(p, nil)
|
w, err := Create(zap.NewExample(), p, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -220,7 +222,7 @@ func TestSaveWithCut(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer os.RemoveAll(p)
|
defer os.RemoveAll(p)
|
||||||
|
|
||||||
w, err := Create(p, []byte("metadata"))
|
w, err := Create(zap.NewExample(), p, []byte("metadata"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -248,7 +250,7 @@ func TestSaveWithCut(t *testing.T) {
|
|||||||
|
|
||||||
w.Close()
|
w.Close()
|
||||||
|
|
||||||
neww, err := Open(p, walpb.Snapshot{})
|
neww, err := Open(zap.NewExample(), p, walpb.Snapshot{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err = %v, want nil", err)
|
t.Fatalf("err = %v, want nil", err)
|
||||||
}
|
}
|
||||||
@ -283,7 +285,7 @@ func TestRecover(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer os.RemoveAll(p)
|
defer os.RemoveAll(p)
|
||||||
|
|
||||||
w, err := Create(p, []byte("metadata"))
|
w, err := Create(zap.NewExample(), p, []byte("metadata"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -302,7 +304,7 @@ func TestRecover(t *testing.T) {
|
|||||||
}
|
}
|
||||||
w.Close()
|
w.Close()
|
||||||
|
|
||||||
if w, err = Open(p, walpb.Snapshot{}); err != nil {
|
if w, err = Open(zap.NewExample(), p, walpb.Snapshot{}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
metadata, state, entries, err := w.ReadAll()
|
metadata, state, entries, err := w.ReadAll()
|
||||||
@ -398,7 +400,7 @@ func TestRecoverAfterCut(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer os.RemoveAll(p)
|
defer os.RemoveAll(p)
|
||||||
|
|
||||||
md, err := Create(p, []byte("metadata"))
|
md, err := Create(zap.NewExample(), p, []byte("metadata"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -421,7 +423,7 @@ func TestRecoverAfterCut(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
w, err := Open(p, walpb.Snapshot{Index: uint64(i)})
|
w, err := Open(zap.NewExample(), p, walpb.Snapshot{Index: uint64(i)})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if i <= 4 {
|
if i <= 4 {
|
||||||
if err != ErrFileNotFound {
|
if err != ErrFileNotFound {
|
||||||
@ -456,7 +458,7 @@ func TestOpenAtUncommittedIndex(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer os.RemoveAll(p)
|
defer os.RemoveAll(p)
|
||||||
|
|
||||||
w, err := Create(p, nil)
|
w, err := Create(zap.NewExample(), p, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -468,7 +470,7 @@ func TestOpenAtUncommittedIndex(t *testing.T) {
|
|||||||
}
|
}
|
||||||
w.Close()
|
w.Close()
|
||||||
|
|
||||||
w, err = Open(p, walpb.Snapshot{})
|
w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -490,7 +492,7 @@ func TestOpenForRead(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer os.RemoveAll(p)
|
defer os.RemoveAll(p)
|
||||||
// create WAL
|
// create WAL
|
||||||
w, err := Create(p, nil)
|
w, err := Create(zap.NewExample(), p, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -510,7 +512,7 @@ func TestOpenForRead(t *testing.T) {
|
|||||||
w.ReleaseLockTo(unlockIndex)
|
w.ReleaseLockTo(unlockIndex)
|
||||||
|
|
||||||
// All are available for read
|
// All are available for read
|
||||||
w2, err := OpenForRead(p, walpb.Snapshot{})
|
w2, err := OpenForRead(zap.NewExample(), p, walpb.Snapshot{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -545,7 +547,7 @@ func TestReleaseLockTo(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer os.RemoveAll(p)
|
defer os.RemoveAll(p)
|
||||||
// create WAL
|
// create WAL
|
||||||
w, err := Create(p, nil)
|
w, err := Create(zap.NewExample(), p, nil)
|
||||||
defer func() {
|
defer func() {
|
||||||
if err = w.Close(); err != nil {
|
if err = w.Close(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -618,7 +620,7 @@ func TestTailWriteNoSlackSpace(t *testing.T) {
|
|||||||
defer os.RemoveAll(p)
|
defer os.RemoveAll(p)
|
||||||
|
|
||||||
// create initial WAL
|
// create initial WAL
|
||||||
w, err := Create(p, []byte("metadata"))
|
w, err := Create(zap.NewExample(), p, []byte("metadata"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -640,7 +642,7 @@ func TestTailWriteNoSlackSpace(t *testing.T) {
|
|||||||
w.Close()
|
w.Close()
|
||||||
|
|
||||||
// open, write more
|
// open, write more
|
||||||
w, err = Open(p, walpb.Snapshot{})
|
w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -661,7 +663,7 @@ func TestTailWriteNoSlackSpace(t *testing.T) {
|
|||||||
w.Close()
|
w.Close()
|
||||||
|
|
||||||
// confirm all writes
|
// confirm all writes
|
||||||
w, err = Open(p, walpb.Snapshot{})
|
w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -692,7 +694,7 @@ func TestRestartCreateWal(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
w, werr := Create(p, []byte("abc"))
|
w, werr := Create(zap.NewExample(), p, []byte("abc"))
|
||||||
if werr != nil {
|
if werr != nil {
|
||||||
t.Fatal(werr)
|
t.Fatal(werr)
|
||||||
}
|
}
|
||||||
@ -701,7 +703,7 @@ func TestRestartCreateWal(t *testing.T) {
|
|||||||
t.Fatalf("got %q exists, expected it to not exist", tmpdir)
|
t.Fatalf("got %q exists, expected it to not exist", tmpdir)
|
||||||
}
|
}
|
||||||
|
|
||||||
if w, err = OpenForRead(p, walpb.Snapshot{}); err != nil {
|
if w, err = OpenForRead(zap.NewExample(), p, walpb.Snapshot{}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
defer w.Close()
|
defer w.Close()
|
||||||
@ -722,7 +724,7 @@ func TestOpenOnTornWrite(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
defer os.RemoveAll(p)
|
defer os.RemoveAll(p)
|
||||||
w, err := Create(p, nil)
|
w, err := Create(zap.NewExample(), p, nil)
|
||||||
defer func() {
|
defer func() {
|
||||||
if err = w.Close(); err != nil && err != os.ErrInvalid {
|
if err = w.Close(); err != nil && err != os.ErrInvalid {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -764,7 +766,7 @@ func TestOpenOnTornWrite(t *testing.T) {
|
|||||||
}
|
}
|
||||||
f.Close()
|
f.Close()
|
||||||
|
|
||||||
w, err = Open(p, walpb.Snapshot{})
|
w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -785,7 +787,7 @@ func TestOpenOnTornWrite(t *testing.T) {
|
|||||||
w.Close()
|
w.Close()
|
||||||
|
|
||||||
// read back the entries, confirm number of entries matches expectation
|
// read back the entries, confirm number of entries matches expectation
|
||||||
w, err = OpenForRead(p, walpb.Snapshot{})
|
w, err = OpenForRead(zap.NewExample(), p, walpb.Snapshot{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user