mirror of
https://github.com/etcd-io/etcd.git
synced 2024-09-27 06:25:44 +00:00
Merge pull request #3001 from mwitkow-io/feature/rich_metrics
Etcd Rich Metrics
This commit is contained in:
commit
8ab388fa56
@ -30,6 +30,45 @@ Pending proposal (`pending_proposal_total`) gives you an idea about how many pro
|
||||
|
||||
Failed proposals (`proposal_failed_total`) are normally related to two issues: temporary failures related to a leader election or longer duration downtime caused by a loss of quorum in the cluster.
|
||||
|
||||
|
||||
### store
|
||||
|
||||
These metrics describe the accesses into the data store of etcd members that exist in the cluster. They
|
||||
are useful to count what kind of actions are taken by users. It is also useful to see and whether all etcd members
|
||||
"see" the same set of data mutations, and whether reads and watches (which are local) are equally distributed.
|
||||
|
||||
All these metrics are prefixed with `etcd_store_`.
|
||||
|
||||
| Name | Description | Type |
|
||||
|---------------------------|------------------------------------------------------------------------------------------|--------------------|
|
||||
| reads_total | Total number of reads from store, should differ among etcd members (local reads). | Counter(action) |
|
||||
| writes_total | Total number of writes to store, should be same among all etcd members. | Counter(action) |
|
||||
| reads_failed_total | Number of failed reads from store (e.g. key missing) on local reads. | Counter(action) |
|
||||
| writes_failed_total | Number of failed writes to store (e.g. failed compare and swap). | Counter(action) |
|
||||
| expires_total | Total number of expired keys (due to TTL). | Counter |
|
||||
| watch_requests_totals | Total number of incoming watch requests to this etcd member (local watches). | Counter |
|
||||
| watchers | Current count of active watchers on this etcd member. | Gauge |
|
||||
|
||||
Both `reads_total` and `writes_total` count both successful and failed requests. `reads_failed_total` and
|
||||
`writes_failed_total` count failed requests. A lot of failed writes indicate possible contentions on keys (e.g. when
|
||||
doing `compareAndSet`), and read failures indicate that some clients try to access keys that don't exist.
|
||||
|
||||
Example Prometheus queries that may be useful from these metrics (across all etcd members):
|
||||
|
||||
* `sum(rate(etcd_store_reads_total{job="etcd"}[1m])) by (action)`
|
||||
`max(rate(etcd_store_writes_total{job="etcd"}[1m])) by (action)`
|
||||
|
||||
Rate of reads and writes by action, across all servers across a time window of `1m`. The reason why `max` is used
|
||||
for writes as opposed to `sum` for reads is because all of etcd nodes in the cluster apply all writes to their stores.
|
||||
Shows the rate of successfull readonly/write queries across all servers, across a time window of `1m`.
|
||||
* `sum(rate(etcd_store_watch_requests_total{job="etcd"}[1m]))`
|
||||
|
||||
Shows rate of new watch requests per second. Likely driven by how often watched keys change.
|
||||
* `sum(etcd_store_watchers{job="etcd"})`
|
||||
|
||||
Number of active watchers across all etcd servers.
|
||||
|
||||
|
||||
### wal
|
||||
|
||||
| Name | Description | Type |
|
||||
@ -47,6 +86,7 @@ Abnormally high fsync duration (`fsync_durations_microseconds`) indicates disk i
|
||||
|
||||
Abnormally high snapshot duration (`snapshot_save_total_durations_microseconds`) indicates disk issues and might cause the cluster to be unstable.
|
||||
|
||||
|
||||
### rafthttp
|
||||
|
||||
| Name | Description | Type | Labels |
|
||||
@ -64,3 +104,34 @@ Label `sendingType` is the connection type to send messages. `message`, `msgapp`
|
||||
Label `msgType` is the type of raft message. `MsgApp` is log replication message; `MsgSnap` is snapshot install message; `MsgProp` is proposal forward message; the others are used to maintain raft internal status. If you have a large snapshot, you would expect a long msgSnap sending latency. For other types of messages, you would expect low latency, which is comparable to your ping latency if you have enough network bandwidth.
|
||||
|
||||
Label `remoteID` is the member ID of the message destination.
|
||||
|
||||
|
||||
### proxy
|
||||
|
||||
etcd members operating in proxy mode do not do store operations. They forward all requests
|
||||
to cluster instances.
|
||||
|
||||
Tracking the rate of requests coming from a proxy allows one to pin down which machine is performing most reads/writes.
|
||||
|
||||
All these metrics are prefixed with `etcd_proxy_`
|
||||
|
||||
| Name | Description | Type |
|
||||
|---------------------------|-----------------------------------------------------------------------------------------|--------------------|
|
||||
| requests_total | Total number of requests by this proxy instance. . | Counter(method) |
|
||||
| handled_total | Total number of fully handled requests, with responses from etcd members. | Counter(method) |
|
||||
| dropped_total | Total number of dropped requests due to forwarding errors to etcd members. | Counter(method,error) |
|
||||
| handling_duration_seconds | Bucketed handling times by HTTP method, including round trip to member instances. | Histogram(method) |
|
||||
|
||||
Example Prometheus queries that may be useful from these metrics (across all etcd servers):
|
||||
|
||||
* `sum(rate(etcd_proxy_handled_total{job="etcd"}[1m])) by (method)`
|
||||
|
||||
Rate of requests (by HTTP method) handled by all proxies, across a window of `1m`.
|
||||
* `histogram_quantile(0.9, sum(increase(etcd_proxy_events_handling_time_seconds_bucket{job="etcd",method="GET"}[5m])) by (le))`
|
||||
`histogram_quantile(0.9, sum(increase(etcd_proxy_events_handling_time_seconds_bucket{job="etcd",method!="GET"}[5m])) by (le))`
|
||||
|
||||
Show the 0.90-tile latency (in seconds) of handling of user requestsacross all proxy machines, with a window of `5m`.
|
||||
* `sum(rate(etcd_proxy_dropped_total{job="etcd"}[1m])) by (proxying_error)`
|
||||
|
||||
Number of failed request on the proxy. This should be 0, spikes here indicate connectivity issues to etcd cluster.
|
||||
|
@ -39,6 +39,7 @@ import (
|
||||
"github.com/coreos/etcd/rafthttp"
|
||||
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type dirType string
|
||||
@ -342,7 +343,10 @@ func startProxy(cfg *config) error {
|
||||
host := u.Host
|
||||
go func() {
|
||||
plog.Info("proxy: listening for client requests on ", host)
|
||||
plog.Fatal(http.Serve(l, ph))
|
||||
mux := http.NewServeMux()
|
||||
mux.Handle("/metrics", prometheus.Handler())
|
||||
mux.Handle("/", ph)
|
||||
plog.Fatal(http.Serve(l, mux))
|
||||
}()
|
||||
}
|
||||
return nil
|
||||
|
86
proxy/metrics.go
Normal file
86
proxy/metrics.go
Normal file
@ -0,0 +1,86 @@
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
requestsIncoming = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "proxy",
|
||||
Name: "requests_total",
|
||||
Help: "Counter requests incoming by method.",
|
||||
}, []string{"method"})
|
||||
|
||||
requestsHandled = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "proxy",
|
||||
Name: "handled_total",
|
||||
Help: "Counter of requests fully handled (by authoratitave servers)",
|
||||
}, []string{"method", "code"})
|
||||
|
||||
requestsDropped = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "proxy",
|
||||
Name: "dropped_total",
|
||||
Help: "Counter of requests dropped on the proxy.",
|
||||
}, []string{"method", "proxying_error"})
|
||||
|
||||
requestsHandlingTime = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "proxy",
|
||||
Name: "handling_duration_seconds",
|
||||
Help: "Bucketed histogram of handling time of successful events (non-watches), by method " +
|
||||
"(GET/PUT etc.).",
|
||||
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
|
||||
}, []string{"method"})
|
||||
)
|
||||
|
||||
type forwardingError string
|
||||
|
||||
const (
|
||||
zeroEndpoints forwardingError = "zero_endpoints"
|
||||
failedSendingRequest forwardingError = "failed_sending_request"
|
||||
failedGettingResponse forwardingError = "failed_getting_response"
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(requestsIncoming)
|
||||
prometheus.MustRegister(requestsHandled)
|
||||
prometheus.MustRegister(requestsDropped)
|
||||
prometheus.MustRegister(requestsHandlingTime)
|
||||
}
|
||||
|
||||
func reportIncomingRequest(request *http.Request) {
|
||||
requestsIncoming.WithLabelValues(request.Method).Inc()
|
||||
}
|
||||
|
||||
func reportRequestHandled(request *http.Request, response *http.Response, startTime time.Time) {
|
||||
method := request.Method
|
||||
requestsHandled.WithLabelValues(method, strconv.Itoa(response.StatusCode)).Inc()
|
||||
requestsHandlingTime.WithLabelValues(method).Observe(time.Since(startTime).Seconds())
|
||||
}
|
||||
|
||||
func reportRequestDropped(request *http.Request, err forwardingError) {
|
||||
requestsDropped.WithLabelValues(request.Method, string(err)).Inc()
|
||||
}
|
@ -27,6 +27,7 @@ import (
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Hop-by-hop headers. These are removed when sent to the backend.
|
||||
@ -57,6 +58,7 @@ type reverseProxy struct {
|
||||
func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request) {
|
||||
proxyreq := new(http.Request)
|
||||
*proxyreq = *clientreq
|
||||
startTime := time.Now()
|
||||
|
||||
var (
|
||||
proxybody []byte
|
||||
@ -84,6 +86,8 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
|
||||
endpoints := p.director.endpoints()
|
||||
if len(endpoints) == 0 {
|
||||
msg := "proxy: zero endpoints currently available"
|
||||
reportRequestDropped(clientreq, zeroEndpoints)
|
||||
|
||||
// TODO: limit the rate of the error logging.
|
||||
log.Printf(msg)
|
||||
e := httptypes.NewHTTPError(http.StatusServiceUnavailable, msg)
|
||||
@ -127,6 +131,7 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
reportRequestDropped(clientreq, failedSendingRequest)
|
||||
log.Printf("proxy: failed to direct request to %s: %v", ep.URL.String(), err)
|
||||
ep.Failed()
|
||||
continue
|
||||
@ -138,6 +143,7 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
|
||||
if res == nil {
|
||||
// TODO: limit the rate of the error logging.
|
||||
msg := fmt.Sprintf("proxy: unable to get response from %d endpoint(s)", len(endpoints))
|
||||
reportRequestDropped(clientreq, failedGettingResponse)
|
||||
log.Printf(msg)
|
||||
e := httptypes.NewHTTPError(http.StatusBadGateway, msg)
|
||||
e.WriteTo(rw)
|
||||
@ -145,7 +151,7 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
|
||||
}
|
||||
|
||||
defer res.Body.Close()
|
||||
|
||||
reportRequestHandled(clientreq, res, startTime)
|
||||
removeSingleHopHeaders(&res.Header)
|
||||
copyHeader(rw.Header(), res.Header)
|
||||
|
||||
|
128
store/metrics.go
Normal file
128
store/metrics.go
Normal file
@ -0,0 +1,128 @@
|
||||
// Copyright 2015 CoreOS, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package store
|
||||
|
||||
import (
|
||||
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// Set of raw Prometheus metrics.
|
||||
// Labels
|
||||
// * action = declared in event.go
|
||||
// * outcome = Outcome
|
||||
// Do not increment directly, use Report* methods.
|
||||
var (
|
||||
readCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "store",
|
||||
Name: "reads_total",
|
||||
Help: "Total number of reads action by (get/getRecursive), local to this member.",
|
||||
}, []string{"action"})
|
||||
|
||||
writeCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "store",
|
||||
Name: "writes_total",
|
||||
Help: "Total number of writes (e.g. set/compareAndDelete) seen by this member.",
|
||||
}, []string{"action"})
|
||||
|
||||
readFailedCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "store",
|
||||
Name: "reads_failed_total",
|
||||
Help: "Failed read actions by (get/getRecursive), local to this member.",
|
||||
}, []string{"action"})
|
||||
|
||||
writeFailedCounter = prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "store",
|
||||
Name: "writes_failed_total",
|
||||
Help: "Failed write actions (e.g. set/compareAndDelete), seen by this member.",
|
||||
}, []string{"action"})
|
||||
|
||||
expireCounter = prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "store",
|
||||
Name: "expires_total",
|
||||
Help: "Total number of expired keys.",
|
||||
})
|
||||
|
||||
watchRequests = prometheus.NewCounter(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "store",
|
||||
Name: "watch_requests_total",
|
||||
Help: "Total number of incoming watch requests (new or reestablished).",
|
||||
})
|
||||
|
||||
watcherCount = prometheus.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: "etcd",
|
||||
Subsystem: "store",
|
||||
Name: "watchers",
|
||||
Help: "Count of currently active watchers.",
|
||||
})
|
||||
)
|
||||
|
||||
const (
|
||||
GetRecursive = "getRecursive"
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(readCounter)
|
||||
prometheus.MustRegister(writeCounter)
|
||||
prometheus.MustRegister(expireCounter)
|
||||
prometheus.MustRegister(watchRequests)
|
||||
prometheus.MustRegister(watcherCount)
|
||||
}
|
||||
|
||||
func reportReadSuccess(read_action string) {
|
||||
readCounter.WithLabelValues(read_action).Inc()
|
||||
}
|
||||
|
||||
func reportReadFailure(read_action string) {
|
||||
readCounter.WithLabelValues(read_action).Inc()
|
||||
readFailedCounter.WithLabelValues(read_action).Inc()
|
||||
}
|
||||
|
||||
func reportWriteSuccess(write_action string) {
|
||||
writeCounter.WithLabelValues(write_action).Inc()
|
||||
}
|
||||
|
||||
func reportWriteFailure(write_action string) {
|
||||
writeCounter.WithLabelValues(write_action).Inc()
|
||||
writeFailedCounter.WithLabelValues(write_action).Inc()
|
||||
}
|
||||
|
||||
func reportExpiredKey() {
|
||||
expireCounter.Inc()
|
||||
}
|
||||
|
||||
func reportWatchRequest() {
|
||||
watchRequests.Inc()
|
||||
}
|
||||
|
||||
func reportWatcherAdded() {
|
||||
watcherCount.Inc()
|
||||
}
|
||||
|
||||
func reportWatcherRemoved() {
|
||||
watcherCount.Dec()
|
||||
}
|
@ -121,6 +121,11 @@ func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
|
||||
|
||||
if err != nil {
|
||||
s.Stats.Inc(GetFail)
|
||||
if recursive {
|
||||
reportReadFailure(GetRecursive)
|
||||
} else {
|
||||
reportReadFailure(Get)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -129,6 +134,11 @@ func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
|
||||
e.Node.loadInternalNode(n, recursive, sorted, s.clock)
|
||||
|
||||
s.Stats.Inc(GetSuccess)
|
||||
if recursive {
|
||||
reportReadSuccess(GetRecursive)
|
||||
} else {
|
||||
reportReadSuccess(Get)
|
||||
}
|
||||
|
||||
return e, nil
|
||||
}
|
||||
@ -145,8 +155,10 @@ func (s *store) Create(nodePath string, dir bool, value string, unique bool, exp
|
||||
e.EtcdIndex = s.CurrentIndex
|
||||
s.WatcherHub.notify(e)
|
||||
s.Stats.Inc(CreateSuccess)
|
||||
reportWriteSuccess(Create)
|
||||
} else {
|
||||
s.Stats.Inc(CreateFail)
|
||||
reportWriteFailure(Create)
|
||||
}
|
||||
|
||||
return e, err
|
||||
@ -162,8 +174,10 @@ func (s *store) Set(nodePath string, dir bool, value string, expireTime time.Tim
|
||||
defer func() {
|
||||
if err == nil {
|
||||
s.Stats.Inc(SetSuccess)
|
||||
reportWriteSuccess(Set)
|
||||
} else {
|
||||
s.Stats.Inc(SetFail)
|
||||
reportWriteFailure(Set)
|
||||
}
|
||||
}()
|
||||
|
||||
@ -221,11 +235,13 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
|
||||
|
||||
if err != nil {
|
||||
s.Stats.Inc(CompareAndSwapFail)
|
||||
reportWriteFailure(CompareAndSwap)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if n.IsDir() { // can only compare and swap file
|
||||
s.Stats.Inc(CompareAndSwapFail)
|
||||
reportWriteFailure(CompareAndSwap)
|
||||
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
|
||||
}
|
||||
|
||||
@ -234,6 +250,7 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
|
||||
if ok, which := n.Compare(prevValue, prevIndex); !ok {
|
||||
cause := getCompareFailCause(n, which, prevValue, prevIndex)
|
||||
s.Stats.Inc(CompareAndSwapFail)
|
||||
reportWriteFailure(CompareAndSwap)
|
||||
return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
|
||||
}
|
||||
|
||||
@ -256,6 +273,7 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
|
||||
|
||||
s.WatcherHub.notify(e)
|
||||
s.Stats.Inc(CompareAndSwapSuccess)
|
||||
reportWriteSuccess(CompareAndSwap)
|
||||
|
||||
return e, nil
|
||||
}
|
||||
@ -281,6 +299,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
|
||||
|
||||
if err != nil { // if the node does not exist, return error
|
||||
s.Stats.Inc(DeleteFail)
|
||||
reportWriteFailure(Delete)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -303,6 +322,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
|
||||
|
||||
if err != nil {
|
||||
s.Stats.Inc(DeleteFail)
|
||||
reportWriteFailure(Delete)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -312,6 +332,7 @@ func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
|
||||
s.WatcherHub.notify(e)
|
||||
|
||||
s.Stats.Inc(DeleteSuccess)
|
||||
reportWriteSuccess(Delete)
|
||||
|
||||
return e, nil
|
||||
}
|
||||
@ -326,11 +347,13 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
|
||||
|
||||
if err != nil { // if the node does not exist, return error
|
||||
s.Stats.Inc(CompareAndDeleteFail)
|
||||
reportWriteFailure(CompareAndDelete)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if n.IsDir() { // can only compare and delete file
|
||||
s.Stats.Inc(CompareAndSwapFail)
|
||||
reportWriteFailure(CompareAndDelete)
|
||||
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, s.CurrentIndex)
|
||||
}
|
||||
|
||||
@ -339,6 +362,7 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
|
||||
if ok, which := n.Compare(prevValue, prevIndex); !ok {
|
||||
cause := getCompareFailCause(n, which, prevValue, prevIndex)
|
||||
s.Stats.Inc(CompareAndDeleteFail)
|
||||
reportWriteFailure(CompareAndDelete)
|
||||
return nil, etcdErr.NewError(etcdErr.EcodeTestFailed, cause, s.CurrentIndex)
|
||||
}
|
||||
|
||||
@ -361,6 +385,7 @@ func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex ui
|
||||
|
||||
s.WatcherHub.notify(e)
|
||||
s.Stats.Inc(CompareAndDeleteSuccess)
|
||||
reportWriteSuccess(CompareAndDelete)
|
||||
|
||||
return e, nil
|
||||
}
|
||||
@ -423,6 +448,7 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
|
||||
|
||||
if err != nil { // if the node does not exist, return error
|
||||
s.Stats.Inc(UpdateFail)
|
||||
reportWriteFailure(Update)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -434,6 +460,7 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
|
||||
if n.IsDir() && len(newValue) != 0 {
|
||||
// if the node is a directory, we cannot update value to non-empty
|
||||
s.Stats.Inc(UpdateFail)
|
||||
reportWriteFailure(Update)
|
||||
return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath, currIndex)
|
||||
}
|
||||
|
||||
@ -455,6 +482,7 @@ func (s *store) Update(nodePath string, newValue string, expireTime time.Time) (
|
||||
s.WatcherHub.notify(e)
|
||||
|
||||
s.Stats.Inc(UpdateSuccess)
|
||||
reportWriteSuccess(Update)
|
||||
|
||||
s.CurrentIndex = nextIndex
|
||||
|
||||
@ -490,6 +518,7 @@ func (s *store) internalCreate(nodePath string, dir bool, value string, unique,
|
||||
|
||||
if err != nil {
|
||||
s.Stats.Inc(SetFail)
|
||||
reportWriteFailure(action)
|
||||
err.Index = currIndex
|
||||
return nil, err
|
||||
}
|
||||
@ -592,6 +621,7 @@ func (s *store) DeleteExpiredKeys(cutoff time.Time) {
|
||||
s.ttlKeyHeap.pop()
|
||||
node.Remove(true, true, callback)
|
||||
|
||||
reportExpiredKey()
|
||||
s.Stats.Inc(ExpireCount)
|
||||
|
||||
s.WatcherHub.notify(e)
|
||||
|
@ -53,6 +53,7 @@ func newWatchHub(capacity int) *watcherHub {
|
||||
// If recursive is false, the first change after index at key will be sent to the event channel of the watcher.
|
||||
// If index is zero, watch will start from the current index + 1.
|
||||
func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeIndex uint64) (Watcher, *etcdErr.Error) {
|
||||
reportWatchRequest()
|
||||
event, err := wh.EventHistory.scan(key, recursive, index)
|
||||
|
||||
if err != nil {
|
||||
@ -97,12 +98,14 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeInde
|
||||
w.removed = true
|
||||
l.Remove(elem)
|
||||
atomic.AddInt64(&wh.count, -1)
|
||||
reportWatcherRemoved()
|
||||
if l.Len() == 0 {
|
||||
delete(wh.watchers, key)
|
||||
}
|
||||
}
|
||||
|
||||
atomic.AddInt64(&wh.count, 1)
|
||||
reportWatcherAdded()
|
||||
|
||||
return w, nil
|
||||
}
|
||||
@ -148,6 +151,7 @@ func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
|
||||
w.removed = true
|
||||
l.Remove(curr)
|
||||
atomic.AddInt64(&wh.count, -1)
|
||||
reportWatcherRemoved()
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user