// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License. See the AUTHORS file
// for names of contributors.
//
// Author: Tyler Neely (t@jujit.su)

// IMPORTANT: only subscribe to the metric stream
// using buffered channels that are regularly
// flushed, as reaper will NOT block while trying
// to send metrics to a subscriber, and will ignore
// a subscriber if they fail to clear their channel
// 3 times in a row!

package loghisto

import (
	"errors"
	"fmt"
	"math"
	"runtime"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/golang/glog"
)

const (
	// precision affects the bucketing used during histogram value compression.
	precision = 100
)

// ProcessedMetricSet contains human-readable metrics that may also be
// suitable for storage in time-series databases.
type ProcessedMetricSet struct {
	Time    time.Time
	Metrics map[string]float64
}

// RawMetricSet contains metrics in a form that supports generation of
// percentiles and other rich statistics.
type RawMetricSet struct {
	Time       time.Time
	Counters   map[string]uint64
	Rates      map[string]uint64
	Histograms map[string]map[int16]*uint64
	Gauges     map[string]float64
}

// TimerToken facilitates concurrent timings of durations of the same label.
type TimerToken struct {
	Name         string
	Start        time.Time
	MetricSystem *MetricSystem
}

// proportion is a compact value with a corresponding count of
// occurrences in this interval.
type proportion struct {
	Value float64
	Count uint64
}

// proportionArray is a sortable collection of proportion types.
type proportionArray []proportion

// MetricSystem facilitates the collection and distribution of metrics.
type MetricSystem struct {
	// percentiles is a mapping from labels to desired percentiles to be
	// calculated by the MetricSystem
	percentiles map[string]float64
	// interval is the duration between collections and broadcasts of metrics
	// to subscribers.
	interval time.Duration
	// subscribeToRawMetrics allows subscription to a RawMetricSet generated
	// by reaper at the end of each interval on a sent channel.
	subscribeToRawMetrics chan chan *RawMetricSet
	// unsubscribeFromRawMetrics allows subscribers to unsubscribe from
	// receiving a RawMetricSet on the sent channel.
	unsubscribeFromRawMetrics chan chan *RawMetricSet
	// subscribeToProcessedMetrics allows subscription to a ProcessedMetricSet
	// generated by reaper at the end of each interval on a sent channel.
	subscribeToProcessedMetrics chan chan *ProcessedMetricSet
	// unsubscribeFromProcessedMetrics allows subscribers to unsubscribe from
	// receiving a ProcessedMetricSet on the sent channel.
	unsubscribeFromProcessedMetrics chan chan *ProcessedMetricSet
	// rawSubscribers stores current subscribers to RawMetrics
	rawSubscribers map[chan *RawMetricSet]struct{}
	// rawBadSubscribers tracks misbehaving subscribers who do not clear their
	// subscription channels regularly.
	rawBadSubscribers map[chan *RawMetricSet]int
	// processedSubscribers stores current subscribers to ProcessedMetrics
	processedSubscribers map[chan *ProcessedMetricSet]struct{}
	// processedBadSubscribers tracks misbehaving subscribers who do not clear
	// their subscription channels regularly.
	processedBadSubscribers map[chan *ProcessedMetricSet]int
	// subscribersMu controls access to subscription structures
	subscribersMu sync.RWMutex
	// counterStore maintains the total counts of counters.
	counterStore   map[string]*uint64
	counterStoreMu sync.RWMutex
	// counterCache aggregates new Counters until they are collected by reaper().
	counterCache map[string]*uint64
	// counterMu controls access to counterCache.
	counterMu sync.RWMutex
	// histogramCache aggregates Histograms until they are collected by reaper().
	histogramCache map[string]map[int16]*uint64
	// histogramMu controls access to histogramCache.
	histogramMu sync.RWMutex
	// histogramCountStore keeps track of aggregate counts and sums for aggregate
	// mean calculation.
	histogramCountStore map[string]*uint64
	// histogramCountMu controls access to the histogramCountStore.
	histogramCountMu sync.RWMutex
	// gaugeFuncs maps metrics to functions used for calculating their value
	gaugeFuncs map[string]func() float64
	// gaugeFuncsMu controls access to the gaugeFuncs map.
	gaugeFuncsMu sync.Mutex
	// Has reaper() been started?
	reaping bool
	// Close this to bring down this MetricSystem
	shutdownChan chan struct{}
}

// Metrics is the default metric system, which collects and broadcasts metrics
// to subscribers once every 60 seconds. Also includes default system stats.
var Metrics = NewMetricSystem(60*time.Second, true)

// NewMetricSystem returns a new metric system that collects and broadcasts
// metrics after each interval.
func NewMetricSystem(interval time.Duration, sysStats bool) *MetricSystem {
	ms := &MetricSystem{
		percentiles: map[string]float64{
			"%s_min":   0,
			"%s_50":    .5,
			"%s_75":    .75,
			"%s_90":    .9,
			"%s_95":    .95,
			"%s_99":    .99,
			"%s_99.9":  .999,
			"%s_99.99": .9999,
			"%s_max":   1,
		},
		interval:                        interval,
		subscribeToRawMetrics:           make(chan chan *RawMetricSet, 64),
		unsubscribeFromRawMetrics:       make(chan chan *RawMetricSet, 64),
		subscribeToProcessedMetrics:     make(chan chan *ProcessedMetricSet, 64),
		unsubscribeFromProcessedMetrics: make(chan chan *ProcessedMetricSet, 64),
		rawSubscribers:                  make(map[chan *RawMetricSet]struct{}),
		rawBadSubscribers:               make(map[chan *RawMetricSet]int),
		processedSubscribers:            make(map[chan *ProcessedMetricSet]struct{}),
		processedBadSubscribers:         make(map[chan *ProcessedMetricSet]int),
		counterStore:                    make(map[string]*uint64),
		counterCache:                    make(map[string]*uint64),
		histogramCache:                  make(map[string]map[int16]*uint64),
		histogramCountStore:             make(map[string]*uint64),
		gaugeFuncs:                      make(map[string]func() float64),
		shutdownChan:                    make(chan struct{}),
	}
	if sysStats {
		ms.gaugeFuncsMu.Lock()
		ms.gaugeFuncs["sys.Alloc"] = func() float64 {
			memStats := new(runtime.MemStats)
			runtime.ReadMemStats(memStats)
			return float64(memStats.Alloc)
		}
		ms.gaugeFuncs["sys.NumGC"] = func() float64 {
			memStats := new(runtime.MemStats)
			runtime.ReadMemStats(memStats)
			return float64(memStats.NumGC)
		}
		ms.gaugeFuncs["sys.PauseTotalNs"] = func() float64 {
			memStats := new(runtime.MemStats)
			runtime.ReadMemStats(memStats)
			return float64(memStats.PauseTotalNs)
		}
		ms.gaugeFuncs["sys.NumGoroutine"] = func() float64 {
			return float64(runtime.NumGoroutine())
		}
		ms.gaugeFuncsMu.Unlock()
	}
	return ms
}

// SpecifyPercentiles allows users to override the default collected
// and reported percentiles.
func (ms *MetricSystem) SpecifyPercentiles(percentiles map[string]float64) {
	ms.percentiles = percentiles
}

// SubscribeToRawMetrics registers a channel to receive RawMetricSets
// periodically generated by reaper at each interval.
func (ms *MetricSystem) SubscribeToRawMetrics(metricStream chan *RawMetricSet) {
	ms.subscribeToRawMetrics <- metricStream
}

// UnsubscribeFromRawMetrics removes a previously registered channel so that
// it no longer receives RawMetricSets generated by reaper.
func (ms *MetricSystem) UnsubscribeFromRawMetrics(
	metricStream chan *RawMetricSet) {
	ms.unsubscribeFromRawMetrics <- metricStream
}

// SubscribeToProcessedMetrics registers a channel to receive
// ProcessedMetricSets periodically generated by reaper at each interval.
func (ms *MetricSystem) SubscribeToProcessedMetrics(
	metricStream chan *ProcessedMetricSet) {
	ms.subscribeToProcessedMetrics <- metricStream
}

// UnsubscribeFromProcessedMetrics removes a previously registered channel so
// that it no longer receives ProcessedMetricSets generated by reaper.
func (ms *MetricSystem) UnsubscribeFromProcessedMetrics(
	metricStream chan *ProcessedMetricSet) {
	ms.unsubscribeFromProcessedMetrics <- metricStream
}
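
// A minimal usage sketch, assuming this package is imported as loghisto and
// following the package-level warning above: subscribe with a buffered
// channel and drain it promptly. The interval and channel capacity are
// illustrative.
//
//	ms := loghisto.NewMetricSystem(10*time.Second, false)
//	ms.Start()
//	metricStream := make(chan *loghisto.ProcessedMetricSet, 2)
//	ms.SubscribeToProcessedMetrics(metricStream)
//	go func() {
//		for m := range metricStream {
//			// forward m.Metrics to logs, a time-series database, etc.
//			_ = m
//		}
//	}()
//	// ...
//	ms.UnsubscribeFromProcessedMetrics(metricStream)
//	ms.Stop()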

// StartTimer begins a timer and returns a token which is required for halting
// the timer. This allows for concurrent timings under the same name.
func (ms *MetricSystem) StartTimer(name string) TimerToken {
	return TimerToken{
		Name:         name,
		Start:        time.Now(),
		MetricSystem: ms,
	}
}

// Stop stops a timer given by StartTimer, submits a Histogram of its duration
// in nanoseconds, and returns its duration in nanoseconds.
func (tt *TimerToken) Stop() time.Duration {
	duration := time.Since(tt.Start)
	tt.MetricSystem.Histogram(tt.Name, float64(duration.Nanoseconds()))
	return duration
}
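
// A short timing sketch, assuming ms is a running *MetricSystem; the label
// "db.query" is illustrative:
//
//	token := ms.StartTimer("db.query")
//	defer token.Stop() // records the elapsed nanoseconds as a "db.query" histogram value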

// Counter is used for recording a running count of the total occurrences of
// a particular event. A rate is also exported for the amount that a counter
// has increased during an interval of this MetricSystem.
func (ms *MetricSystem) Counter(name string, amount uint64) {
	ms.counterMu.RLock()
	_, exists := ms.counterCache[name]
	// perform lock promotion when we need more control
	if exists {
		atomic.AddUint64(ms.counterCache[name], amount)
		ms.counterMu.RUnlock()
	} else {
		ms.counterMu.RUnlock()
		ms.counterMu.Lock()
		_, syncExists := ms.counterCache[name]
		if !syncExists {
			var z uint64
			ms.counterCache[name] = &z
		}
		atomic.AddUint64(ms.counterCache[name], amount)
		ms.counterMu.Unlock()
	}
}
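
// For illustration, assuming ms is a running *MetricSystem, each call below
// feeds the running total exported as "requests" and the per-interval rate
// exported as "requests_rate" in processed metrics:
//
//	ms.Counter("requests", 1)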

// Histogram is used for generating rich metrics, such as percentiles, from
// periodically occurring continuous values.
func (ms *MetricSystem) Histogram(name string, value float64) {
	compressedValue := compress(value)
	ms.histogramMu.RLock()
	_, present := ms.histogramCache[name][compressedValue]
	if present {
		atomic.AddUint64(ms.histogramCache[name][compressedValue], 1)
		ms.histogramMu.RUnlock()
	} else {
		ms.histogramMu.RUnlock()
		ms.histogramMu.Lock()
		_, syncPresent := ms.histogramCache[name][compressedValue]
		if !syncPresent {
			var z uint64
			_, mapPresent := ms.histogramCache[name]
			if !mapPresent {
				ms.histogramCache[name] = make(map[int16]*uint64)
			}
			ms.histogramCache[name][compressedValue] = &z
		}
		atomic.AddUint64(ms.histogramCache[name][compressedValue], 1)
		ms.histogramMu.Unlock()
	}
}
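
// As an illustration, ms.Histogram("req_ns", v) yields, once processed,
// "req_ns_sum", "req_ns_count", "req_ns_avg", the aggregate "req_ns_agg_*"
// values, and one metric per configured percentile label, e.g. "req_ns_50",
// "req_ns_99", and "req_ns_max" under the defaults set in NewMetricSystem.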

// RegisterGaugeFunc registers a function to be called at each interval
// whose return value will be used to populate the <name> metric.
func (ms *MetricSystem) RegisterGaugeFunc(name string, f func() float64) {
	ms.gaugeFuncsMu.Lock()
	ms.gaugeFuncs[name] = f
	ms.gaugeFuncsMu.Unlock()
}

// DeregisterGaugeFunc deregisters a function for the <name> metric.
func (ms *MetricSystem) DeregisterGaugeFunc(name string) {
	ms.gaugeFuncsMu.Lock()
	delete(ms.gaugeFuncs, name)
	ms.gaugeFuncsMu.Unlock()
}
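
// A hedged example of registering a custom gauge, where workQueue is a
// hypothetical channel whose depth should be sampled once per interval:
//
//	ms.RegisterGaugeFunc("queue.depth", func() float64 {
//		return float64(len(workQueue))
//	})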

// compress takes a float64 and lossily shrinks it to an int16 to facilitate
// bucketing of histogram values, staying within 1% of the true value. This
// fails for large values of 1e142 and above, and is inaccurate for values
// closer to 0 than +/-0.51, as well as for +/-math.Inf.
func compress(value float64) int16 {
	i := int16(precision*math.Log(1.0+math.Abs(value)) + 0.5)
	if value < 0 {
		return -1 * i
	}
	return i
}

// decompress takes a lossily shrunk int16 and returns a float64 within 1% of
// the original float64 passed to compress.
func decompress(compressedValue int16) float64 {
	f := math.Exp(math.Abs(float64(compressedValue))/precision) - 1.0
	if compressedValue < 0 {
		return -1.0 * f
	}
	return f
}
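
// Worked example: with precision = 100, adjacent buckets are spaced by a
// factor of roughly e^(1/100) ≈ 1.01, so rounding to the nearest bucket keeps
// the reconstructed value within about 1% of the original (for values not too
// close to zero). For instance, compress(1000) = int16(100*ln(1001)+0.5) = 691,
// and decompress(691) ≈ 1001.2, an error of roughly 0.1%.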

// processHistograms derives rich metrics from histograms, currently
// percentiles, sum, count, and mean.
func (ms *MetricSystem) processHistograms(name string,
	valuesToCounts map[int16]*uint64) map[string]float64 {
	output := make(map[string]float64)
	totalSum := float64(0)
	totalCount := uint64(0)
	proportions := make([]proportion, 0, len(valuesToCounts))
	for compressedValue, count := range valuesToCounts {
		value := decompress(compressedValue)
		totalSum += value * float64(*count)
		totalCount += *count
		proportions = append(proportions, proportion{Value: value, Count: *count})
	}

	sumName := fmt.Sprintf("%s_sum", name)
	countName := fmt.Sprintf("%s_count", name)
	avgName := fmt.Sprintf("%s_avg", name)

	// increment interval sum and count
	output[countName] = float64(totalCount)
	output[sumName] = totalSum
	output[avgName] = totalSum / float64(totalCount)

	// increment aggregate sum and count
	ms.histogramCountMu.RLock()
	_, present := ms.histogramCountStore[sumName]
	if !present {
		ms.histogramCountMu.RUnlock()
		ms.histogramCountMu.Lock()
		_, syncPresent := ms.histogramCountStore[sumName]
		if !syncPresent {
			var x uint64
			ms.histogramCountStore[sumName] = &x
			var z uint64
			ms.histogramCountStore[countName] = &z
		}
		ms.histogramCountMu.Unlock()
		ms.histogramCountMu.RLock()
	}
	atomic.AddUint64(ms.histogramCountStore[sumName], uint64(totalSum))
	atomic.AddUint64(ms.histogramCountStore[countName], totalCount)
	ms.histogramCountMu.RUnlock()

	for label, p := range ms.percentiles {
		value, err := percentile(totalCount, proportions, p)
		if err != nil {
			glog.Errorf("unable to calculate percentile: %s", err)
		} else {
			output[fmt.Sprintf(label, name)] = value
		}
	}
	return output
}

// These next 3 methods are for the implementation of sort.Interface

func (s proportionArray) Len() int {
	return len(s)
}

func (s proportionArray) Less(i, j int) bool {
	return s[i].Value < s[j].Value
}

func (s proportionArray) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

// percentile calculates a percentile represented as a float64 between 0 and 1
// inclusive from a proportionArray. totalCount is the sum of all counts of
// elements in the proportionArray.
func percentile(totalCount uint64, proportions proportionArray,
	percentile float64) (float64, error) {
	//TODO(tyler) handle multiple percentiles at once for efficiency
	sort.Sort(proportions)
	sofar := uint64(0)
	for _, proportion := range proportions {
		sofar += proportion.Count
		if float64(sofar)/float64(totalCount) >= percentile {
			return proportion.Value, nil
		}
	}
	return 0, errors.New("Invalid percentile. Should be between 0 and 1.")
}
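
// Illustration: given the proportions {Value: 1, Count: 2}, {Value: 5, Count: 1},
// {Value: 10, Count: 1} and totalCount = 4, percentile(4, proportions, 0.75)
// accumulates counts over the sorted values until 3/4 >= 0.75 and returns 5,
// while a percentile of 1 returns the maximum observed value, 10.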

func (ms *MetricSystem) collectRawMetrics() *RawMetricSet {
	// round the timestamp down to the start of the current interval
	normalizedInterval := time.Unix(0, time.Now().UnixNano()/
		ms.interval.Nanoseconds()*
		ms.interval.Nanoseconds())

	ms.counterMu.Lock()
	freshCounters := ms.counterCache
	ms.counterCache = make(map[string]*uint64)
	ms.counterMu.Unlock()

	rates := make(map[string]uint64)
	for name, count := range freshCounters {
		rates[name] = *count
	}

	counters := make(map[string]uint64)
	ms.counterStoreMu.RLock()
	// update counters
	for name, count := range freshCounters {
		_, exists := ms.counterStore[name]
		// only take a write lock when it's a totally new counter
		if !exists {
			ms.counterStoreMu.RUnlock()
			ms.counterStoreMu.Lock()
			_, syncExists := ms.counterStore[name]
			if !syncExists {
				var z uint64
				ms.counterStore[name] = &z
			}
			ms.counterStoreMu.Unlock()
			ms.counterStoreMu.RLock()
		}
		atomic.AddUint64(ms.counterStore[name], *count)
	}
	// copy counters for export
	for name, count := range ms.counterStore {
		counters[name] = *count
	}
	ms.counterStoreMu.RUnlock()

	ms.histogramMu.Lock()
	histograms := ms.histogramCache
	ms.histogramCache = make(map[string]map[int16]*uint64)
	ms.histogramMu.Unlock()

	ms.gaugeFuncsMu.Lock()
	gauges := make(map[string]float64)
	for name, f := range ms.gaugeFuncs {
		gauges[name] = f()
	}
	ms.gaugeFuncsMu.Unlock()

	return &RawMetricSet{
		Time:       normalizedInterval,
		Counters:   counters,
		Rates:      rates,
		Histograms: histograms,
		Gauges:     gauges,
	}
}

// processMetrics (potentially slowly) creates human consumable metrics from a
// RawMetricSet, deriving rich statistics from histograms such as percentiles.
func (ms *MetricSystem) processMetrics(
	rawMetrics *RawMetricSet) *ProcessedMetricSet {
	metrics := make(map[string]float64)

	for name, count := range rawMetrics.Counters {
		metrics[name] = float64(count)
	}

	for name, count := range rawMetrics.Rates {
		metrics[fmt.Sprintf("%s_rate", name)] = float64(count)
	}

	for name, valuesToCounts := range rawMetrics.Histograms {
		for histoName, histoValue := range ms.processHistograms(name, valuesToCounts) {
			metrics[histoName] = histoValue
		}
	}

	for name, value := range rawMetrics.Gauges {
		metrics[name] = value
	}

	return &ProcessedMetricSet{Time: rawMetrics.Time, Metrics: metrics}
}

func (ms *MetricSystem) updateSubscribers() {
	ms.subscribersMu.Lock()
	defer ms.subscribersMu.Unlock()
	for {
		select {
		case subscriber := <-ms.subscribeToRawMetrics:
			ms.rawSubscribers[subscriber] = struct{}{}
		case unsubscriber := <-ms.unsubscribeFromRawMetrics:
			delete(ms.rawSubscribers, unsubscriber)
		case subscriber := <-ms.subscribeToProcessedMetrics:
			ms.processedSubscribers[subscriber] = struct{}{}
		case unsubscriber := <-ms.unsubscribeFromProcessedMetrics:
			delete(ms.processedSubscribers, unsubscriber)
		default: // no changes in subscribers
			return
		}
	}
}

// reaper wakes up at the end of each interval,
// collects and processes metrics, and pushes
// them to the corresponding subscribing channels.
func (ms *MetricSystem) reaper() {
	ms.reaping = true

	// create goroutine pool to handle multiple processing tasks at once
	processChan := make(chan func(), 16)
	for i := 0; i < int(math.Max(float64(runtime.NumCPU()/4), 4)); i++ {
		go func() {
			for {
				c, ok := <-processChan
				if !ok {
					return
				}
				c()
			}
		}()
	}

	// begin reaper main loop
	for {
		// sleep until the next interval, or die if shutdownChan is closed
		tts := ms.interval.Nanoseconds() -
			(time.Now().UnixNano() % ms.interval.Nanoseconds())
		select {
		case <-time.After(time.Duration(tts)):
		case <-ms.shutdownChan:
			ms.reaping = false
			close(processChan)
			return
		}

		rawMetrics := ms.collectRawMetrics()

		ms.updateSubscribers()

		// broadcast raw metrics
		for subscriber := range ms.rawSubscribers {
			// new subscribers get all counters, otherwise just the new diffs
			select {
			case subscriber <- rawMetrics:
				delete(ms.rawBadSubscribers, subscriber)
			default:
				ms.rawBadSubscribers[subscriber]++
				glog.Error("a raw subscriber has allowed their channel to fill up. ",
					"dropping their metrics on the floor rather than blocking.")
				if ms.rawBadSubscribers[subscriber] >= 2 {
					glog.Error("this raw subscriber has caused dropped metrics at ",
						"least 3 times in a row. closing the channel.")
					delete(ms.rawSubscribers, subscriber)
					close(subscriber)
				}
			}
		}

		// Perform the rest in another goroutine since processing is not
		// guaranteed to complete before the interval is up.
		sendProcessed := func() {
			// this is potentially expensive if there is a massive number of metrics
			processedMetrics := ms.processMetrics(rawMetrics)

			// add aggregate mean
			for name := range rawMetrics.Histograms {
				ms.histogramCountMu.RLock()
				aggCountPtr, countPresent :=
					ms.histogramCountStore[fmt.Sprintf("%s_count", name)]
				aggCount := atomic.LoadUint64(aggCountPtr)
				aggSumPtr, sumPresent :=
					ms.histogramCountStore[fmt.Sprintf("%s_sum", name)]
				aggSum := atomic.LoadUint64(aggSumPtr)
				ms.histogramCountMu.RUnlock()

				if countPresent && sumPresent && aggCount > 0 {
					processedMetrics.Metrics[fmt.Sprintf("%s_agg_avg", name)] =
						float64(aggSum / aggCount)
					processedMetrics.Metrics[fmt.Sprintf("%s_agg_count", name)] =
						float64(aggCount)
					processedMetrics.Metrics[fmt.Sprintf("%s_agg_sum", name)] =
						float64(aggSum)
				}
			}

			// broadcast processed metrics
			ms.subscribersMu.Lock()
			for subscriber := range ms.processedSubscribers {
				select {
				case subscriber <- processedMetrics:
					delete(ms.processedBadSubscribers, subscriber)
				default:
					ms.processedBadSubscribers[subscriber]++
					glog.Error("a subscriber has allowed their channel to fill up. ",
						"dropping their metrics on the floor rather than blocking.")
					if ms.processedBadSubscribers[subscriber] >= 2 {
						glog.Error("this subscriber has caused dropped metrics at ",
							"least 3 times in a row. closing the channel.")
						delete(ms.processedSubscribers, subscriber)
						close(subscriber)
					}
				}
			}
			ms.subscribersMu.Unlock()
		}
		select {
		case processChan <- sendProcessed:
		default:
			// processChan has filled up, this metric load is not sustainable
			glog.Errorf("processing of metrics is taking longer than this node can "+
				"handle. dropping this entire interval of %s metrics on the "+
				"floor rather than blocking the reaper.", rawMetrics.Time)
		}
	} // end main reaper loop
}

// Start spawns a reaper goroutine that harvests and broadcasts metrics to
// subscribers at the end of each interval, if one is not already running.
func (ms *MetricSystem) Start() {
	if !ms.reaping {
		go ms.reaper()
	}
}

// Stop shuts down a MetricSystem
func (ms *MetricSystem) Stop() {
	close(ms.shutdownChan)
}