Merge pull request #5720 from xiang90/report_recv

*: fix pending events metrics
This commit is contained in:
Xiang Li
2016-06-20 06:44:16 -07:00
committed by GitHub
2 changed files with 19 additions and 15 deletions

View File

@@ -214,7 +214,19 @@ func (sws *serverWatchStream) sendLoop() {
interval := GetProgressReportInterval()
progressTicker := time.NewTicker(interval)
defer progressTicker.Stop()
defer func() {
progressTicker.Stop()
// drain the chan to clean up pending events
for ws := range sws.watchStream.Chan() {
mvcc.ReportEventReceived(len(ws.Events))
}
for _, wrs := range pending {
for _, ws := range wrs {
mvcc.ReportEventReceived(len(ws.Events))
}
}
}()
for {
select {
@@ -246,7 +258,7 @@ func (sws *serverWatchStream) sendLoop() {
continue
}
mvcc.ReportEventReceived()
mvcc.ReportEventReceived(len(evs))
if err := sws.gRPCStream.Send(wr); err != nil {
return
}
@@ -276,7 +288,7 @@ func (sws *serverWatchStream) sendLoop() {
// flush buffered events
ids[wid] = struct{}{}
for _, v := range pending[wid] {
mvcc.ReportEventReceived()
mvcc.ReportEventReceived(len(v.Events))
if err := sws.gRPCStream.Send(v); err != nil {
return
}
@@ -291,15 +303,7 @@ func (sws *serverWatchStream) sendLoop() {
sws.progress[id] = true
}
case <-sws.closec:
// drain the chan to clean up pending events
for range sws.watchStream.Chan() {
mvcc.ReportEventReceived()
}
for _, wrs := range pending {
for range wrs {
mvcc.ReportEventReceived()
}
}
return
}
}
}

View File

@@ -157,7 +157,7 @@ func init() {
// ReportEventReceived reports that an event is received.
// This function should be called when the external systems received an
// event from mvcc.Watcher.
func ReportEventReceived() {
pendingEventsGauge.Dec()
totalEventsCounter.Inc()
func ReportEventReceived(n int) {
pendingEventsGauge.Sub(float64(n))
totalEventsCounter.Add(float64(n))
}