mvcc: add "etcd_mvcc_db_total_size_in_use_in_bytes"

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
This commit is contained in:
Gyuho Lee 2018-07-03 14:21:11 -07:00
parent 9bc1e15386
commit bedba66c69
5 changed files with 64 additions and 10 deletions

View File

@ -54,6 +54,10 @@ type Backend interface {
Hash(ignores map[IgnoreKey]struct{}) (uint32, error) Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
// Size returns the current size of the backend. // Size returns the current size of the backend.
Size() int64 Size() int64
// SizeInUse returns the current size of the backend logically in use.
// Since the backend can manage free space in a non-byte unit such as
// number of pages, the returned value can be not exactly accurate in bytes.
SizeInUse() int64
Defrag() error Defrag() error
ForceCommit() ForceCommit()
Close() error Close() error
@ -74,6 +78,10 @@ type backend struct {
// size is the number of bytes in the backend // size is the number of bytes in the backend
size int64 size int64
// sizeInUse is the number of bytes actually used in the backend
sizeInUse int64
// commits counts number of commits since start // commits counts number of commits since start
commits int64 commits int64
@ -244,6 +252,10 @@ func (b *backend) Size() int64 {
return atomic.LoadInt64(&b.size) return atomic.LoadInt64(&b.size)
} }
func (b *backend) SizeInUse() int64 {
return atomic.LoadInt64(&b.sizeInUse)
}
func (b *backend) run() { func (b *backend) run() {
defer close(b.donec) defer close(b.donec)
t := time.NewTimer(b.batchInterval) t := time.NewTimer(b.batchInterval)
@ -341,7 +353,11 @@ func (b *backend) defrag() error {
b.readTx.buf.reset() b.readTx.buf.reset()
b.readTx.tx = b.unsafeBegin(false) b.readTx.tx = b.unsafeBegin(false)
atomic.StoreInt64(&b.size, b.readTx.tx.Size())
size := b.readTx.tx.Size()
db := b.db
atomic.StoreInt64(&b.size, size)
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
return nil return nil
} }
@ -402,7 +418,12 @@ func (b *backend) begin(write bool) *bolt.Tx {
b.mu.RLock() b.mu.RLock()
tx := b.unsafeBegin(write) tx := b.unsafeBegin(write)
b.mu.RUnlock() b.mu.RUnlock()
atomic.StoreInt64(&b.size, tx.Size())
size := tx.Size()
db := tx.DB()
atomic.StoreInt64(&b.size, size)
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
return tx return tx
} }

View File

@ -166,11 +166,22 @@ func (t *batchTx) commit(stop bool) {
t.backend.mu.RLock() t.backend.mu.RLock()
defer t.backend.mu.RUnlock() defer t.backend.mu.RUnlock()
// t.tx.DB()==nil if 'CommitAndStop' calls 'batchTx.commit(true)', // batchTx.commit(true) calls *bolt.Tx.Commit, which
// which initializes *bolt.Tx.db and *bolt.Tx.meta as nil; panics t.tx.Size(). // initializes *bolt.Tx.db and *bolt.Tx.meta as nil,
// Server must make sure 'batchTx.commit(false)' does not follow // and subsequent *bolt.Tx.Size() call panics.
// 'batchTx.commit(true)' (e.g. stopping backend, and inflight Hash call). //
atomic.StoreInt64(&t.backend.size, t.tx.Size()) // This nil pointer reference panic happens when:
// 1. batchTx.commit(false) from newBatchTx
// 2. batchTx.commit(true) from stopping backend
// 3. batchTx.commit(false) from inflight mvcc Hash call
//
// Check if db is nil to prevent this panic
if t.tx.DB() != nil {
size := t.tx.Size()
db := t.tx.DB()
atomic.StoreInt64(&t.backend.size, size)
atomic.StoreInt64(&t.backend.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
}
return return
} }

View File

@ -245,10 +245,14 @@ func (s *store) Restore(b backend.Backend) error {
} }
func (s *store) restore() error { func (s *store) restore() error {
reportDbTotalSizeInBytesMu.Lock()
b := s.b b := s.b
reportDbTotalSizeInBytesMu.Lock()
reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) } reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
reportDbTotalSizeInBytesMu.Unlock() reportDbTotalSizeInBytesMu.Unlock()
reportDbTotalSizeInUseInBytesMu.Lock()
reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
reportDbTotalSizeInUseInBytesMu.Unlock()
min, max := newRevBytes(), newRevBytes() min, max := newRevBytes(), newRevBytes()
revToBytes(revision{main: 1}, min) revToBytes(revision{main: 1}, min)

View File

@ -638,6 +638,7 @@ func (b *fakeBackend) BatchTx() backend.BatchTx
func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx } func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx }
func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil } func (b *fakeBackend) Hash(ignores map[backend.IgnoreKey]struct{}) (uint32, error) { return 0, nil }
func (b *fakeBackend) Size() int64 { return 0 } func (b *fakeBackend) Size() int64 { return 0 }
func (b *fakeBackend) SizeInUse() int64 { return 0 }
func (b *fakeBackend) Snapshot() backend.Snapshot { return nil } func (b *fakeBackend) Snapshot() backend.Snapshot { return nil }
func (b *fakeBackend) ForceCommit() {} func (b *fakeBackend) ForceCommit() {}
func (b *fakeBackend) Defrag() error { return nil } func (b *fakeBackend) Defrag() error { return nil }

View File

@ -135,7 +135,7 @@ var (
Namespace: "etcd_debugging", Namespace: "etcd_debugging",
Subsystem: "mvcc", Subsystem: "mvcc",
Name: "db_total_size_in_bytes", Name: "db_total_size_in_bytes",
Help: "Total size of the underlying database in bytes.", Help: "Total size of the underlying database physically allocated in bytes. Use etcd_mvcc_db_total_size_in_bytes",
}, },
func() float64 { func() float64 {
reportDbTotalSizeInBytesMu.RLock() reportDbTotalSizeInBytesMu.RLock()
@ -147,7 +147,7 @@ var (
Namespace: "etcd", Namespace: "etcd",
Subsystem: "mvcc", Subsystem: "mvcc",
Name: "db_total_size_in_bytes", Name: "db_total_size_in_bytes",
Help: "Total size of the underlying database in bytes.", Help: "Total size of the underlying database physically allocated in bytes.",
}, },
func() float64 { func() float64 {
reportDbTotalSizeInBytesMu.RLock() reportDbTotalSizeInBytesMu.RLock()
@ -158,6 +158,22 @@ var (
// overridden by mvcc initialization // overridden by mvcc initialization
reportDbTotalSizeInBytesMu sync.RWMutex reportDbTotalSizeInBytesMu sync.RWMutex
reportDbTotalSizeInBytes = func() float64 { return 0 } reportDbTotalSizeInBytes = func() float64 { return 0 }
dbTotalSizeInUse = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "mvcc",
Name: "db_total_size_in_use_in_bytes",
Help: "Total size of the underlying database logically in use in bytes.",
},
func() float64 {
reportDbTotalSizeInUseInBytesMu.RLock()
defer reportDbTotalSizeInUseInBytesMu.RUnlock()
return reportDbTotalSizeInUseInBytes()
},
)
// overridden by mvcc initialization
reportDbTotalSizeInUseInBytesMu sync.RWMutex
reportDbTotalSizeInUseInBytes = func() float64 { return 0 }
) )
func init() { func init() {
@ -176,6 +192,7 @@ func init() {
prometheus.MustRegister(dbCompactionTotalDurations) prometheus.MustRegister(dbCompactionTotalDurations)
prometheus.MustRegister(dbTotalSizeDebugging) prometheus.MustRegister(dbTotalSizeDebugging)
prometheus.MustRegister(dbTotalSize) prometheus.MustRegister(dbTotalSize)
prometheus.MustRegister(dbTotalSizeInUse)
} }
// ReportEventReceived reports that an event is received. // ReportEventReceived reports that an event is received.