wal: add "etcd_wal_writes_bytes_total"

Signed-off-by: Gyuho Lee <leegyuho@amazon.com>
This commit is contained in:
Gyuho Lee 2020-03-31 20:31:32 -07:00
parent 4051e3e36a
commit af261f1a1b
4 changed files with 24 additions and 5 deletions

View File

@ -92,7 +92,8 @@ func (e *encoder) encode(rec *walpb.Record) error {
if padBytes != 0 { if padBytes != 0 {
data = append(data, make([]byte, padBytes)...) data = append(data, make([]byte, padBytes)...)
} }
_, err = e.bw.Write(data) n, err = e.bw.Write(data)
walWriteBytes.Add(float64(n))
return err return err
} }
@ -108,13 +109,16 @@ func encodeFrameSize(dataBytes int) (lenField uint64, padBytes int) {
func (e *encoder) flush() error { func (e *encoder) flush() error {
e.mu.Lock() e.mu.Lock()
defer e.mu.Unlock() n, err := e.bw.FlushN()
return e.bw.Flush() e.mu.Unlock()
walWriteBytes.Add(float64(n))
return err
} }
func writeUint64(w io.Writer, n uint64, buf []byte) error { func writeUint64(w io.Writer, n uint64, buf []byte) error {
// http://golang.org/src/encoding/binary/binary.go // http://golang.org/src/encoding/binary/binary.go
binary.LittleEndian.PutUint64(buf, n) binary.LittleEndian.PutUint64(buf, n)
_, err := w.Write(buf) nv, err := w.Write(buf)
walWriteBytes.Add(float64(nv))
return err return err
} }

View File

@ -27,8 +27,16 @@ var (
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec // highest bucket start of 0.001 sec * 2^13 == 8.192 sec
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14), Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
}) })
walWriteBytes = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "disk",
Name: "wal_write_bytes_total",
Help: "Total number of bytes written in WAL.",
})
) )
func init() { func init() {
prometheus.MustRegister(walFsyncSec) prometheus.MustRegister(walFsyncSec)
prometheus.MustRegister(walWriteBytes)
} }

View File

@ -18,10 +18,10 @@ import (
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"time"
"go.etcd.io/etcd/pkg/fileutil" "go.etcd.io/etcd/pkg/fileutil"
"go.etcd.io/etcd/wal/walpb" "go.etcd.io/etcd/wal/walpb"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -86,10 +86,12 @@ func Repair(lg *zap.Logger, dirpath string) bool {
return false return false
} }
start := time.Now()
if err = fileutil.Fsync(f.File); err != nil { if err = fileutil.Fsync(f.File); err != nil {
lg.Warn("failed to fsync", zap.String("path", f.Name()), zap.Error(err)) lg.Warn("failed to fsync", zap.String("path", f.Name()), zap.Error(err))
return false return false
} }
walFsyncSec.Observe(time.Since(start).Seconds())
lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.ErrUnexpectedEOF)) lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.ErrUnexpectedEOF))
return true return true

View File

@ -193,6 +193,7 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
) )
return nil, perr return nil, perr
} }
start := time.Now()
if perr = fileutil.Fsync(pdir); perr != nil { if perr = fileutil.Fsync(pdir); perr != nil {
lg.Warn( lg.Warn(
"failed to fsync the parent data directory file", "failed to fsync the parent data directory file",
@ -202,6 +203,8 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
) )
return nil, perr return nil, perr
} }
walFsyncSec.Observe(time.Since(start).Seconds())
if perr = pdir.Close(); perr != nil { if perr = pdir.Close(); perr != nil {
lg.Warn( lg.Warn(
"failed to close the parent data directory file", "failed to close the parent data directory file",
@ -647,9 +650,11 @@ func (w *WAL) cut() error {
if err = os.Rename(newTail.Name(), fpath); err != nil { if err = os.Rename(newTail.Name(), fpath); err != nil {
return err return err
} }
start := time.Now()
if err = fileutil.Fsync(w.dirFile); err != nil { if err = fileutil.Fsync(w.dirFile); err != nil {
return err return err
} }
walFsyncSec.Observe(time.Since(start).Seconds())
// reopen newTail with its new path so calls to Name() match the wal filename format // reopen newTail with its new path so calls to Name() match the wal filename format
newTail.Close() newTail.Close()