From 2f0e3fd2df1057770395f8e8e9e18996e4bc70c4 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 23 May 2018 11:53:40 -0700 Subject: [PATCH 01/11] etcdserver: add "etcd_server_heartbeat_failures_total" {"level":"warn","ts":1527101858.4149103,"caller":"etcdserver/raft.go:370","msg":"failed to send out heartbeat; took too long, server is overloaded likely from slow disk","heartbeat-interval":0.1,"expected-duration":0.2,"exceeded-duration":0.025771662} {"level":"warn","ts":1527101858.4149644,"caller":"etcdserver/raft.go:370","msg":"failed to send out heartbeat; took too long, server is overloaded likely from slow disk","heartbeat-interval":0.1,"expected-duration":0.2,"exceeded-duration":0.034015766} Signed-off-by: Gyuho Lee --- etcdserver/metrics.go | 7 +++++++ etcdserver/raft.go | 1 + 2 files changed, 8 insertions(+) diff --git a/etcdserver/metrics.go b/etcdserver/metrics.go index 42a27166a..6c228b721 100644 --- a/etcdserver/metrics.go +++ b/etcdserver/metrics.go @@ -41,6 +41,12 @@ var ( Name: "leader_changes_seen_total", Help: "The number of leader changes seen.", }) + heartbeatFailures = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "heartbeat_failures_total", + Help: "The total number of heartbeat send failures (likely overloaded from slow disk).", + }) proposalsCommitted = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "etcd", Subsystem: "server", @@ -96,6 +102,7 @@ func init() { prometheus.MustRegister(hasLeader) prometheus.MustRegister(isLeader) prometheus.MustRegister(leaderChanges) + prometheus.MustRegister(heartbeatFailures) prometheus.MustRegister(proposalsCommitted) prometheus.MustRegister(proposalsApplied) prometheus.MustRegister(proposalsPending) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index 6c67275de..c3d93f83a 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -346,6 +346,7 @@ func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message { // TODO: limit request rate. plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed) plog.Warningf("server is likely overloaded") + heartbeatFailures.Inc() } } } From 66d8194e4db0b7d14bfcf4f1b3bb837ac096bce1 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 23 May 2018 12:01:38 -0700 Subject: [PATCH 02/11] etcdserver: add "etcd_server_slow_apply_total" {"level":"warn","ts":1527101858.6985068,"caller":"etcdserver/util.go:115","msg":"apply request took too long","took":0.114101529,"expected-duration":0.1,"prefix":"","request":"header: put: --- etcdserver/metrics.go | 7 +++++++ etcdserver/util.go | 1 + 2 files changed, 8 insertions(+) diff --git a/etcdserver/metrics.go b/etcdserver/metrics.go index 6c228b721..80798bc1b 100644 --- a/etcdserver/metrics.go +++ b/etcdserver/metrics.go @@ -47,6 +47,12 @@ var ( Name: "heartbeat_failures_total", Help: "The total number of heartbeat send failures (likely overloaded from slow disk).", }) + slowApplies = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "slow_apply_total", + Help: "The total number of slow apply requests (likely overloaded from slow disk).", + }) proposalsCommitted = prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "etcd", Subsystem: "server", @@ -103,6 +109,7 @@ func init() { prometheus.MustRegister(isLeader) prometheus.MustRegister(leaderChanges) prometheus.MustRegister(heartbeatFailures) + prometheus.MustRegister(slowApplies) prometheus.MustRegister(proposalsCommitted) prometheus.MustRegister(proposalsApplied) prometheus.MustRegister(proposalsPending) diff --git a/etcdserver/util.go b/etcdserver/util.go index 708fee4cb..79bb6b859 100644 --- a/etcdserver/util.go +++ b/etcdserver/util.go @@ -146,6 +146,7 @@ func warnOfExpensiveGenericRequest(now time.Time, reqStringer fmt.Stringer, pref result = resp } plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d) + slowApplies.Inc() } } From fae9b6f667708b701eb81a3e7908093793243fd8 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 23 May 2018 12:09:03 -0700 Subject: [PATCH 03/11] mvcc/backend: clean up mutex, logging Signed-off-by: Gyuho Lee --- mvcc/backend/batch_tx.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/mvcc/backend/batch_tx.go b/mvcc/backend/batch_tx.go index 32f5d0c8b..4119f2e30 100644 --- a/mvcc/backend/batch_tx.go +++ b/mvcc/backend/batch_tx.go @@ -141,15 +141,15 @@ func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error) // Commit commits a previous tx and begins a new writable one. func (t *batchTx) Commit() { t.Lock() - defer t.Unlock() t.commit(false) + t.Unlock() } // CommitAndStop commits the previous tx and does not create a new one. func (t *batchTx) CommitAndStop() { t.Lock() - defer t.Unlock() t.commit(true) + t.Unlock() } func (t *batchTx) Unlock() { @@ -167,9 +167,11 @@ func (t *batchTx) commit(stop bool) { } start := time.Now() + // gofail: var beforeCommit struct{} err := t.tx.Commit() // gofail: var afterCommit struct{} + commitDurations.Observe(time.Since(start).Seconds()) atomic.AddInt64(&t.backend.commits, 1) @@ -214,21 +216,21 @@ func (t *batchTxBuffered) Unlock() { func (t *batchTxBuffered) Commit() { t.Lock() - defer t.Unlock() t.commit(false) + t.Unlock() } func (t *batchTxBuffered) CommitAndStop() { t.Lock() - defer t.Unlock() t.commit(true) + t.Unlock() } func (t *batchTxBuffered) commit(stop bool) { // all read txs must be closed to acquire boltdb commit rwlock t.backend.readTx.mu.Lock() - defer t.backend.readTx.mu.Unlock() t.unsafeCommit(stop) + t.backend.readTx.mu.Unlock() } func (t *batchTxBuffered) unsafeCommit(stop bool) { From 3535f7a61f51abd4d3b08fcd737f63ef2c6d98d0 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 23 May 2018 12:14:43 -0700 Subject: [PATCH 04/11] mvcc/backend: document metrics ExponentialBuckets Signed-off-by: Gyuho Lee --- mvcc/backend/metrics.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/mvcc/backend/metrics.go b/mvcc/backend/metrics.go index 30a388014..e83602ea9 100644 --- a/mvcc/backend/metrics.go +++ b/mvcc/backend/metrics.go @@ -22,7 +22,10 @@ var ( Subsystem: "disk", Name: "backend_commit_duration_seconds", Help: "The latency distributions of commit called by backend.", - Buckets: prometheus.ExponentialBuckets(0.001, 2, 14), + + // lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2 + // highest bucket start of 0.001 sec * 2^13 == 8.192 sec + Buckets: prometheus.ExponentialBuckets(0.001, 2, 14), }) snapshotDurations = prometheus.NewHistogram(prometheus.HistogramOpts{ @@ -30,7 +33,9 @@ var ( Subsystem: "disk", Name: "backend_snapshot_duration_seconds", Help: "The latency distribution of backend snapshots.", - // 10 ms -> 655 seconds + + // lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2 + // highest bucket start of 0.01 sec * 2^16 == 655.36 sec Buckets: prometheus.ExponentialBuckets(.01, 2, 17), }) ) From aca5c8f4b63d36bd1841cb3a378fee4d1cceb4b6 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 23 May 2018 12:21:57 -0700 Subject: [PATCH 05/11] mvcc/backend: add "etcd_disk_backend_defrag_duration_seconds" Signed-off-by: Gyuho Lee --- mvcc/backend/backend.go | 5 +++++ mvcc/backend/metrics.go | 13 +++++++++++++ 2 files changed, 18 insertions(+) diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go index e55768f29..6bf228986 100644 --- a/mvcc/backend/backend.go +++ b/mvcc/backend/backend.go @@ -288,6 +288,8 @@ func (b *backend) Defrag() error { } func (b *backend) defrag() error { + now := time.Now() + // TODO: make this non-blocking? // lock batchTx to ensure nobody is using previous tx, and then // close previous ongoing tx. @@ -351,6 +353,9 @@ func (b *backend) defrag() error { atomic.StoreInt64(&b.size, size) atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize))) + took := time.Since(now) + defragDurations.Observe(took.Seconds()) + return nil } diff --git a/mvcc/backend/metrics.go b/mvcc/backend/metrics.go index e83602ea9..95503940b 100644 --- a/mvcc/backend/metrics.go +++ b/mvcc/backend/metrics.go @@ -28,6 +28,18 @@ var ( Buckets: prometheus.ExponentialBuckets(0.001, 2, 14), }) + defragDurations = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "etcd", + Subsystem: "disk", + Name: "backend_defrag_duration_seconds", + Help: "The latency distribution of backend defragmentation.", + + // 100 MB usually takes 1 sec, so start with 10 MB of 100 ms + // lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2 + // highest bucket start of 0.1 sec * 2^12 == 409.6 sec + Buckets: prometheus.ExponentialBuckets(.01, 2, 13), + }) + snapshotDurations = prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: "etcd", Subsystem: "disk", @@ -42,5 +54,6 @@ var ( func init() { prometheus.MustRegister(commitDurations) + prometheus.MustRegister(defragDurations) prometheus.MustRegister(snapshotDurations) } From 8ac6c888cd8ddb8bccf9e4bf0abd994f6ce036ad Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 23 May 2018 12:33:58 -0700 Subject: [PATCH 06/11] mvcc/backend: fix defrag duration scale Signed-off-by: Gyuho Lee --- mvcc/backend/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mvcc/backend/metrics.go b/mvcc/backend/metrics.go index 95503940b..341570804 100644 --- a/mvcc/backend/metrics.go +++ b/mvcc/backend/metrics.go @@ -37,7 +37,7 @@ var ( // 100 MB usually takes 1 sec, so start with 10 MB of 100 ms // lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2 // highest bucket start of 0.1 sec * 2^12 == 409.6 sec - Buckets: prometheus.ExponentialBuckets(.01, 2, 13), + Buckets: prometheus.ExponentialBuckets(.1, 2, 13), }) snapshotDurations = prometheus.NewHistogram(prometheus.HistogramOpts{ From 4e08898571677453d0d4218755685bf51855f143 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 23 May 2018 12:34:09 -0700 Subject: [PATCH 07/11] mvcc: add "etcd_mvcc_hash_(rev)_duration_seconds" etcd_mvcc_hash_duration_seconds etcd_mvcc_hash_rev_duration_seconds Signed-off-by: Gyuho Lee --- mvcc/backend/backend.go | 2 +- mvcc/kvstore.go | 57 +++++++++++++++++++++++++++++++++++++++++ mvcc/metrics.go | 28 +++++++++++++++++++- 3 files changed, 85 insertions(+), 2 deletions(-) diff --git a/mvcc/backend/backend.go b/mvcc/backend/backend.go index 6bf228986..a240c8e62 100644 --- a/mvcc/backend/backend.go +++ b/mvcc/backend/backend.go @@ -289,7 +289,7 @@ func (b *backend) Defrag() error { func (b *backend) defrag() error { now := time.Now() - + // TODO: make this non-blocking? // lock batchTx to ensure nobody is using previous tx, and then // close previous ongoing tx. diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index aa93147a7..e0ffd151a 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -150,11 +150,68 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) { } func (s *store) Hash() (hash uint32, revision int64, err error) { + start := time.Now() + s.b.ForceCommit() h, err := s.b.Hash(DefaultIgnores) + + hashDurations.Observe(time.Since(start).Seconds()) return h, s.currentRev, err } +func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) { + start := time.Now() + + s.mu.RLock() + s.revMu.RLock() + compactRev, currentRev = s.compactMainRev, s.currentRev + s.revMu.RUnlock() + + if rev > 0 && rev <= compactRev { + s.mu.RUnlock() + return 0, 0, compactRev, ErrCompacted + } else if rev > 0 && rev > currentRev { + s.mu.RUnlock() + return 0, currentRev, 0, ErrFutureRev + } + + if rev == 0 { + rev = currentRev + } + keep := s.kvindex.Keep(rev) + + tx := s.b.ReadTx() + tx.Lock() + defer tx.Unlock() + s.mu.RUnlock() + + upper := revision{main: rev + 1} + lower := revision{main: compactRev + 1} + h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) + + h.Write(keyBucketName) + err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error { + kr := bytesToRev(k) + if !upper.GreaterThan(kr) { + return nil + } + // skip revisions that are scheduled for deletion + // due to compacting; don't skip if there isn't one. + if lower.GreaterThan(kr) && len(keep) > 0 { + if _, ok := keep[kr]; !ok { + return nil + } + } + h.Write(k) + h.Write(v) + return nil + }) + hash = h.Sum32() + + hashRevDurations.Observe(time.Since(start).Seconds()) + return hash, currentRev, compactRev, err +} + func (s *store) Compact(rev int64) (<-chan struct{}, error) { s.mu.Lock() defer s.mu.Unlock() diff --git a/mvcc/metrics.go b/mvcc/metrics.go index bd875dddd..aae449091 100644 --- a/mvcc/metrics.go +++ b/mvcc/metrics.go @@ -173,7 +173,31 @@ var ( ) // overridden by mvcc initialization reportDbTotalSizeInUseInBytesMu sync.RWMutex - reportDbTotalSizeInUseInBytes = func() float64 { return 0 } + reportDbTotalSizeInUseInBytes func() float64 = func() float64 { return 0 } + + hashDurations = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "etcd", + Subsystem: "mvcc", + Name: "hash_duration_seconds", + Help: "The latency distribution of storage hash operation.", + + // 100 MB usually takes 100 ms, so start with 10 MB of 10 ms + // lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2 + // highest bucket start of 0.01 sec * 2^14 == 163.84 sec + Buckets: prometheus.ExponentialBuckets(.01, 2, 15), + }) + + hashRevDurations = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "etcd", + Subsystem: "mvcc", + Name: "hash_rev_duration_seconds", + Help: "The latency distribution of storage hash by revision operation.", + + // 100 MB usually takes 100 ms, so start with 10 MB of 10 ms + // lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2 + // highest bucket start of 0.01 sec * 2^14 == 163.84 sec + Buckets: prometheus.ExponentialBuckets(.01, 2, 15), + }) ) func init() { @@ -193,6 +217,8 @@ func init() { prometheus.MustRegister(dbTotalSizeDebugging) prometheus.MustRegister(dbTotalSize) prometheus.MustRegister(dbTotalSizeInUse) + prometheus.MustRegister(hashDurations) + prometheus.MustRegister(hashRevDurations) } // ReportEventReceived reports that an event is received. From 8798c5cd43dca1dbf3f3ce94453e692e68241e25 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 23 May 2018 13:09:31 -0700 Subject: [PATCH 08/11] etcdserver: rename to "heartbeat_send_failures_total" Signed-off-by: Gyuho Lee --- etcdserver/metrics.go | 8 ++++---- etcdserver/raft.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/etcdserver/metrics.go b/etcdserver/metrics.go index 80798bc1b..bb2fa2db6 100644 --- a/etcdserver/metrics.go +++ b/etcdserver/metrics.go @@ -41,11 +41,11 @@ var ( Name: "leader_changes_seen_total", Help: "The number of leader changes seen.", }) - heartbeatFailures = prometheus.NewCounter(prometheus.CounterOpts{ + heartbeatSendFailures = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "etcd", Subsystem: "server", - Name: "heartbeat_failures_total", - Help: "The total number of heartbeat send failures (likely overloaded from slow disk).", + Name: "heartbeat_send_failures_total", + Help: "The total number of leader heartbeat send failures (likely overloaded from slow disk).", }) slowApplies = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "etcd", @@ -108,7 +108,7 @@ func init() { prometheus.MustRegister(hasLeader) prometheus.MustRegister(isLeader) prometheus.MustRegister(leaderChanges) - prometheus.MustRegister(heartbeatFailures) + prometheus.MustRegister(heartbeatSendFailures) prometheus.MustRegister(slowApplies) prometheus.MustRegister(proposalsCommitted) prometheus.MustRegister(proposalsApplied) diff --git a/etcdserver/raft.go b/etcdserver/raft.go index c3d93f83a..5e2ab3d17 100644 --- a/etcdserver/raft.go +++ b/etcdserver/raft.go @@ -346,7 +346,7 @@ func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message { // TODO: limit request rate. plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed) plog.Warningf("server is likely overloaded") - heartbeatFailures.Inc() + heartbeatSendFailures.Inc() } } } From b3ab14ca9aaa0d5c16abda1d859368430736db1b Mon Sep 17 00:00:00 2001 From: Wenjia Date: Fri, 20 Jul 2018 13:44:15 -0700 Subject: [PATCH 09/11] remove HashByRev --- mvcc/kvstore.go | 53 ------------------------------------------------- 1 file changed, 53 deletions(-) diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index e0ffd151a..c68256685 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -159,59 +159,6 @@ func (s *store) Hash() (hash uint32, revision int64, err error) { return h, s.currentRev, err } -func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) { - start := time.Now() - - s.mu.RLock() - s.revMu.RLock() - compactRev, currentRev = s.compactMainRev, s.currentRev - s.revMu.RUnlock() - - if rev > 0 && rev <= compactRev { - s.mu.RUnlock() - return 0, 0, compactRev, ErrCompacted - } else if rev > 0 && rev > currentRev { - s.mu.RUnlock() - return 0, currentRev, 0, ErrFutureRev - } - - if rev == 0 { - rev = currentRev - } - keep := s.kvindex.Keep(rev) - - tx := s.b.ReadTx() - tx.Lock() - defer tx.Unlock() - s.mu.RUnlock() - - upper := revision{main: rev + 1} - lower := revision{main: compactRev + 1} - h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) - - h.Write(keyBucketName) - err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error { - kr := bytesToRev(k) - if !upper.GreaterThan(kr) { - return nil - } - // skip revisions that are scheduled for deletion - // due to compacting; don't skip if there isn't one. - if lower.GreaterThan(kr) && len(keep) > 0 { - if _, ok := keep[kr]; !ok { - return nil - } - } - h.Write(k) - h.Write(v) - return nil - }) - hash = h.Sum32() - - hashRevDurations.Observe(time.Since(start).Seconds()) - return hash, currentRev, compactRev, err -} - func (s *store) Compact(rev int64) (<-chan struct{}, error) { s.mu.Lock() defer s.mu.Unlock() From a3c0a990675aa87ed7b79b4ffc8b4f8f68933375 Mon Sep 17 00:00:00 2001 From: Wenjia Date: Fri, 20 Jul 2018 13:45:33 -0700 Subject: [PATCH 10/11] remove hashRevDurations --- mvcc/metrics.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/mvcc/metrics.go b/mvcc/metrics.go index aae449091..4dd33a768 100644 --- a/mvcc/metrics.go +++ b/mvcc/metrics.go @@ -186,18 +186,6 @@ var ( // highest bucket start of 0.01 sec * 2^14 == 163.84 sec Buckets: prometheus.ExponentialBuckets(.01, 2, 15), }) - - hashRevDurations = prometheus.NewHistogram(prometheus.HistogramOpts{ - Namespace: "etcd", - Subsystem: "mvcc", - Name: "hash_rev_duration_seconds", - Help: "The latency distribution of storage hash by revision operation.", - - // 100 MB usually takes 100 ms, so start with 10 MB of 10 ms - // lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2 - // highest bucket start of 0.01 sec * 2^14 == 163.84 sec - Buckets: prometheus.ExponentialBuckets(.01, 2, 15), - }) ) func init() { From 8c9fd1b5e68dbbac9e813835c130cc93e11751d7 Mon Sep 17 00:00:00 2001 From: Wenjia Date: Fri, 20 Jul 2018 13:48:35 -0700 Subject: [PATCH 11/11] remove hashRevDurations --- mvcc/metrics.go | 1 - 1 file changed, 1 deletion(-) diff --git a/mvcc/metrics.go b/mvcc/metrics.go index 4dd33a768..90bf9ecae 100644 --- a/mvcc/metrics.go +++ b/mvcc/metrics.go @@ -206,7 +206,6 @@ func init() { prometheus.MustRegister(dbTotalSize) prometheus.MustRegister(dbTotalSizeInUse) prometheus.MustRegister(hashDurations) - prometheus.MustRegister(hashRevDurations) } // ReportEventReceived reports that an event is received.