From 6f28b43806458c3c2f536862d328bd35e1061374 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 19 Jun 2016 23:00:39 -0700 Subject: [PATCH] *: fix pending events metrics --- etcdserver/api/v3rpc/watch.go | 28 ++++++++++++++++------------ mvcc/metrics.go | 6 +++--- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 9088397ce..fc4eb8d55 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -214,7 +214,19 @@ func (sws *serverWatchStream) sendLoop() { interval := GetProgressReportInterval() progressTicker := time.NewTicker(interval) - defer progressTicker.Stop() + + defer func() { + progressTicker.Stop() + // drain the chan to clean up pending events + for ws := range sws.watchStream.Chan() { + mvcc.ReportEventReceived(len(ws.Events)) + } + for _, wrs := range pending { + for _, ws := range wrs { + mvcc.ReportEventReceived(len(ws.Events)) + } + } + }() for { select { @@ -246,7 +258,7 @@ func (sws *serverWatchStream) sendLoop() { continue } - mvcc.ReportEventReceived() + mvcc.ReportEventReceived(len(evs)) if err := sws.gRPCStream.Send(wr); err != nil { return } @@ -276,7 +288,7 @@ func (sws *serverWatchStream) sendLoop() { // flush buffered events ids[wid] = struct{}{} for _, v := range pending[wid] { - mvcc.ReportEventReceived() + mvcc.ReportEventReceived(len(v.Events)) if err := sws.gRPCStream.Send(v); err != nil { return } @@ -291,15 +303,7 @@ func (sws *serverWatchStream) sendLoop() { sws.progress[id] = true } case <-sws.closec: - // drain the chan to clean up pending events - for range sws.watchStream.Chan() { - mvcc.ReportEventReceived() - } - for _, wrs := range pending { - for range wrs { - mvcc.ReportEventReceived() - } - } + return } } } diff --git a/mvcc/metrics.go b/mvcc/metrics.go index 736873082..aa8af6aa5 100644 --- a/mvcc/metrics.go +++ b/mvcc/metrics.go @@ -157,7 +157,7 @@ func init() { // ReportEventReceived reports that an event is received. // This function should be called when the external systems received an // event from mvcc.Watcher. -func ReportEventReceived() { - pendingEventsGauge.Dec() - totalEventsCounter.Inc() +func ReportEventReceived(n int) { + pendingEventsGauge.Sub(float64(n)) + totalEventsCounter.Add(float64(n)) }