From 7bca757d09954bf676839ff030cbc90808a0ed29 Mon Sep 17 00:00:00 2001
From: Michal Witkowski
Date: Wed, 17 Jun 2015 14:32:13 +0100
Subject: [PATCH] *: add metrics to `store` and `proxy`.

---
 Documentation/metrics.md | 71 ++++++++++++++++++++++
 etcdmain/etcd.go         |  6 +-
 proxy/metrics.go         | 86 ++++++++++++++++++++++++++
 proxy/reverse.go         |  8 ++-
 store/metrics.go         | 128 +++++++++++++++++++++++++++++++++++++++
 store/store.go           | 30 +++++++++
 store/watcher_hub.go     |  4 ++
 7 files changed, 331 insertions(+), 2 deletions(-)
 create mode 100644 proxy/metrics.go
 create mode 100644 store/metrics.go

diff --git a/Documentation/metrics.md b/Documentation/metrics.md
index d077998ff..26e9c0152 100644
--- a/Documentation/metrics.md
+++ b/Documentation/metrics.md
@@ -30,6 +30,45 @@ Pending proposal (`pending_proposal_total`) gives you an idea about how many pro
 
 Failed proposals (`proposal_failed_total`) are normally related to two issues: temporary failures related to a leader election or longer duration downtime caused by a loss of quorum in the cluster.
 
+
+### store
+
+These metrics describe accesses to the data store of the etcd members in the cluster. They are useful for
+counting what kinds of actions users take, for checking whether all etcd members "see" the same set of data
+mutations, and for seeing whether reads and watches (which are local) are equally distributed.
+
+All these metrics are prefixed with `etcd_store_`.
+
+| Name                 | Description                                                                        | Type            |
+|----------------------|------------------------------------------------------------------------------------|-----------------|
+| reads_total          | Total number of reads from store, should differ among etcd members (local reads).  | Counter(action) |
+| writes_total         | Total number of writes to store, should be the same among all etcd members.        | Counter(action) |
+| reads_failed_total   | Number of failed reads from store (e.g. key missing) on local reads.               | Counter(action) |
+| writes_failed_total  | Number of failed writes to store (e.g. failed compare and swap).                   | Counter(action) |
+| expires_total        | Total number of expired keys (due to TTL).                                          | Counter         |
+| watch_requests_total | Total number of incoming watch requests to this etcd member (local watches).       | Counter         |
+| watchers             | Current count of active watchers on this etcd member.                              | Gauge           |
+
+`reads_total` and `writes_total` count both successful and failed requests; `reads_failed_total` and
+`writes_failed_total` count only the failed ones. A high rate of failed writes indicates possible contention on
+keys (e.g. when doing `compareAndSwap`), and read failures indicate that some clients try to access keys that
+don't exist.
+
+Example Prometheus queries that may be useful from these metrics (across all etcd members):
+
+ * `sum(rate(etcd_store_reads_total{job="etcd"}[1m])) by (action)`
+   `max(rate(etcd_store_writes_total{job="etcd"}[1m])) by (action)`
+
+   Rate of reads and writes by action, across all servers, over a `1m` window. `max` is used for writes
+   (as opposed to `sum` for reads) because every etcd node in the cluster applies all writes to its store.
+ * `sum(rate(etcd_store_watch_requests_total{job="etcd"}[1m]))`
+
+   Rate of new watch requests per second. Likely driven by how often watched keys change.
+ * `sum(etcd_store_watchers{job="etcd"})`
+
+   Number of active watchers across all etcd servers.
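+ * `sum(rate(etcd_store_writes_failed_total{job="etcd"}[1m])) by (action)`
+
+   Rate of failed writes by action, following the same `job="etcd"` labeling as the queries above. A sustained
+   non-zero rate for `compareAndSwap` usually points at contention between clients writing the same keys.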
+
+
 ### wal
 
 | Name                                | Description                                      | Type    |
@@ -47,6 +86,7 @@ Abnormally high fsync duration (`fsync_durations_microseconds`) indicates disk i
 
 Abnormally high snapshot duration (`snapshot_save_total_durations_microseconds`) indicates disk issues and might cause the cluster to be unstable.
 
+
 ### rafthttp
 
 | Name | Description | Type | Labels |
@@ -64,3 +104,34 @@ Label `sendingType` is the connection type to send messages. `message`, `msgapp`
 Label `msgType` is the type of raft message. `MsgApp` is log replication message; `MsgSnap` is snapshot install message; `MsgProp` is proposal forward message; the others are used to maintain raft internal status. If you have a large snapshot, you would expect a long msgSnap sending latency. For other types of messages, you would expect low latency, which is comparable to your ping latency if you have enough network bandwidth.
 
 Label `remoteID` is the member ID of the message destination.
+
+
+### proxy
+
+etcd members operating in proxy mode do not perform store operations. They forward all requests to cluster
+members.
+
+Tracking the rate of requests coming from a proxy allows one to pin down which machine is performing most reads/writes.
+
+All these metrics are prefixed with `etcd_proxy_`.
+
+| Name                      | Description                                                                        | Type                           |
+|---------------------------|------------------------------------------------------------------------------------|--------------------------------|
+| requests_total            | Total number of requests received by this proxy instance.                          | Counter(method)                |
+| handled_total             | Total number of fully handled requests, with responses from etcd members.          | Counter(method,code)           |
+| dropped_total             | Total number of dropped requests due to forwarding errors to etcd members.         | Counter(method,proxying_error) |
+| handling_duration_seconds | Bucketed handling times by HTTP method, including round trip to member instances.  | Histogram(method)              |
+
+Example Prometheus queries that may be useful from these metrics (across all etcd servers):
+
+ * `sum(rate(etcd_proxy_handled_total{job="etcd"}[1m])) by (method)`
+
+   Rate of requests (by HTTP method) handled by all proxies, across a window of `1m`.
+ * `histogram_quantile(0.9, sum(increase(etcd_proxy_handling_duration_seconds_bucket{job="etcd",method="GET"}[5m])) by (le))`
+   `histogram_quantile(0.9, sum(increase(etcd_proxy_handling_duration_seconds_bucket{job="etcd",method!="GET"}[5m])) by (le))`
+
+   The 90th percentile latency (in seconds) of handling user requests across all proxy machines, over a window of `5m`.
+ * `sum(rate(etcd_proxy_dropped_total{job="etcd"}[1m])) by (proxying_error)`
+
+   Rate of failed requests on the proxy, by error type. This should be 0; spikes here indicate connectivity issues to the etcd cluster.
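+ * `sum(rate(etcd_proxy_requests_total{job="etcd"}[1m])) by (method)`
+
+   Rate of incoming requests by HTTP method, before forwarding (same `job="etcd"` labeling as above). A
+   persistent gap between this and the `handled_total` rate indicates requests being dropped or stalled.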
\ No newline at end of file
diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go
index e8fcd7dd7..adf87c828 100644
--- a/etcdmain/etcd.go
+++ b/etcdmain/etcd.go
@@ -39,6 +39,7 @@ import (
 	"github.com/coreos/etcd/rafthttp"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
 )
 
 type dirType string
@@ -342,7 +343,10 @@ func startProxy(cfg *config) error {
 		host := u.Host
 		go func() {
 			plog.Info("proxy: listening for client requests on ", host)
-			plog.Fatal(http.Serve(l, ph))
+			mux := http.NewServeMux()
+			mux.Handle("/metrics", prometheus.Handler())
+			mux.Handle("/", ph)
+			plog.Fatal(http.Serve(l, mux))
 		}()
 	}
 	return nil
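For reference, the `/metrics` wiring added to `startProxy` above is the standard client_golang pattern: route `/metrics` to the Prometheus handler and everything else to the real handler. A minimal standalone sketch of the same pattern (not part of the patch; the un-vendored import path, the dummy handler, and the `127.0.0.1:8080` address are placeholders):

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	mux := http.NewServeMux()
	// All collectors registered via prometheus.MustRegister are exported here.
	mux.Handle("/metrics", prometheus.Handler())
	// Everything else goes to the application handler.
	mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("ok\n"))
	}))
	// Placeholder address; etcd derives the real one from its listen URLs.
	http.ListenAndServe("127.0.0.1:8080", mux)
}
```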
diff --git a/proxy/metrics.go b/proxy/metrics.go
new file mode 100644
index 000000000..fee57c833
--- /dev/null
+++ b/proxy/metrics.go
@@ -0,0 +1,86 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package proxy
+
+import (
+	"net/http"
+	"strconv"
+	"time"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
+)
+
+var (
+	requestsIncoming = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "etcd",
+			Subsystem: "proxy",
+			Name:      "requests_total",
+			Help:      "Counter of incoming requests by HTTP method.",
+		}, []string{"method"})
+
+	requestsHandled = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "etcd",
+			Subsystem: "proxy",
+			Name:      "handled_total",
+			Help:      "Counter of requests fully handled (by authoritative servers).",
+		}, []string{"method", "code"})
+
+	requestsDropped = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "etcd",
+			Subsystem: "proxy",
+			Name:      "dropped_total",
+			Help:      "Counter of requests dropped on the proxy.",
+		}, []string{"method", "proxying_error"})
+
+	requestsHandlingTime = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Namespace: "etcd",
+			Subsystem: "proxy",
+			Name:      "handling_duration_seconds",
+			Help: "Bucketed histogram of handling time of successful events (non-watches), by method " +
+				"(GET/PUT etc.).",
+			Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
+		}, []string{"method"})
+)
+
+type forwardingError string
+
+const (
+	zeroEndpoints         forwardingError = "zero_endpoints"
+	failedSendingRequest  forwardingError = "failed_sending_request"
+	failedGettingResponse forwardingError = "failed_getting_response"
+)
+
+func init() {
+	prometheus.MustRegister(requestsIncoming)
+	prometheus.MustRegister(requestsHandled)
+	prometheus.MustRegister(requestsDropped)
+	prometheus.MustRegister(requestsHandlingTime)
+}
+
+func reportIncomingRequest(request *http.Request) {
+	requestsIncoming.WithLabelValues(request.Method).Inc()
+}
+
+func reportRequestHandled(request *http.Request, response *http.Response, startTime time.Time) {
+	method := request.Method
+	requestsHandled.WithLabelValues(method, strconv.Itoa(response.StatusCode)).Inc()
+	requestsHandlingTime.WithLabelValues(method).Observe(time.Since(startTime).Seconds())
+}
+
+func reportRequestDropped(request *http.Request, err forwardingError) {
+	requestsDropped.WithLabelValues(request.Method, string(err)).Inc()
+}
diff --git a/proxy/reverse.go b/proxy/reverse.go
index 0d6822e52..00f194ba5 100644
--- a/proxy/reverse.go
+++ b/proxy/reverse.go
@@ -27,6 +27,7 @@ import (
 	"sync/atomic"
+	"time"
 
 	"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
 )
 
 // Hop-by-hop headers. These are removed when sent to the backend.
@@ -57,6 +58,7 @@ type reverseProxy struct {
 func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request) {
 	proxyreq := new(http.Request)
 	*proxyreq = *clientreq
+	startTime := time.Now()
+	reportIncomingRequest(clientreq)
 
 	var (
 		proxybody []byte
@@ -84,6 +86,8 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
 	endpoints := p.director.endpoints()
 	if len(endpoints) == 0 {
 		msg := "proxy: zero endpoints currently available"
+		reportRequestDropped(clientreq, zeroEndpoints)
+
 		// TODO: limit the rate of the error logging.
 		log.Printf(msg)
 		e := httptypes.NewHTTPError(http.StatusServiceUnavailable, msg)
@@ -127,6 +131,7 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
 			return
 		}
 		if err != nil {
+			reportRequestDropped(clientreq, failedSendingRequest)
 			log.Printf("proxy: failed to direct request to %s: %v", ep.URL.String(), err)
 			ep.Failed()
 			continue
@@ -138,6 +143,7 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
 	if res == nil {
 		// TODO: limit the rate of the error logging.
 		msg := fmt.Sprintf("proxy: unable to get response from %d endpoint(s)", len(endpoints))
+		reportRequestDropped(clientreq, failedGettingResponse)
 		log.Printf(msg)
 		e := httptypes.NewHTTPError(http.StatusBadGateway, msg)
 		e.WriteTo(rw)
@@ -145,7 +151,7 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
 	}
 	defer res.Body.Close()
 
-
+	reportRequestHandled(clientreq, res, startTime)
 	removeSingleHopHeaders(&res.Header)
 	copyHeader(rw.Header(), res.Header)
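Condensed, the metrics flow in `ServeHTTP` above reduces to the shape below. This is a sketch only, assuming the helpers from `proxy/metrics.go`; `forward` and `handleSketch` are hypothetical names, and the real proxy additionally does endpoint selection and retries:

```go
package proxy

import (
	"net/http"
	"time"
)

// forward is a hypothetical stand-in for the endpoint round trip that
// reverse.go performs against cluster members.
func forward(req *http.Request) (*http.Response, error) {
	return http.DefaultTransport.RoundTrip(req)
}

// handleSketch condenses the metrics flow of reverseProxy.ServeHTTP.
func handleSketch(rw http.ResponseWriter, req *http.Request) {
	startTime := time.Now()
	reportIncomingRequest(req) // etcd_proxy_requests_total{method=...}

	res, err := forward(req)
	if err != nil {
		// Each terminal failure path picks one forwardingError value, so
		// dropped_total partitions cleanly by (method, proxying_error).
		reportRequestDropped(req, failedSendingRequest)
		rw.WriteHeader(http.StatusBadGateway)
		return
	}
	defer res.Body.Close()

	// Success: count by (method, code) and observe the handling latency
	// in handling_duration_seconds.
	reportRequestHandled(req, res, startTime)
	rw.WriteHeader(res.StatusCode)
}
```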
diff --git a/store/metrics.go b/store/metrics.go
new file mode 100644
index 000000000..91bc803f9
--- /dev/null
+++ b/store/metrics.go
@@ -0,0 +1,128 @@
+// Copyright 2015 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package store
+
+import (
+	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
+)
+
+// Set of raw Prometheus metrics.
+// Labels
+// * action = declared in event.go
+// Do not increment directly, use the report* methods.
+var (
+	readCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "etcd",
+			Subsystem: "store",
+			Name:      "reads_total",
+			Help:      "Total number of read actions (get/getRecursive), local to this member.",
+		}, []string{"action"})
+
+	writeCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "etcd",
+			Subsystem: "store",
+			Name:      "writes_total",
+			Help:      "Total number of writes (e.g. set/compareAndDelete) seen by this member.",
+		}, []string{"action"})
+
+	readFailedCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "etcd",
+			Subsystem: "store",
+			Name:      "reads_failed_total",
+			Help:      "Failed read actions (get/getRecursive), local to this member.",
+		}, []string{"action"})
+
+	writeFailedCounter = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Namespace: "etcd",
+			Subsystem: "store",
+			Name:      "writes_failed_total",
+			Help:      "Failed write actions (e.g. set/compareAndDelete), seen by this member.",
+		}, []string{"action"})
+
+	expireCounter = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: "etcd",
+			Subsystem: "store",
+			Name:      "expires_total",
+			Help:      "Total number of expired keys.",
+		})
+
+	watchRequests = prometheus.NewCounter(
+		prometheus.CounterOpts{
+			Namespace: "etcd",
+			Subsystem: "store",
+			Name:      "watch_requests_total",
+			Help:      "Total number of incoming watch requests (new or reestablished).",
+		})
+
+	watcherCount = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Namespace: "etcd",
+			Subsystem: "store",
+			Name:      "watchers",
+			Help:      "Count of currently active watchers.",
+		})
+)
+
+const (
+	GetRecursive = "getRecursive"
+)
+
+func init() {
+	prometheus.MustRegister(readCounter)
+	prometheus.MustRegister(writeCounter)
+	prometheus.MustRegister(readFailedCounter)
+	prometheus.MustRegister(writeFailedCounter)
+	prometheus.MustRegister(expireCounter)
+	prometheus.MustRegister(watchRequests)
+	prometheus.MustRegister(watcherCount)
+}
+
+func reportReadSuccess(readAction string) {
+	readCounter.WithLabelValues(readAction).Inc()
+}
+
+func reportReadFailure(readAction string) {
+	readCounter.WithLabelValues(readAction).Inc()
+	readFailedCounter.WithLabelValues(readAction).Inc()
+}
+
+func reportWriteSuccess(writeAction string) {
+	writeCounter.WithLabelValues(writeAction).Inc()
+}
+
+func reportWriteFailure(writeAction string) {
+	writeCounter.WithLabelValues(writeAction).Inc()
+	writeFailedCounter.WithLabelValues(writeAction).Inc()
+}
+
+func reportExpiredKey() {
+	expireCounter.Inc()
+}
+
+func reportWatchRequest() {
+	watchRequests.Inc()
+}
+
+func reportWatcherAdded() {
+	watcherCount.Inc()
+}
+
+func reportWatcherRemoved() {
+	watcherCount.Dec()
+}
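The paired counters above can be sanity-checked through the client library's protobuf snapshots. A hypothetical test sketch (not part of the patch; the vendored `client_model` import path is an assumption):

```go
package store

import (
	"testing"

	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
	dto "github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_model/go"
)

// value reads a counter's current value via its protobuf snapshot.
func value(c prometheus.Counter) float64 {
	m := &dto.Metric{}
	c.Write(m)
	return m.GetCounter().GetValue()
}

func TestWriteFailureCountsTwice(t *testing.T) {
	before := value(writeCounter.WithLabelValues(CompareAndSwap))
	beforeFailed := value(writeFailedCounter.WithLabelValues(CompareAndSwap))

	reportWriteFailure(CompareAndSwap)

	// A failure must bump the total counter and the failed counter.
	if got := value(writeCounter.WithLabelValues(CompareAndSwap)); got != before+1 {
		t.Errorf("writes_total = %v, want %v", got, before+1)
	}
	if got := value(writeFailedCounter.WithLabelValues(CompareAndSwap)); got != beforeFailed+1 {
		t.Errorf("writes_failed_total = %v, want %v", got, beforeFailed+1)
	}
}
```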
diff --git a/store/store.go b/store/store.go
index 54dd0f1a1..4e74e24f1 100644
--- a/store/store.go
+++ b/store/store.go
@@ -121,6 +121,11 @@ func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
 
 	if err != nil {
 		s.Stats.Inc(GetFail)
+		if recursive {
+			reportReadFailure(GetRecursive)
+		} else {
+			reportReadFailure(Get)
+		}
 		return nil, err
 	}
 
@@ -129,6 +134,11 @@ func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
 	e.Node.loadInternalNode(n, recursive, sorted, s.clock)
 
 	s.Stats.Inc(GetSuccess)
+	if recursive {
+		reportReadSuccess(GetRecursive)
+	} else {
+		reportReadSuccess(Get)
+	}
 
 	return e, nil
 }
@@ -145,8 +155,10 @@ func (s *store) Create(nodePath string, dir bool, value string, unique bool, exp
 		e.EtcdIndex = s.CurrentIndex
 		s.WatcherHub.notify(e)
 		s.Stats.Inc(CreateSuccess)
+		reportWriteSuccess(Create)
 	} else {
 		s.Stats.Inc(CreateFail)
+		reportWriteFailure(Create)
 	}
 
 	return e, err
@@ -162,8 +174,10 @@ func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Tim
 	defer func() {
 		if err == nil {
 			s.Stats.Inc(SetSuccess)
+			reportWriteSuccess(Set)
 		} else {
 			s.Stats.Inc(SetFail)
+			reportWriteFailure(Set)
 		}
 	}()
 
@@ -221,11 +235,13 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
 
 	if err != nil {
 		s.Stats.Inc(CompareAndSwapFail)
+		reportWriteFailure(CompareAndSwap)
 		return nil, err
 	}
 
 	if n.IsDir() { // can only compare and swap file
 		s.Stats.Inc(CompareAndSwapFail)
+		reportWriteFailure(CompareAndSwap)
 		return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
 	}
 
@@ -234,6 +250,7 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
 	if ok, which := n.Compare(prevValue, prevIndex); !ok {
 		cause := getCompareFailCause(n, which, prevValue, prevIndex)
 		s.Stats.Inc(CompareAndSwapFail)
+		reportWriteFailure(CompareAndSwap)
 		return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
 	}
 
@@ -256,6 +273,7 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
 	s.WatcherHub.notify(e)
 
 	s.Stats.Inc(CompareAndSwapSuccess)
+	reportWriteSuccess(CompareAndSwap)
 
 	return e, nil
 }
@@ -281,6 +299,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
 	if err != nil { // if the node does not exist, return error
 		s.Stats.Inc(DeleteFail)
+		reportWriteFailure(Delete)
 		return nil, err
 	}
 
@@ -303,6 +322,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
 
 	if err != nil {
 		s.Stats.Inc(DeleteFail)
+		reportWriteFailure(Delete)
 		return nil, err
 	}
 
@@ -312,6 +332,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
 	s.WatcherHub.notify(e)
 
 	s.Stats.Inc(DeleteSuccess)
+	reportWriteSuccess(Delete)
 
 	return e, nil
 }
@@ -326,11 +347,13 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
 
 	if err != nil { // if the node does not exist, return error
 		s.Stats.Inc(CompareAndDeleteFail)
+		reportWriteFailure(CompareAndDelete)
 		return nil, err
 	}
 
 	if n.IsDir() { // can only compare and delete file
 		s.Stats.Inc(CompareAndSwapFail)
+		reportWriteFailure(CompareAndDelete)
 		return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
 	}
 
@@ -339,6 +362,7 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
 	if ok, which := n.Compare(prevValue, prevIndex); !ok {
 		cause := getCompareFailCause(n, which, prevValue, prevIndex)
 		s.Stats.Inc(CompareAndDeleteFail)
+		reportWriteFailure(CompareAndDelete)
 		return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
 	}
 
@@ -361,6 +385,7 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
 	s.WatcherHub.notify(e)
 
 	s.Stats.Inc(CompareAndDeleteSuccess)
+	reportWriteSuccess(CompareAndDelete)
 
 	return e, nil
 }
@@ -423,6 +448,7 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
 	if err != nil { // if the node does not exist, return error
 		s.Stats.Inc(UpdateFail)
+		reportWriteFailure(Update)
 		return nil, err
 	}
 
@@ -434,6 +460,7 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
 	if n.IsDir() && len(newValue) != 0 {
 		// if the node is a directory, we cannot update value to non-empty
 		s.Stats.Inc(UpdateFail)
+		reportWriteFailure(Update)
 		return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
 	}
 
@@ -455,6 +482,7 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
 	s.WatcherHub.notify(e)
 
 	s.Stats.Inc(UpdateSuccess)
+	reportWriteSuccess(Update)
 
 	s.CurrentIndex = nextIndex
 
@@ -490,6 +518,7 @@ func (s *store) internalCreate(nodePath string, dir bool, value string, unique,
 
 	if err != nil {
 		s.Stats.Inc(SetFail)
+		reportWriteFailure(action)
 		err.Index = currIndex
 		return nil, err
 	}
@@ -592,6 +621,7 @@ func (s *store) DeleteExpiredKeys(cutoff time.Time) {
 
 		s.ttlKeyHeap.pop()
 		node.Remove(true, true, callback)
+		reportExpiredKey()
 
 		s.Stats.Inc(ExpireCount)
 		s.WatcherHub.notify(e)
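Every mutation path in `store.go` above repeats the same two calls on each exit path: bump the legacy `Stats` counter and call the matching `report*` helper. The resulting counting contract can be summarized in one hypothetical helper (a sketch, not proposed code):

```go
// instrument is a hypothetical helper illustrating the contract used in
// store.go: writes_total counts every attempt, writes_failed_total only
// the failures.
func instrument(action string, op func() error) error {
	if err := op(); err != nil {
		reportWriteFailure(action) // increments writes_total and writes_failed_total
		return err
	}
	reportWriteSuccess(action) // increments writes_total only
	return nil
}
```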
diff --git a/store/watcher_hub.go b/store/watcher_hub.go
index 0a7fa3c65..d27ace06b 100644
--- a/store/watcher_hub.go
+++ b/store/watcher_hub.go
@@ -53,6 +53,7 @@ func newWatchHub(capacity int) *watcherHub {
 // If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
 // If index is zero, watch will start from the current index + 1.
 func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeIndex uint64) (Watcher, *etcdErr.Error) {
+	reportWatchRequest()
 	event, err := wh.EventHistory.scan(key, recursive, index)
 
 	if err != nil {
@@ -97,12 +98,14 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeInde
 		w.removed = true
 		l.Remove(elem)
 		atomic.AddInt64(&wh.count, -1)
+		reportWatcherRemoved()
 		if l.Len() == 0 {
 			delete(wh.watchers, key)
 		}
 	}
 
 	atomic.AddInt64(&wh.count, 1)
+	reportWatcherAdded()
 
 	return w, nil
 }
@@ -148,6 +151,7 @@ func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
 				w.removed = true
 				l.Remove(curr)
 				atomic.AddInt64(&wh.count, -1)
+				reportWatcherRemoved()
 			}
 		}