diff --git a/storage/metrics.go b/storage/metrics.go index a38f9ac72..3763990c0 100644 --- a/storage/metrics.go +++ b/storage/metrics.go @@ -59,6 +59,38 @@ var ( Help: "Total number of keys.", }) + watchersGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "storage", + Name: "watchers_total", + Help: "Total number of watchers.", + }) + + slowWatchersGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "storage", + Name: "slow_watchers_total", + Help: "Total number of unsynced slow watchers.", + }) + + totalEventsCounter = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "storage", + Name: "events_total", + Help: "Total number of events sent by this member.", + }) + + pendingEventsGauge = prometheus.NewGauge( + prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "storage", + Name: "pending_events_total", + Help: "Total number of pending events to be sent.", + }) + indexCompactionPauseDurations = prometheus.NewHistogram( prometheus.HistogramOpts{ Namespace: "etcd", @@ -96,7 +128,19 @@ func init() { prometheus.MustRegister(deleteCounter) prometheus.MustRegister(txnCounter) prometheus.MustRegister(keysGauge) + prometheus.MustRegister(watchersGauge) + prometheus.MustRegister(totalEventsCounter) + prometheus.MustRegister(slowWatchersGauge) + prometheus.MustRegister(pendingEventsGauge) prometheus.MustRegister(indexCompactionPauseDurations) prometheus.MustRegister(dbCompactionPauseDurations) prometheus.MustRegister(dbCompactionTotalDurations) } + +// ReportEventReceived reports that an event is received. +// This function should be called when the external systems received an +// event from storage.Watcher. +func ReportEventReceived() { + pendingEventsGauge.Dec() + totalEventsCounter.Inc() +} diff --git a/storage/watchable_store.go b/storage/watchable_store.go index 83eba9120..84f39f2b5 100644 --- a/storage/watchable_store.go +++ b/storage/watchable_store.go @@ -172,8 +172,10 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev, endRev int64 s.endm[endRev] = append(s.endm[endRev], wa) } } else { + slowWatchersGauge.Inc() s.unsynced = append(s.unsynced, wa) } + watchersGauge.Inc() cancel := CancelFunc(func() { s.mu.Lock() @@ -184,6 +186,8 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev, endRev int64 for i, w := range s.unsynced { if w == wa { s.unsynced = append(s.unsynced[:i], s.unsynced[i+1:]...) + slowWatchersGauge.Dec() + watchersGauge.Dec() return } } @@ -191,6 +195,7 @@ func (s *watchableStore) Watcher(key []byte, prefix bool, startRev, endRev int64 for i, w := range s.synced[k] { if w == wa { s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...) + watchersGauge.Dec() } } if wa.end != 0 { @@ -252,6 +257,7 @@ func (s *watchableStore) syncWatchers() { // push events to the channel for _, ev := range evs { w.ch <- ev + pendingEventsGauge.Inc() } // stop watcher if it reaches the end if w.end > 0 && nextRev >= w.end { @@ -271,6 +277,7 @@ func (s *watchableStore) syncWatchers() { nws = append(nws, w) } s.unsynced = nws + slowWatchersGauge.Set(float64(len(s.unsynced))) } // handle handles the change of the happening event on all watchers. @@ -294,6 +301,7 @@ func (s *watchableStore) notify(rev int64, ev storagepb.Event) { } select { case w.ch <- ev: + pendingEventsGauge.Inc() nws = append(nws, w) default: // put it back to unsynced place @@ -306,6 +314,7 @@ func (s *watchableStore) notify(rev int64, ev storagepb.Event) { } w.cur = rev s.unsynced = append(s.unsynced, w) + slowWatchersGauge.Inc() } } s.synced[string(ev.Kv.Key[:i])] = nws @@ -319,6 +328,7 @@ func (s *watchableStore) stopWatchers(rev int64) { for _, w := range s.synced[k] { if w == wa { s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...) + watchersGauge.Dec() } } wa.stopWithError(ExceedEnd)