mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #9942 from wenjiaswe/automated-cherry-pick-of-#9761-upstream-release-3.2
Automated cherry pick of #9761
This commit is contained in:
commit
87418c3432
@ -41,6 +41,18 @@ var (
|
|||||||
Name: "leader_changes_seen_total",
|
Name: "leader_changes_seen_total",
|
||||||
Help: "The number of leader changes seen.",
|
Help: "The number of leader changes seen.",
|
||||||
})
|
})
|
||||||
|
heartbeatSendFailures = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "server",
|
||||||
|
Name: "heartbeat_send_failures_total",
|
||||||
|
Help: "The total number of leader heartbeat send failures (likely overloaded from slow disk).",
|
||||||
|
})
|
||||||
|
slowApplies = prometheus.NewCounter(prometheus.CounterOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "server",
|
||||||
|
Name: "slow_apply_total",
|
||||||
|
Help: "The total number of slow apply requests (likely overloaded from slow disk).",
|
||||||
|
})
|
||||||
proposalsCommitted = prometheus.NewGauge(prometheus.GaugeOpts{
|
proposalsCommitted = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||||
Namespace: "etcd",
|
Namespace: "etcd",
|
||||||
Subsystem: "server",
|
Subsystem: "server",
|
||||||
@ -96,6 +108,8 @@ func init() {
|
|||||||
prometheus.MustRegister(hasLeader)
|
prometheus.MustRegister(hasLeader)
|
||||||
prometheus.MustRegister(isLeader)
|
prometheus.MustRegister(isLeader)
|
||||||
prometheus.MustRegister(leaderChanges)
|
prometheus.MustRegister(leaderChanges)
|
||||||
|
prometheus.MustRegister(heartbeatSendFailures)
|
||||||
|
prometheus.MustRegister(slowApplies)
|
||||||
prometheus.MustRegister(proposalsCommitted)
|
prometheus.MustRegister(proposalsCommitted)
|
||||||
prometheus.MustRegister(proposalsApplied)
|
prometheus.MustRegister(proposalsApplied)
|
||||||
prometheus.MustRegister(proposalsPending)
|
prometheus.MustRegister(proposalsPending)
|
||||||
|
|||||||
@ -346,6 +346,7 @@ func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
|
|||||||
// TODO: limit request rate.
|
// TODO: limit request rate.
|
||||||
plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
|
plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v)", r.heartbeat, exceed)
|
||||||
plog.Warningf("server is likely overloaded")
|
plog.Warningf("server is likely overloaded")
|
||||||
|
heartbeatSendFailures.Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -146,6 +146,7 @@ func warnOfExpensiveGenericRequest(now time.Time, reqStringer fmt.Stringer, pref
|
|||||||
result = resp
|
result = resp
|
||||||
}
|
}
|
||||||
plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
|
plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
|
||||||
|
slowApplies.Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -288,6 +288,8 @@ func (b *backend) Defrag() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (b *backend) defrag() error {
|
func (b *backend) defrag() error {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
// TODO: make this non-blocking?
|
// TODO: make this non-blocking?
|
||||||
// lock batchTx to ensure nobody is using previous tx, and then
|
// lock batchTx to ensure nobody is using previous tx, and then
|
||||||
// close previous ongoing tx.
|
// close previous ongoing tx.
|
||||||
@ -351,6 +353,9 @@ func (b *backend) defrag() error {
|
|||||||
atomic.StoreInt64(&b.size, size)
|
atomic.StoreInt64(&b.size, size)
|
||||||
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
|
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))
|
||||||
|
|
||||||
|
took := time.Since(now)
|
||||||
|
defragDurations.Observe(took.Seconds())
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -141,15 +141,15 @@ func unsafeForEach(tx *bolt.Tx, bucket []byte, visitor func(k, v []byte) error)
|
|||||||
// Commit commits a previous tx and begins a new writable one.
|
// Commit commits a previous tx and begins a new writable one.
|
||||||
func (t *batchTx) Commit() {
|
func (t *batchTx) Commit() {
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
|
||||||
t.commit(false)
|
t.commit(false)
|
||||||
|
t.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// CommitAndStop commits the previous tx and does not create a new one.
|
// CommitAndStop commits the previous tx and does not create a new one.
|
||||||
func (t *batchTx) CommitAndStop() {
|
func (t *batchTx) CommitAndStop() {
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
|
||||||
t.commit(true)
|
t.commit(true)
|
||||||
|
t.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *batchTx) Unlock() {
|
func (t *batchTx) Unlock() {
|
||||||
@ -167,9 +167,11 @@ func (t *batchTx) commit(stop bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
// gofail: var beforeCommit struct{}
|
// gofail: var beforeCommit struct{}
|
||||||
err := t.tx.Commit()
|
err := t.tx.Commit()
|
||||||
// gofail: var afterCommit struct{}
|
// gofail: var afterCommit struct{}
|
||||||
|
|
||||||
commitDurations.Observe(time.Since(start).Seconds())
|
commitDurations.Observe(time.Since(start).Seconds())
|
||||||
atomic.AddInt64(&t.backend.commits, 1)
|
atomic.AddInt64(&t.backend.commits, 1)
|
||||||
|
|
||||||
@ -214,21 +216,21 @@ func (t *batchTxBuffered) Unlock() {
|
|||||||
|
|
||||||
func (t *batchTxBuffered) Commit() {
|
func (t *batchTxBuffered) Commit() {
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
|
||||||
t.commit(false)
|
t.commit(false)
|
||||||
|
t.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *batchTxBuffered) CommitAndStop() {
|
func (t *batchTxBuffered) CommitAndStop() {
|
||||||
t.Lock()
|
t.Lock()
|
||||||
defer t.Unlock()
|
|
||||||
t.commit(true)
|
t.commit(true)
|
||||||
|
t.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *batchTxBuffered) commit(stop bool) {
|
func (t *batchTxBuffered) commit(stop bool) {
|
||||||
// all read txs must be closed to acquire boltdb commit rwlock
|
// all read txs must be closed to acquire boltdb commit rwlock
|
||||||
t.backend.readTx.mu.Lock()
|
t.backend.readTx.mu.Lock()
|
||||||
defer t.backend.readTx.mu.Unlock()
|
|
||||||
t.unsafeCommit(stop)
|
t.unsafeCommit(stop)
|
||||||
|
t.backend.readTx.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *batchTxBuffered) unsafeCommit(stop bool) {
|
func (t *batchTxBuffered) unsafeCommit(stop bool) {
|
||||||
|
|||||||
@ -22,7 +22,22 @@ var (
|
|||||||
Subsystem: "disk",
|
Subsystem: "disk",
|
||||||
Name: "backend_commit_duration_seconds",
|
Name: "backend_commit_duration_seconds",
|
||||||
Help: "The latency distributions of commit called by backend.",
|
Help: "The latency distributions of commit called by backend.",
|
||||||
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
|
|
||||||
|
// lowest bucket start of upper bound 0.001 sec (1 ms) with factor 2
|
||||||
|
// highest bucket start of 0.001 sec * 2^13 == 8.192 sec
|
||||||
|
Buckets: prometheus.ExponentialBuckets(0.001, 2, 14),
|
||||||
|
})
|
||||||
|
|
||||||
|
defragDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "disk",
|
||||||
|
Name: "backend_defrag_duration_seconds",
|
||||||
|
Help: "The latency distribution of backend defragmentation.",
|
||||||
|
|
||||||
|
// 100 MB usually takes 1 sec, so start with 10 MB of 100 ms
|
||||||
|
// lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
|
||||||
|
// highest bucket start of 0.1 sec * 2^12 == 409.6 sec
|
||||||
|
Buckets: prometheus.ExponentialBuckets(.1, 2, 13),
|
||||||
})
|
})
|
||||||
|
|
||||||
snapshotDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
|
snapshotDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||||
@ -30,12 +45,15 @@ var (
|
|||||||
Subsystem: "disk",
|
Subsystem: "disk",
|
||||||
Name: "backend_snapshot_duration_seconds",
|
Name: "backend_snapshot_duration_seconds",
|
||||||
Help: "The latency distribution of backend snapshots.",
|
Help: "The latency distribution of backend snapshots.",
|
||||||
// 10 ms -> 655 seconds
|
|
||||||
|
// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
|
||||||
|
// highest bucket start of 0.01 sec * 2^16 == 655.36 sec
|
||||||
Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
|
Buckets: prometheus.ExponentialBuckets(.01, 2, 17),
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
prometheus.MustRegister(commitDurations)
|
prometheus.MustRegister(commitDurations)
|
||||||
|
prometheus.MustRegister(defragDurations)
|
||||||
prometheus.MustRegister(snapshotDurations)
|
prometheus.MustRegister(snapshotDurations)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -150,8 +150,12 @@ func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) Hash() (hash uint32, revision int64, err error) {
|
func (s *store) Hash() (hash uint32, revision int64, err error) {
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
s.b.ForceCommit()
|
s.b.ForceCommit()
|
||||||
h, err := s.b.Hash(DefaultIgnores)
|
h, err := s.b.Hash(DefaultIgnores)
|
||||||
|
|
||||||
|
hashDurations.Observe(time.Since(start).Seconds())
|
||||||
return h, s.currentRev, err
|
return h, s.currentRev, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -173,7 +173,19 @@ var (
|
|||||||
)
|
)
|
||||||
// overridden by mvcc initialization
|
// overridden by mvcc initialization
|
||||||
reportDbTotalSizeInUseInBytesMu sync.RWMutex
|
reportDbTotalSizeInUseInBytesMu sync.RWMutex
|
||||||
reportDbTotalSizeInUseInBytes = func() float64 { return 0 }
|
reportDbTotalSizeInUseInBytes func() float64 = func() float64 { return 0 }
|
||||||
|
|
||||||
|
hashDurations = prometheus.NewHistogram(prometheus.HistogramOpts{
|
||||||
|
Namespace: "etcd",
|
||||||
|
Subsystem: "mvcc",
|
||||||
|
Name: "hash_duration_seconds",
|
||||||
|
Help: "The latency distribution of storage hash operation.",
|
||||||
|
|
||||||
|
// 100 MB usually takes 100 ms, so start with 10 MB of 10 ms
|
||||||
|
// lowest bucket start of upper bound 0.01 sec (10 ms) with factor 2
|
||||||
|
// highest bucket start of 0.01 sec * 2^14 == 163.84 sec
|
||||||
|
Buckets: prometheus.ExponentialBuckets(.01, 2, 15),
|
||||||
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -193,6 +205,7 @@ func init() {
|
|||||||
prometheus.MustRegister(dbTotalSizeDebugging)
|
prometheus.MustRegister(dbTotalSizeDebugging)
|
||||||
prometheus.MustRegister(dbTotalSize)
|
prometheus.MustRegister(dbTotalSize)
|
||||||
prometheus.MustRegister(dbTotalSizeInUse)
|
prometheus.MustRegister(dbTotalSizeInUse)
|
||||||
|
prometheus.MustRegister(hashDurations)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReportEventReceived reports that an event is received.
|
// ReportEventReceived reports that an event is received.
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user